http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponse.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponse.java b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponse.java new file mode 100644 index 0000000..3aa2931 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponse.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager.testutils; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import javax.ws.rs.core.Response.Status; + +/** + * Encapsulates an HTTP response. The toString method returns the + * specification-compliant response. 
+ * + * @author unattributed + */ +public class HttpResponse { + + private final Status status; + private final String entity; + private final Map<String, String> headers = new HashMap<>(); + + public HttpResponse(final Status status, final String entity) { + this.status = status; + this.entity = entity; + headers.put("content-length", String.valueOf(entity.getBytes().length)); + } + + public String getEntity() { + return entity; + } + + public Status getStatus() { + return status; + } + + public Map<String, String> getHeaders() { + return Collections.unmodifiableMap(headers); + } + + public void addHeader(final String key, final String value) { + if (key.contains(" ")) { + throw new IllegalArgumentException("Header key may not contain spaces."); + } else if ("content-length".equalsIgnoreCase(key)) { + throw new IllegalArgumentException("Content-Length header is set automatically based on length of content."); + } + headers.put(key, value); + } + + public void addHeaders(final Map<String, String> headers) { + for (final Map.Entry<String, String> entry : headers.entrySet()) { + addHeader(entry.getKey(), entry.getValue()); + } + } + + @Override + public String toString() { + + final StringBuilder strb = new StringBuilder(); + + // response line + strb.append("HTTP/1.1 ") + .append(status.getStatusCode()) + .append(" ") + .append(status.getReasonPhrase()) + .append("\n"); + + // headers + for (final Map.Entry<String, String> entry : headers.entrySet()) { + strb.append(entry.getKey()).append(": ").append(entry.getValue()).append("\n"); + } + + strb.append("\n"); + + // body + strb.append(entity); + + return strb.toString(); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponseAction.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponseAction.java b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponseAction.java new file mode 100644 index 0000000..28615d0 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponseAction.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager.testutils; + +/** + * Wraps a HttpResponse with a time-delay. When the action is applied, the + * currently executing thread sleeps for the given delay before returning the + * response to the caller. + * + * This class is good for simulating network latency. 
+ * + * @author unattributed + */ +public class HttpResponseAction { + + private final HttpResponse response; + + private final int waitTimeMs; + + public HttpResponseAction(final HttpResponse response) { + this(response, 0); + } + + public HttpResponseAction(final HttpResponse response, final int waitTimeMs) { + this.response = response; + this.waitTimeMs = waitTimeMs; + } + + public HttpResponse apply() { + try { + Thread.sleep(waitTimeMs); + } catch (final InterruptedException ie) { + throw new RuntimeException(ie); + } + + return response; + } + + public HttpResponse getResponse() { + return response; + } + + public int getWaitTimeMs() { + return waitTimeMs; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpServer.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpServer.java b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpServer.java new file mode 100644 index 0000000..f17a66c --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpServer.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager.testutils; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.PrintWriter; +import java.io.Reader; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.apache.nifi.cluster.manager.testutils.HttpRequest.HttpRequestBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A simple HTTP web server that allows clients to register canned-responses to + * respond to received requests. 
+ * + * @author unattributed + */ +public class HttpServer { + + private static final Logger logger = LoggerFactory.getLogger(HttpServer.class); + + private final ExecutorService executorService; + private final ServerSocket serverSocket; + private final Queue<HttpResponseAction> responseQueue = new ConcurrentLinkedQueue<>(); + private final Map<String, String> checkedHeaders = new HashMap<>(); + private final Map<String, List<String>> checkedParameters = new HashMap<>(); + private final int port; + + public HttpServer(int numThreads, int port) throws IOException { + this.port = port; + executorService = Executors.newFixedThreadPool(numThreads); + serverSocket = new ServerSocket(port); + } + + public void start() { + + new Thread() { + @Override + public void run() { + while (isRunning()) { + try { + final Socket conn = serverSocket.accept(); + executorService.execute(new Runnable() { + @Override + public void run() { + handleRequest(conn); + if (conn.isClosed() == false) { + try { + conn.close(); + } catch (IOException ioe) { + } + } + } + }); + } catch (final SocketException se) { + /* ignored */ + } catch (final IOException ioe) { + if (logger.isDebugEnabled()) { + logger.warn("", ioe); + } + } + } + } + ; + } + + .start(); + } + + public boolean isRunning() { + return executorService.isShutdown() == false; + } + + public void stop() { + // shutdown server socket + try { + if (serverSocket.isClosed() == false) { + serverSocket.close(); + } + } catch (final Exception ex) { + throw new RuntimeException(ex); + } + + // shutdown executor service + try { + executorService.shutdown(); + executorService.awaitTermination(3, TimeUnit.SECONDS); + } catch (final Exception ex) { + throw new RuntimeException(ex); + } + } + + public int getPort() { + if (isRunning()) { + return serverSocket.getLocalPort(); + } else { + return port; + } + } + + public Queue<HttpResponseAction> addResponseAction(final HttpResponseAction response) { + responseQueue.add(response); + return 
responseQueue; + } + + public void addCheckedHeaders(final Map<String, String> headers) { + checkedHeaders.putAll(headers); + } + + public void addCheckedParameters(final Map<String, List<String>> parameters) { + checkedParameters.putAll(parameters); + } + + private void handleRequest(final Socket conn) { + try { + + final HttpRequest httpRequest = buildRequest(conn.getInputStream()); + + if (logger.isDebugEnabled()) { + logger.debug("\n" + httpRequest); + } + + // check headers + final Map<String, String> reqHeaders = httpRequest.getHeaders(); + for (final Map.Entry<String, String> entry : checkedHeaders.entrySet()) { + if (reqHeaders.containsKey(entry.getKey())) { + if (entry.getValue().equals(reqHeaders.get(entry.getKey()))) { + logger.error("Incorrect HTTP request header value received for checked header: " + entry.getKey()); + conn.close(); + return; + } + } else { + logger.error("Missing checked header: " + entry.getKey()); + conn.close(); + return; + } + } + + // check parameters + final Map<String, List<String>> reqParams = httpRequest.getParameters(); + for (final Map.Entry<String, List<String>> entry : checkedParameters.entrySet()) { + if (reqParams.containsKey(entry.getKey())) { + if (entry.getValue().equals(reqParams.get(entry.getKey())) == false) { + logger.error("Incorrect HTTP request parameter values received for checked parameter: " + entry.getKey()); + conn.close(); + return; + } + } else { + logger.error("Missing checked parameter: " + entry.getKey()); + conn.close(); + return; + } + } + + // apply the next response + final HttpResponseAction response = responseQueue.remove(); + response.apply(); + + // send the response to client + final PrintWriter pw = new PrintWriter(conn.getOutputStream(), true); + + if (logger.isDebugEnabled()) { + logger.debug("\n" + response.getResponse()); + } + + pw.print(response.getResponse()); + pw.flush(); + + } catch (IOException ioe) { /* ignored */ } + } + + private HttpRequest buildRequest(final InputStream 
requestIs) throws IOException { + return new HttpRequestReader().read(new InputStreamReader(requestIs)); + } + + // reads an HTTP request from the given reader + private class HttpRequestReader { + + public HttpRequest read(final Reader reader) throws IOException { + + HttpRequestBuilder builder = null; + String line = ""; + boolean isRequestLine = true; + while ((line = readLine(reader)).isEmpty() == false) { + if (isRequestLine) { + builder = HttpRequest.createFromRequestLine(line); + isRequestLine = false; + } else { + builder.addHeader(line); + } + } + + if (builder != null) { + builder.addBody(reader); + } + + return builder.build(); + } + + private String readLine(final Reader reader) throws IOException { + + /* read character at time to prevent blocking */ + final StringBuilder strb = new StringBuilder(); + char c; + while ((c = (char) reader.read()) != '\n') { + if (c != '\r') { + strb.append(c); + } + } + return strb.toString(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java new file mode 100644 index 0000000..96943c2 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.protocol.impl; + +import java.io.IOException; +import java.net.InetAddress; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.cluster.protocol.ProtocolContext; +import org.apache.nifi.cluster.protocol.ProtocolException; +import org.apache.nifi.cluster.protocol.ProtocolHandler; +import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext; +import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils; +import org.apache.nifi.cluster.protocol.message.FlowRequestMessage; +import org.apache.nifi.cluster.protocol.message.FlowResponseMessage; +import org.apache.nifi.cluster.protocol.message.PingMessage; +import org.apache.nifi.cluster.protocol.message.ProtocolMessage; +import org.apache.nifi.io.socket.ServerSocketConfiguration; +import org.apache.nifi.io.socket.SocketConfiguration; +import org.junit.After; +import static org.junit.Assert.*; +import org.junit.Before; +import org.junit.Test; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +/** + * @author unattributed + */ +public class ClusterManagerProtocolSenderImplTest { + + private InetAddress address; + + private int port; + + private SocketProtocolListener listener; + + private ClusterManagerProtocolSenderImpl sender; + + private 
ProtocolHandler mockHandler; + + @Before + public void setup() throws IOException { + + address = InetAddress.getLocalHost(); + ServerSocketConfiguration serverSocketConfiguration = new ServerSocketConfiguration(); + + mockHandler = mock(ProtocolHandler.class); + + ProtocolContext protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT); + + listener = new SocketProtocolListener(5, 0, serverSocketConfiguration, protocolContext); + listener.addHandler(mockHandler); + listener.start(); + + port = listener.getPort(); + + SocketConfiguration socketConfiguration = new SocketConfiguration(); + sender = new ClusterManagerProtocolSenderImpl(socketConfiguration, protocolContext); + } + + @After + public void teardown() throws IOException { + if (listener.isRunning()) { + listener.stop(); + } + } + + @Test + public void testRequestFlow() throws Exception { + + when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE); + when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new FlowResponseMessage()); + FlowRequestMessage request = new FlowRequestMessage(); + request.setNodeId(new NodeIdentifier("id", "api-address", 1, address.getHostAddress(), port)); + FlowResponseMessage response = sender.requestFlow(request); + assertNotNull(response); + } + + @Test + public void testRequestFlowWithBadResponseMessage() throws Exception { + + when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE); + when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new PingMessage()); + FlowRequestMessage request = new FlowRequestMessage(); + request.setNodeId(new NodeIdentifier("id", "api-address", 1, address.getHostAddress(), port)); + try { + sender.requestFlow(request); + fail("failed to throw exception"); + } catch (ProtocolException pe) { + } + + } + + @Test + public void testRequestFlowDelayedResponse() throws Exception { + + final int time = 250; + sender.getSocketConfiguration().setSocketTimeout(time); + + 
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE); + when(mockHandler.handle(any(ProtocolMessage.class))).thenAnswer(new Answer<FlowResponseMessage>() { + @Override + public FlowResponseMessage answer(InvocationOnMock invocation) throws Throwable { + Thread.sleep(time * 3); + return new FlowResponseMessage(); + } + }); + FlowRequestMessage request = new FlowRequestMessage(); + request.setNodeId(new NodeIdentifier("id", "api-address", 1, address.getHostAddress(), port)); + try { + sender.requestFlow(request); + fail("failed to throw exception"); + } catch (ProtocolException pe) { + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocatorTest.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocatorTest.java b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocatorTest.java new file mode 100644 index 0000000..4a69571 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocatorTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.cluster.protocol.impl;

import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.io.socket.multicast.DiscoverableService;
import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
import static org.junit.Assert.*;
import org.junit.Before;
import org.junit.Test;
import static org.mockito.Mockito.*;
import org.mockito.stubbing.OngoingStubbing;

/**
 * Exercises ClusterServiceLocator in its three configurations: backed by
 * service discovery, backed by service discovery with a fixed port, and
 * backed by a fixed service.
 */
public class ClusterServiceLocatorTest {

    private ClusterServiceDiscovery mockServiceDiscovery;

    private int fixedPort;

    private DiscoverableService fixedService;

    private ClusterServiceLocator serviceDiscoveryLocator;

    private ClusterServiceLocator serviceDiscoveryFixedPortLocator;

    private ClusterServiceLocator fixedServiceLocator;

    @Before
    public void setup() throws Exception {

        fixedPort = 1;
        mockServiceDiscovery = mock(ClusterServiceDiscovery.class);
        fixedService = new DiscoverableServiceImpl("some-service", InetSocketAddress.createUnresolved("some-host", 20));

        serviceDiscoveryLocator = new ClusterServiceLocator(mockServiceDiscovery);
        serviceDiscoveryFixedPortLocator = new ClusterServiceLocator(mockServiceDiscovery, fixedPort);
        fixedServiceLocator = new ClusterServiceLocator(fixedService);

    }

    @Test
    public void getServiceWhenServiceDiscoveryNotStarted() {
        assertNull(serviceDiscoveryLocator.getService());
    }

    @Test
    public void getServiceWhenServiceDiscoveryFixedPortNotStarted() {
        // exercise the fixed-port locator; the plain discovery locator is covered by the test above
        assertNull(serviceDiscoveryFixedPortLocator.getService());
    }

    @Test
    public void getServiceWhenFixedServiceNotStarted() {
        assertEquals(fixedService, fixedServiceLocator.getService());
    }

    /**
     * The discovered service is returned even though the first attempt finds nothing.
     */
    @Test
    public void getServiceNotOnFirstAttempt() {

        final ClusterServiceLocator.AttemptsConfig config = newTwoAttemptsConfig();
        serviceDiscoveryLocator.setAttemptsConfig(config);

        stubServiceFoundOnLastAttempt(config.getNumAttempts());

        assertEquals(fixedService, serviceDiscoveryLocator.getService());

    }

    /**
     * The discovered service is returned with its port replaced by the fixed
     * port, even though the first attempt finds nothing.
     */
    @Test
    public void getServiceNotOnFirstAttemptWithFixedPort() {

        final ClusterServiceLocator.AttemptsConfig config = newTwoAttemptsConfig();
        serviceDiscoveryFixedPortLocator.setAttemptsConfig(config);

        stubServiceFoundOnLastAttempt(config.getNumAttempts());

        InetSocketAddress resultAddress = InetSocketAddress.createUnresolved(fixedService.getServiceAddress().getHostName(), fixedPort);
        DiscoverableService resultService = new DiscoverableServiceImpl(fixedService.getServiceName(), resultAddress);
        assertEquals(resultService, serviceDiscoveryFixedPortLocator.getService());
    }

    /**
     * Builds a configuration of two attempts that are one second apart.
     */
    private ClusterServiceLocator.AttemptsConfig newTwoAttemptsConfig() {
        final ClusterServiceLocator.AttemptsConfig config = new ClusterServiceLocator.AttemptsConfig();
        config.setNumAttempts(2);
        config.setTimeBetweenAttempsUnit(TimeUnit.SECONDS);
        config.setTimeBetweenAttempts(1);
        return config;
    }

    /**
     * Stubs the mocked discovery to return null for every attempt but the
     * last, and the fixed service for the last one.
     */
    private void stubServiceFoundOnLastAttempt(final int numAttempts) {
        OngoingStubbing<DiscoverableService> stubbing = when(mockServiceDiscovery.getService());
        for (int i = 0; i < numAttempts - 1; i++) {
            stubbing = stubbing.thenReturn(null);
        }
        stubbing.thenReturn(fixedService);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcasterTest.java 
---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcasterTest.java b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcasterTest.java new file mode 100644 index 0000000..4d85d1a --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcasterTest.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.cluster.protocol.impl; + +import java.net.InetSocketAddress; +import org.apache.nifi.cluster.protocol.ProtocolContext; +import org.apache.nifi.cluster.protocol.ProtocolException; +import org.apache.nifi.cluster.protocol.ProtocolHandler; +import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext; +import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils; +import org.apache.nifi.cluster.protocol.message.ProtocolMessage; +import org.apache.nifi.cluster.protocol.message.ServiceBroadcastMessage; +import org.apache.nifi.io.socket.multicast.DiscoverableService; +import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl; +import org.apache.nifi.io.socket.multicast.MulticastConfiguration; +import org.junit.After; +import static org.junit.Assert.*; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +/** + * @author unattributed + */ +public class ClusterServicesBroadcasterTest { + + private ClusterServicesBroadcaster broadcaster; + + private MulticastProtocolListener listener; + + private DummyProtocolHandler handler; + + private InetSocketAddress multicastAddress; + + private DiscoverableService broadcastedService; + + private ProtocolContext protocolContext; + + private MulticastConfiguration configuration; + + @Before + public void setup() throws Exception { + + broadcastedService = new DiscoverableServiceImpl("some-service", new InetSocketAddress("localhost", 11111)); + + multicastAddress = new InetSocketAddress("225.1.1.1", 22222); + + configuration = new MulticastConfiguration(); + + protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT); + + broadcaster = new ClusterServicesBroadcaster(multicastAddress, configuration, protocolContext, "500 ms"); + broadcaster.addService(broadcastedService); + + handler = new DummyProtocolHandler(); + listener = new MulticastProtocolListener(5, multicastAddress, configuration, protocolContext); + listener.addHandler(handler); + } + 
+ @After + public void teardown() { + + if (broadcaster.isRunning()) { + broadcaster.stop(); + } + + try { + if (listener.isRunning()) { + listener.stop(); + } + } catch (Exception ex) { + ex.printStackTrace(System.out); + } + + } + + @Test + @Ignore + public void testBroadcastReceived() throws Exception { + + broadcaster.start(); + listener.start(); + + Thread.sleep(1000); + + listener.stop(); + + assertNotNull(handler.getProtocolMessage()); + assertEquals(ProtocolMessage.MessageType.SERVICE_BROADCAST, handler.getProtocolMessage().getType()); + final ServiceBroadcastMessage msg = (ServiceBroadcastMessage) handler.getProtocolMessage(); + assertEquals(broadcastedService.getServiceName(), msg.getServiceName()); + assertEquals(broadcastedService.getServiceAddress().getHostName(), msg.getAddress()); + assertEquals(broadcastedService.getServiceAddress().getPort(), msg.getPort()); + } + + private class DummyProtocolHandler implements ProtocolHandler { + + private ProtocolMessage protocolMessage; + + @Override + public boolean canHandle(ProtocolMessage msg) { + return true; + } + + @Override + public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException { + this.protocolMessage = msg; + return null; + } + + public ProtocolMessage getProtocolMessage() { + return protocolMessage; + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListenerTest.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListenerTest.java b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListenerTest.java new file mode 100644 index 0000000..acd21e8 --- /dev/null +++ 
b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListenerTest.java @@ -0,0 +1,170 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.cluster.protocol.impl;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.InetSocketAddress;
import java.net.MulticastSocket;
import java.util.ArrayList;
import java.util.List;
import org.apache.nifi.cluster.protocol.ProtocolContext;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.ProtocolHandler;
import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
import org.apache.nifi.cluster.protocol.message.MulticastProtocolMessage;
import org.apache.nifi.cluster.protocol.message.PingMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.io.socket.multicast.MulticastConfiguration;
import org.apache.nifi.io.socket.multicast.MulticastUtils;
import org.junit.After;
import static org.junit.Assert.*;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

/**
 * Sends datagrams to a running MulticastProtocolListener and checks which
 * messages reach the registered protocol handlers.
 *
 * @author unattributed
 */
public class MulticastProtocolListenerTest {

    private MulticastProtocolListener listener;

    private MulticastSocket socket;

    private InetSocketAddress address;

    private MulticastConfiguration configuration;

    private ProtocolContext protocolContext;

    @Before
    public void setup() throws Exception {

        address = new InetSocketAddress("226.1.1.1", 60000);
        configuration = new MulticastConfiguration();

        protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);

        listener = new MulticastProtocolListener(5, address, configuration, protocolContext);
        listener.start();

        socket = MulticastUtils.createMulticastSocket(address.getPort(), configuration);
    }

    @After
    public void teardown() throws IOException {
        try {
            if (listener.isRunning()) {
                listener.stop();
            }
        } finally {
            MulticastUtils.closeQuietly(socket);
        }
    }

    /**
     * A one-byte datagram that is not a marshalled message must not reach the handler.
     */
    @Test
    public void testBadRequest() throws Exception {
        DelayedProtocolHandler handler = new DelayedProtocolHandler(0);
        listener.addHandler(handler);
        DatagramPacket packet = new DatagramPacket(new byte[]{5}, 1, address);
        socket.send(packet);
        Thread.sleep(250);
        assertEquals(0, handler.getMessages().size());
    }

    /**
     * A marshalled multicast message wrapping a ping must reach the handler as a ping.
     */
    @Test
    @Ignore
    public void testRequest() throws Exception {

        ReflexiveProtocolHandler handler = new ReflexiveProtocolHandler();
        listener.addHandler(handler);

        ProtocolMessage msg = new PingMessage();
        MulticastProtocolMessage multicastMsg = new MulticastProtocolMessage("some-id", msg);

        // marshal message to output stream
        ProtocolMessageMarshaller marshaller = protocolContext.createMarshaller();
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        marshaller.marshal(multicastMsg, baos);
        byte[] requestPacketBytes = baos.toByteArray();
        DatagramPacket packet = new DatagramPacket(requestPacketBytes, requestPacketBytes.length, address);
        socket.send(packet);

        Thread.sleep(250);
        assertEquals(1, handler.getMessages().size());
        assertEquals(msg.getType(), handler.getMessages().get(0).getType());

    }

    /**
     * Records every message and returns it unchanged.
     */
    private static class ReflexiveProtocolHandler implements ProtocolHandler {

        // synchronized: presumably filled on listener threads while the test thread reads it - confirm against MulticastProtocolListener
        private final List<ProtocolMessage> messages
                = java.util.Collections.synchronizedList(new ArrayList<ProtocolMessage>());

        @Override
        public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException {
            messages.add(msg);
            return msg;
        }

        @Override
        public boolean canHandle(ProtocolMessage msg) {
            return true;
        }

        public List<ProtocolMessage> getMessages() {
            return messages;
        }

    }

    /**
     * Records every message, then sleeps for the configured delay and returns null.
     */
    private static class DelayedProtocolHandler implements ProtocolHandler {

        private final int delay;

        // synchronized: presumably filled on listener threads while the test thread reads it - confirm against MulticastProtocolListener
        private final List<ProtocolMessage> messages
                = java.util.Collections.synchronizedList(new ArrayList<ProtocolMessage>());

        public DelayedProtocolHandler(int delay) {
            this.delay = delay;
        }

        @Override
        public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException {
            try {
                messages.add(msg);
                Thread.sleep(delay);
                return null;
            } catch (final InterruptedException ie) {
                // keep the interruption visible to the thread that invoked the handler
                Thread.currentThread().interrupt();
                throw new ProtocolException(ie);
            }

        }

        @Override
        public boolean canHandle(ProtocolMessage msg) {
            return true;
        }

        public List<ProtocolMessage> getMessages() {
            return messages;
        }

    }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java new file mode 100644 index 0000000..7c62d2f --- /dev/null +++ 
b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.protocol.impl; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.UUID; + +import org.apache.nifi.cluster.HeartbeatPayload; +import org.apache.nifi.cluster.protocol.ConnectionRequest; +import org.apache.nifi.cluster.protocol.ConnectionResponse; +import org.apache.nifi.cluster.protocol.Heartbeat; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.cluster.protocol.ProtocolContext; +import org.apache.nifi.cluster.protocol.ProtocolException; +import org.apache.nifi.cluster.protocol.ProtocolHandler; +import org.apache.nifi.cluster.protocol.StandardDataFlow; +import org.apache.nifi.cluster.protocol.UnknownServiceAddressException; +import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext; +import 
org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils; +import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage; +import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage; +import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage; +import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; +import org.apache.nifi.cluster.protocol.message.PingMessage; +import org.apache.nifi.cluster.protocol.message.ProtocolMessage; +import org.apache.nifi.io.socket.ServerSocketConfiguration; +import org.apache.nifi.io.socket.SocketConfiguration; +import org.apache.nifi.io.socket.multicast.DiscoverableService; +import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +/** + * @author unattributed + */ +public class NodeProtocolSenderImplTest { + + private SocketProtocolListener listener; + + private NodeProtocolSenderImpl sender; + + private DiscoverableService service; + + private ServerSocketConfiguration serverSocketConfiguration; + + private ClusterServiceLocator mockServiceLocator; + + private ProtocolHandler mockHandler; + + private NodeIdentifier nodeIdentifier; + + @Before + public void setup() throws IOException { + + serverSocketConfiguration = new ServerSocketConfiguration(); + + mockServiceLocator = mock(ClusterServiceLocator.class); + mockHandler = mock(ProtocolHandler.class); + + nodeIdentifier = new NodeIdentifier("1", "localhost", 1234, "localhost", 5678); + + ProtocolContext protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT); + + listener = new SocketProtocolListener(5, 0, serverSocketConfiguration, protocolContext); + listener.setShutdownListenerSeconds(3); + listener.addHandler(mockHandler); + listener.start(); + + service = new DiscoverableServiceImpl("some-service", new 
InetSocketAddress("localhost", listener.getPort())); + + SocketConfiguration socketConfiguration = new SocketConfiguration(); + socketConfiguration.setReuseAddress(true); + sender = new NodeProtocolSenderImpl(mockServiceLocator, socketConfiguration, protocolContext); + } + + @After + public void teardown() throws IOException { + if (listener.isRunning()) { + listener.stop(); + } + } + + @Test + public void testConnect() throws Exception { + + when(mockServiceLocator.getService()).thenReturn(service); + when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE); + ConnectionResponseMessage mockMessage = new ConnectionResponseMessage(); + mockMessage.setConnectionResponse(new ConnectionResponse(nodeIdentifier, new StandardDataFlow("flow".getBytes("UTF-8"), new byte[0], new byte[0]), false, null, null, UUID.randomUUID().toString())); + when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(mockMessage); + + ConnectionRequestMessage request = new ConnectionRequestMessage(); + request.setConnectionRequest(new ConnectionRequest(nodeIdentifier)); + ConnectionResponseMessage response = sender.requestConnection(request); + assertNotNull(response); + } + + @Test(expected = UnknownServiceAddressException.class) + public void testConnectNoClusterManagerAddress() throws Exception { + + when(mockServiceLocator.getService()).thenReturn(null); + when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE); + when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new ConnectionResponseMessage()); + + ConnectionRequestMessage request = new ConnectionRequestMessage(); + request.setConnectionRequest(new ConnectionRequest(nodeIdentifier)); + + sender.requestConnection(request); + fail("failed to throw exception"); + } + + @Test(expected = ProtocolException.class) + public void testConnectBadResponse() throws Exception { + + when(mockServiceLocator.getService()).thenReturn(service); + 
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE); + when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new PingMessage()); + + ConnectionRequestMessage request = new ConnectionRequestMessage(); + request.setConnectionRequest(new ConnectionRequest(nodeIdentifier)); + + sender.requestConnection(request); + fail("failed to throw exception"); + + } + + @Test(expected = ProtocolException.class) + public void testConnectDelayedResponse() throws Exception { + + final int time = 250; + sender.getSocketConfiguration().setSocketTimeout(time); + when(mockServiceLocator.getService()).thenReturn(service); + when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE); + when(mockHandler.handle(any(ProtocolMessage.class))).thenAnswer(new Answer<ConnectionResponseMessage>() { + @Override + public ConnectionResponseMessage answer(InvocationOnMock invocation) throws Throwable { + Thread.sleep(time * 3); + return new ConnectionResponseMessage(); + } + }); + ConnectionRequestMessage request = new ConnectionRequestMessage(); + request.setConnectionRequest(new ConnectionRequest(nodeIdentifier)); + + sender.requestConnection(request); + fail("failed to throw exception"); + + } + + @Test + public void testHeartbeat() throws Exception { + + when(mockServiceLocator.getService()).thenReturn(service); + when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE); + when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(null); + + HeartbeatMessage msg = new HeartbeatMessage(); + HeartbeatPayload hbPayload = new HeartbeatPayload(); + Heartbeat hb = new Heartbeat(new NodeIdentifier("id", "localhost", 3, "localhost", 4), false, false, hbPayload.marshal()); + msg.setHeartbeat(hb); + sender.heartbeat(msg); + } + + @Test + public void testNotifyControllerStartupFailure() throws Exception { + + when(mockServiceLocator.getService()).thenReturn(service); + 
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE); + when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(null); + + ControllerStartupFailureMessage msg = new ControllerStartupFailureMessage(); + msg.setNodeId(new NodeIdentifier("some-id", "some-addr", 1, "some-addr", 1)); + msg.setExceptionMessage("some exception"); + sender.notifyControllerStartupFailure(msg); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListenerTest.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListenerTest.java b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListenerTest.java new file mode 100644 index 0000000..92a7d2a --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListenerTest.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.cluster.protocol.impl; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketTimeoutException; +import org.apache.nifi.cluster.protocol.ProtocolContext; +import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller; +import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller; +import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext; +import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils; +import org.apache.nifi.cluster.protocol.message.PingMessage; +import org.apache.nifi.cluster.protocol.message.ProtocolMessage; +import org.apache.nifi.cluster.protocol.testutils.DelayedProtocolHandler; +import org.apache.nifi.cluster.protocol.testutils.ReflexiveProtocolHandler; +import org.apache.nifi.io.socket.ServerSocketConfiguration; +import org.apache.nifi.io.socket.SocketConfiguration; +import org.apache.nifi.io.socket.SocketUtils; +import org.junit.After; +import static org.junit.Assert.*; +import org.junit.Before; +import org.junit.Test; + +/** + * @author unattributed + */ +public class SocketProtocolListenerTest { + + private SocketProtocolListener listener; + + private Socket socket; + + private ProtocolMessageMarshaller<ProtocolMessage> marshaller; + + private ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller; + + @Before + public void setup() throws Exception { + + final ProtocolContext protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT); + marshaller = protocolContext.createMarshaller(); + unmarshaller = protocolContext.createUnmarshaller(); + + ServerSocketConfiguration configuration = new ServerSocketConfiguration(); + configuration.setSocketTimeout(1000); + + listener = new SocketProtocolListener(5, 0, configuration, protocolContext); + listener.start(); + + int port = listener.getPort(); + + SocketConfiguration config = new SocketConfiguration(); + config.setReuseAddress(true); + 
config.setSocketTimeout(1000); + socket = SocketUtils.createSocket(new InetSocketAddress("localhost", port), config); + } + + @After + public void teardown() throws IOException { + try { + if (listener.isRunning()) { + listener.stop(); + } + } finally { + SocketUtils.closeQuietly(socket); + } + } + + @Test + public void testBadRequest() throws Exception { + DelayedProtocolHandler handler = new DelayedProtocolHandler(0); + listener.addHandler(handler); + socket.getOutputStream().write(5); + Thread.sleep(250); + assertEquals(0, handler.getMessages().size()); + } + + @Test + public void testRequest() throws Exception { + ProtocolMessage msg = new PingMessage(); + + ReflexiveProtocolHandler handler = new ReflexiveProtocolHandler(); + listener.addHandler(handler); + + // marshal message to output stream + marshaller.marshal(msg, socket.getOutputStream()); + + // unmarshall response and return + ProtocolMessage response = unmarshaller.unmarshal(socket.getInputStream()); + assertEquals(msg.getType(), response.getType()); + + assertEquals(1, handler.getMessages().size()); + assertEquals(msg.getType(), handler.getMessages().get(0).getType()); + } + + @Test + public void testDelayedRequest() throws Exception { + ProtocolMessage msg = new PingMessage(); + + DelayedProtocolHandler handler = new DelayedProtocolHandler(2000); + listener.addHandler(handler); + + // marshal message to output stream + marshaller.marshal(msg, socket.getOutputStream()); + + try { + socket.getInputStream().read(); + fail("Socket timeout not received."); + } catch (SocketTimeoutException ste) { + } + + assertEquals(1, handler.getMessages().size()); + assertEquals(msg.getType(), handler.getMessages().get(0).getType()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/DelayedProtocolHandler.java ---------------------------------------------------------------------- diff 
--git a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/DelayedProtocolHandler.java b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/DelayedProtocolHandler.java new file mode 100644 index 0000000..2f16777 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/DelayedProtocolHandler.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.cluster.protocol.testutils; + +import java.util.ArrayList; +import java.util.List; +import org.apache.nifi.cluster.protocol.ProtocolException; +import org.apache.nifi.cluster.protocol.ProtocolHandler; +import org.apache.nifi.cluster.protocol.message.ProtocolMessage; + +/** + * @author unattributed + */ +public class DelayedProtocolHandler implements ProtocolHandler { + + private int delay = 0; + private List<ProtocolMessage> messages = new ArrayList<>(); + + public DelayedProtocolHandler(int delay) { + this.delay = delay; + } + + @Override + public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException { + try { + messages.add(msg); + Thread.sleep(delay); + return null; + } catch (final InterruptedException ie) { + throw new ProtocolException(ie); + } + + } + + @Override + public boolean canHandle(ProtocolMessage msg) { + return true; + } + + public List<ProtocolMessage> getMessages() { + return messages; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/ReflexiveProtocolHandler.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/ReflexiveProtocolHandler.java b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/ReflexiveProtocolHandler.java new file mode 100644 index 0000000..e80f52c --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/ReflexiveProtocolHandler.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.protocol.testutils; + +import java.util.ArrayList; +import java.util.List; +import org.apache.nifi.cluster.protocol.ProtocolException; +import org.apache.nifi.cluster.protocol.ProtocolHandler; +import org.apache.nifi.cluster.protocol.message.ProtocolMessage; + +/** + * @author unattributed + */ +public class ReflexiveProtocolHandler implements ProtocolHandler { + + private List<ProtocolMessage> messages = new ArrayList<>(); + + @Override + public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException { + messages.add(msg); + return msg; + } + + @Override + public boolean canHandle(ProtocolMessage msg) { + return true; + } + + public List<ProtocolMessage> getMessages() { + return messages; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/resources/logback-test.xml b/nar-bundles/framework-bundle/framework/cluster/src/test/resources/logback-test.xml new file mode 100644 index 0000000..92eb78c --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/test/resources/logback-test.xml @@ -0,0 +1,48 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software 
Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<configuration scan="true" scanPeriod="30 seconds"> + <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> + <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> + <pattern>%-4r [%t] %-5p %c - %m%n</pattern> + </encoder> + </appender> + + <!-- valid logging levels: TRACE, DEBUG, INFO, WARN, ERROR --> + <logger name="org.apache.nifi" level="INFO"/> + + <!-- Logger for managing logging statements for nifi clusters. --> + <logger name="org.apache.nifi.cluster" level="INFO"/> + + <!-- + Logger for logging HTTP requests received by the web server. Setting + log level to 'debug' activates HTTP request logging. 
+ --> + <logger name="org.apache.nifi.server.JettyServer" level="INFO"/> + + <!-- Logger for managing logging statements for jetty --> + <logger name="org.mortbay" level="INFO"/> + + <!-- Suppress non-error messages due to excessive logging by class --> + <logger name="com.sun.jersey.spi.container.servlet.WebComponent" level="ERROR"/> + + <logger name="org.apache.nifi.processors.standard" level="DEBUG"/> + + <root level="INFO"> + <appender-ref ref="CONSOLE"/> + </root> + +</configuration> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/test/resources/org/apache/nifi/cluster/firewall/impl/empty.txt ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/resources/org/apache/nifi/cluster/firewall/impl/empty.txt b/nar-bundles/framework-bundle/framework/cluster/src/test/resources/org/apache/nifi/cluster/firewall/impl/empty.txt new file mode 100755 index 0000000..e69de29 http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/test/resources/org/apache/nifi/cluster/firewall/impl/ips.txt ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/resources/org/apache/nifi/cluster/firewall/impl/ips.txt b/nar-bundles/framework-bundle/framework/cluster/src/test/resources/org/apache/nifi/cluster/firewall/impl/ips.txt new file mode 100755 index 0000000..e8e4c2b --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/test/resources/org/apache/nifi/cluster/firewall/impl/ips.txt @@ -0,0 +1,12 @@ + +bad data should be skipped + +# this is a comment + 2.2.2.2 # this is another comment #### +3.3.3.3/8 + +4.4.4.4/24 + +5.5.5.255/31 + +more bad data \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/.gitignore ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/.gitignore b/nar-bundles/framework-bundle/framework/core-api/.gitignore new file mode 100755 index 0000000..ea8c4bf --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/.gitignore @@ -0,0 +1 @@ +/target http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/pom.xml ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/pom.xml b/nar-bundles/framework-bundle/framework/core-api/pom.xml new file mode 100644 index 0000000..b163cd0 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/pom.xml @@ -0,0 +1,60 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-framework-parent</artifactId> + <version>0.0.1-SNAPSHOT</version> + </parent> + <artifactId>core-api</artifactId> + <version>0.0.1-SNAPSHOT</version> + <name>NiFi Core API</name> + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-nar</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>remote-communications-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-runtime</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>client-dto</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-utils</artifactId> + </dependency> + <dependency> + <groupId>org.quartz-scheduler</groupId> + <artifactId>quartz</artifactId> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/AdaptedNodeInformation.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/AdaptedNodeInformation.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/AdaptedNodeInformation.java new file mode 100644 index 0000000..0092f7a --- /dev/null +++ 
b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/AdaptedNodeInformation.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster; + +public class AdaptedNodeInformation { + + private String hostname; + private Integer siteToSitePort; + private int apiPort; + private boolean isSiteToSiteSecure; + private int totalFlowFiles; + + public String getHostname() { + return hostname; + } + + public void setHostname(String hostname) { + this.hostname = hostname; + } + + public Integer getSiteToSitePort() { + return siteToSitePort; + } + + public void setSiteToSitePort(Integer siteToSitePort) { + this.siteToSitePort = siteToSitePort; + } + + public int getApiPort() { + return apiPort; + } + + public void setApiPort(int apiPort) { + this.apiPort = apiPort; + } + + public boolean isSiteToSiteSecure() { + return isSiteToSiteSecure; + } + + public void setSiteToSiteSecure(boolean isSiteToSiteSecure) { + this.isSiteToSiteSecure = isSiteToSiteSecure; + } + + public int getTotalFlowFiles() { + return totalFlowFiles; + } + + public void setTotalFlowFiles(int totalFlowFiles) { + this.totalFlowFiles = totalFlowFiles; + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/ClusterNodeInformation.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/ClusterNodeInformation.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/ClusterNodeInformation.java new file mode 100644 index 0000000..5751c32 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/ClusterNodeInformation.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.cluster; + +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Collection; + +import javax.xml.bind.JAXBContext; +import javax.xml.bind.JAXBException; +import javax.xml.bind.Marshaller; +import javax.xml.bind.Unmarshaller; +import javax.xml.bind.annotation.XmlRootElement; +import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; + +@XmlRootElement +public class ClusterNodeInformation { + + private Collection<NodeInformation> nodeInfo; + + private static final JAXBContext JAXB_CONTEXT; + + static { + try { + JAXB_CONTEXT = JAXBContext.newInstance(ClusterNodeInformation.class); + } catch (JAXBException e) { + throw new RuntimeException("Unable to create JAXBContext.", e); + } + } + + public ClusterNodeInformation() { + this.nodeInfo = null; + } + + public void setNodeInformation(final Collection<NodeInformation> nodeInfo) { + this.nodeInfo = nodeInfo; + } + + @XmlJavaTypeAdapter(NodeInformationAdapter.class) + public Collection<NodeInformation> getNodeInformation() { + return nodeInfo; + } + + public void marshal(final OutputStream os) throws JAXBException { + final Marshaller marshaller = JAXB_CONTEXT.createMarshaller(); + marshaller.marshal(this, os); + } + + public static ClusterNodeInformation unmarshal(final InputStream is) throws JAXBException { + final Unmarshaller unmarshaller = JAXB_CONTEXT.createUnmarshaller(); + return (ClusterNodeInformation) unmarshaller.unmarshal(is); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformant.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformant.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformant.java new file mode 100644 index 0000000..987ff65 --- /dev/null 
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformant.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster; + +public interface NodeInformant { + + ClusterNodeInformation getNodeInformation(); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformation.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformation.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformation.java new file mode 100644 index 0000000..848eb7e --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformation.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster; + +public class NodeInformation { + + private final String hostname; + private final Integer siteToSitePort; + private final int apiPort; + private final boolean isSiteToSiteSecure; + private final int totalFlowFiles; + + public NodeInformation(final String hostname, final Integer siteToSitePort, final int apiPort, + final boolean isSiteToSiteSecure, final int totalFlowFiles) { + this.hostname = hostname; + this.siteToSitePort = siteToSitePort; + this.apiPort = apiPort; + this.isSiteToSiteSecure = isSiteToSiteSecure; + this.totalFlowFiles = totalFlowFiles; + } + + public String getHostname() { + return hostname; + } + + public int getAPIPort() { + return apiPort; + } + + public Integer getSiteToSitePort() { + return siteToSitePort; + } + + public boolean isSiteToSiteSecure() { + return isSiteToSiteSecure; + } + + public int getTotalFlowFiles() { + return totalFlowFiles; + } + + @Override + public boolean equals(final Object obj) { + if (obj == this) { + return true; + } + if (obj == null) { + return false; + } + if (!(obj instanceof NodeInformation)) { + return false; + } + + final NodeInformation other = (NodeInformation) obj; + if (!hostname.equals(other.hostname)) { + return false; + } + if (siteToSitePort == null && other.siteToSitePort != null) { + return false; + } + if (siteToSitePort != null && other.siteToSitePort == null) { + return false; + 
} else if (siteToSitePort != null && siteToSitePort.intValue() != other.siteToSitePort.intValue()) { + return false; + } + if (apiPort != other.apiPort) { + return false; + } + if (isSiteToSiteSecure != other.isSiteToSiteSecure) { + return false; + } + return true; + } + + @Override + public int hashCode() { + return 83832 + hostname.hashCode() + (siteToSitePort == null ? 8 : siteToSitePort.hashCode()) + apiPort + (isSiteToSiteSecure ? 3829 : 0); + } + + @Override + public String toString() { + return "Node[" + hostname + ":" + apiPort + "]"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformationAdapter.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformationAdapter.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformationAdapter.java new file mode 100644 index 0000000..630631f --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformationAdapter.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster; + +import javax.xml.bind.annotation.adapters.XmlAdapter; + +public class NodeInformationAdapter extends XmlAdapter<AdaptedNodeInformation, NodeInformation> { + + @Override + public NodeInformation unmarshal(final AdaptedNodeInformation adapted) throws Exception { + return new NodeInformation(adapted.getHostname(), adapted.getSiteToSitePort(), adapted.getApiPort(), adapted.isSiteToSiteSecure(), adapted.getTotalFlowFiles()); + } + + @Override + public AdaptedNodeInformation marshal(final NodeInformation nodeInformation) throws Exception { + final AdaptedNodeInformation adapted = new AdaptedNodeInformation(); + adapted.setHostname(nodeInformation.getHostname()); + adapted.setSiteToSitePort(nodeInformation.getSiteToSitePort()); + adapted.setApiPort(nodeInformation.getAPIPort()); + adapted.setSiteToSiteSecure(nodeInformation.isSiteToSiteSecure()); + adapted.setTotalFlowFiles(nodeInformation.getTotalFlowFiles()); + return adapted; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/protocol/DataFlow.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/protocol/DataFlow.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/protocol/DataFlow.java new file mode 100644 index 0000000..57c1c30 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/protocol/DataFlow.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.protocol; + +public interface DataFlow { + + /** + * @return the raw byte array of the flow + */ + public byte[] getFlow(); + + /** + * @return the raw byte array of the templates + */ + public byte[] getTemplates(); + + /** + * @return the raw byte array of the snippets + */ + public byte[] getSnippets(); + + /** + * @return true if processors should be automatically started at application + * startup; false otherwise + */ + public boolean isAutoStartProcessors(); +}