// http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpRequest.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpRequest.java b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpRequest.java deleted file mode 100644 index 35380dd..0000000 --- a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpRequest.java +++ /dev/null @@ -1,239 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.cluster.manager.testutils;

import java.io.IOException;
import java.io.Reader;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.MediaType;
import org.apache.commons.lang3.StringUtils;

/**
 * Encapsulates an HTTP request. Instances are created through
 * {@link #createFromRequestLine(String)} and the returned builder. The
 * toString method returns the raw request text exactly as the builder
 * accumulated it (request line, header lines and body, separated by "\n").
 *
 * @author unattributed
 */
public class HttpRequest {

    private String method;
    private String uri;
    private String rawUri;
    private String version;
    private String body;
    private String rawRequest;
    private final Map<String, String> headers = new HashMap<>();
    private final Map<String, List<String>> parameters = new HashMap<>();

    /**
     * Starts building a request from its first line.
     *
     * @param requestLine the request line, e.g. "GET /path?a=b HTTP/1.1"
     * @return a builder to which headers and a body may be added
     * @throws IllegalArgumentException if the line does not consist of exactly
     * three space-separated tokens
     */
    public static HttpRequestBuilder createFromRequestLine(final String requestLine) {
        return new HttpRequestBuilder(requestLine);
    }

    public String getBody() {
        return body;
    }

    /**
     * @return an unmodifiable view of the headers; values are stored exactly as
     * they followed the first colon of the header line (not trimmed)
     */
    public Map<String, String> getHeaders() {
        return Collections.unmodifiableMap(headers);
    }

    /**
     * Looks up a header value, ignoring the case of the header name.
     *
     * @param header the header name
     * @return the stored (untrimmed) value, or null if no such header exists
     */
    public String getHeaderValue(final String header) {
        for (final Map.Entry<String, String> entry : getHeaders().entrySet()) {
            if (entry.getKey().equalsIgnoreCase(header)) {
                return entry.getValue();
            }
        }
        return null;
    }

    public String getMethod() {
        return method;
    }

    /**
     * @return an unmodifiable copy of the parameter map whose value lists are
     * unmodifiable as well
     */
    public Map<String, List<String>> getParameters() {
        final Map<String, List<String>> result = new HashMap<>();
        for (final Map.Entry<String, List<String>> entry : parameters.entrySet()) {
            result.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));
        }
        return Collections.unmodifiableMap(result);
    }

    /**
     * @return the request target without its query string
     */
    public String getUri() {
        return uri;
    }

    /**
     * @return the request target exactly as it appeared on the request line
     */
    public String getRawUri() {
        return rawUri;
    }

    public String getVersion() {
        return version;
    }

    @Override
    public String toString() {
        return rawRequest;
    }

    /**
     * A builder for constructing basic HTTP requests. It handles only enough of
     * the HTTP specification to support basic unit testing, and it should not
     * be used otherwise.
     */
    public static class HttpRequestBuilder {

        private String method;
        private String uri;
        private String rawUri;
        private String version;
        private final Map<String, String> headers = new HashMap<>();
        private final Map<String, List<String>> parameters = new HashMap<>();
        private int contentLength = 0;
        private String contentType;
        private String body = "";
        private final StringBuilder rawRequest = new StringBuilder();

        private HttpRequestBuilder(final String requestLine) {

            final String[] tokens = requestLine.split(" ");
            if (tokens.length != 3) {
                throw new IllegalArgumentException("Invalid HTTP Request Line: " + requestLine);
            }

            method = tokens[0];
            rawUri = tokens[1];
            version = tokens[2];

            // The URI is the target minus its query string for every method, so
            // getUri() is never null. Query parameters are still only parsed for
            // the methods that carry their parameters in the query string.
            final int queryIndex = rawUri.indexOf('?');
            uri = (queryIndex > -1) ? rawUri.substring(0, queryIndex) : rawUri;
            if (queryIndex > -1 && carriesParametersInQuery(method)) {
                addParameters(rawUri.substring(queryIndex + 1));
            }

            rawRequest.append(requestLine).append("\n");
        }

        // True for the methods whose query string is parsed into parameters.
        private static boolean carriesParametersInQuery(final String method) {
            return HttpMethod.GET.equalsIgnoreCase(method)
                    || HttpMethod.HEAD.equalsIgnoreCase(method)
                    || HttpMethod.DELETE.equalsIgnoreCase(method)
                    || HttpMethod.OPTIONS.equalsIgnoreCase(method);
        }

        // Records one header and captures Content-Length / Content-Type, which
        // addBody() and build() rely on. The value is stored untrimmed.
        private void addHeader(final String key, final String value) {
            if (key.contains(" ")) {
                throw new IllegalArgumentException("Header key may not contain spaces.");
            } else if ("content-length".equalsIgnoreCase(key)) {
                contentLength = StringUtils.isBlank(value) ? 0 : Integer.parseInt(value.trim());
            } else if ("content-type".equalsIgnoreCase(key)) {
                contentType = value.trim();
            }
            headers.put(key, value);
        }

        /**
         * Adds a raw header line of the form "Name: value".
         *
         * @param header the header line
         * @throws IllegalArgumentException if the line contains no colon, the
         * name contains a space, or a Content-Length value is not an integer
         */
        public void addHeader(final String header) {
            final int firstColonIndex = header.indexOf(":");
            if (firstColonIndex < 0) {
                throw new IllegalArgumentException("Invalid HTTP Header line: " + header);
            }
            addHeader(header.substring(0, firstColonIndex), header.substring(firstColonIndex + 1));
            rawRequest.append(header).append("\n");
        }

        /**
         * Parses a URL-encoded query string ("a=1&amp;b=2", optionally prefixed
         * with "?") and adds every pair as a parameter. A pair without "=" or
         * with nothing after it yields an empty value; only the first "="
         * separates key from value; empty pairs ("a=1&amp;&amp;b=2") are skipped.
         *
         * @param queryString the query string; blank input is ignored
         */
        // final because constructor calls it
        public final void addParameters(final String queryString) {

            if (StringUtils.isBlank(queryString)) {
                return;
            }

            final String normQueryString = queryString.startsWith("?") ? queryString.substring(1) : queryString;

            for (final String keyValuePair : normQueryString.split("&")) {
                if (keyValuePair.isEmpty()) {
                    continue;
                }

                // split on the first '=' only, so values may themselves contain '='
                final int separatorIndex = keyValuePair.indexOf('=');
                final String encodedKey = (separatorIndex < 0) ? keyValuePair : keyValuePair.substring(0, separatorIndex);
                final String encodedValue = (separatorIndex < 0) ? "" : keyValuePair.substring(separatorIndex + 1);

                try {
                    addParameter(URLDecoder.decode(encodedKey, "utf-8"), URLDecoder.decode(encodedValue, "utf-8"));
                } catch (UnsupportedEncodingException use) {
                    throw new RuntimeException(use);
                }
            }
        }

        /**
         * Adds one (already decoded) parameter value; repeated keys accumulate
         * their values in insertion order.
         *
         * @param key the parameter name
         * @param value the parameter value
         * @throws IllegalArgumentException if the key contains a space
         */
        public void addParameter(final String key, final String value) {

            if (key.contains(" ")) {
                throw new IllegalArgumentException("Parameter key may not contain spaces: " + key);
            }

            List<String> values = parameters.get(key);
            if (values == null) {
                values = new ArrayList<>();
                parameters.put(key, values);
            }
            values.add(value);
        }

        /**
         * Reads the request body from the reader, using the previously added
         * Content-Length header as the number of characters to read. Does
         * nothing when no positive Content-Length was seen.
         *
         * @param reader the reader positioned at the start of the body
         * @throws IOException if reading fails
         */
        public void addBody(final Reader reader) throws IOException {

            if (contentLength <= 0) {
                return;
            }

            // NOTE(review): Content-Length counts bytes while this reads chars;
            // the two only agree for single-byte content.
            final char[] buf = new char[contentLength];
            int offset = 0;
            int numRead;
            while (offset < buf.length && (numRead = reader.read(buf, offset, buf.length - offset)) >= 0) {
                offset += numRead;
            }

            // only keep what was actually read; the stream may end early
            body = new String(buf, 0, offset);
            rawRequest.append("\n");
            rawRequest.append(body);
        }

        /**
         * Creates the request. For methods other than GET and HEAD whose
         * Content-Type is exactly application/x-www-form-urlencoded, the body is
         * additionally parsed into parameters.
         *
         * @return the assembled request
         * @throws UnsupportedEncodingException retained in the signature for
         * existing callers
         */
        public HttpRequest build() throws UnsupportedEncodingException {

            final boolean mayCarryForm = !HttpMethod.GET.equalsIgnoreCase(method) && !HttpMethod.HEAD.equalsIgnoreCase(method);

            // constant-first comparison: contentType is null when no Content-Type header was added
            if (mayCarryForm && MediaType.APPLICATION_FORM_URLENCODED.equalsIgnoreCase(contentType)) {
                addParameters(body);
            }

            final HttpRequest request = new HttpRequest();
            request.method = this.method;
            request.uri = this.uri;
            request.rawUri = this.rawUri;
            request.version = this.version;
            request.headers.putAll(this.headers);
            request.parameters.putAll(this.parameters);
            request.body = this.body;
            request.rawRequest = this.rawRequest.toString();

            return request;
        }

    }
}
// http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponse.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponse.java b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponse.java deleted file mode 100644 index 3aa2931..0000000 --- a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponse.java +++ /dev/null @@ -1,93 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.cluster.manager.testutils;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import javax.ws.rs.core.Response.Status;

/**
 * Encapsulates an HTTP response: a status, a set of headers and a string
 * entity. The toString method renders the response as text (status line,
 * headers, blank line, entity), using "\n" as the line terminator.
 *
 * @author unattributed
 */
public class HttpResponse {

    private final Status status;
    private final String entity;
    private final Map<String, String> headers = new HashMap<>();

    /**
     * Creates a response and records a "content-length" header holding the
     * entity's size in bytes (platform default charset).
     *
     * @param status the response status
     * @param entity the response body; must not be null
     */
    public HttpResponse(final Status status, final String entity) {
        this.status = status;
        this.entity = entity;
        final int entityByteCount = entity.getBytes().length;
        headers.put("content-length", String.valueOf(entityByteCount));
    }

    public String getEntity() {
        return entity;
    }

    public Status getStatus() {
        return status;
    }

    /**
     * @return an unmodifiable view of the response headers
     */
    public Map<String, String> getHeaders() {
        return Collections.unmodifiableMap(headers);
    }

    /**
     * Adds (or replaces) a single header.
     *
     * @param key the header name
     * @param value the header value
     * @throws IllegalArgumentException if the name contains a space or names
     * the Content-Length header, which is managed by the constructor
     */
    public void addHeader(final String key, final String value) {
        if (key.contains(" ")) {
            throw new IllegalArgumentException("Header key may not contain spaces.");
        }
        if ("content-length".equalsIgnoreCase(key)) {
            throw new IllegalArgumentException("Content-Length header is set automatically based on length of content.");
        }
        headers.put(key, value);
    }

    /**
     * Adds every entry of the given map through {@link #addHeader(String, String)}.
     *
     * @param headers the headers to add
     */
    public void addHeaders(final Map<String, String> headers) {
        for (final Map.Entry<String, String> header : headers.entrySet()) {
            final String name = header.getKey();
            final String value = header.getValue();
            addHeader(name, value);
        }
    }

    @Override
    public String toString() {

        // status line
        final StringBuilder text = new StringBuilder("HTTP/1.1 ");
        text.append(status.getStatusCode());
        text.append(" ");
        text.append(status.getReasonPhrase());
        text.append("\n");

        // one "name: value" line per header
        for (final Map.Entry<String, String> header : headers.entrySet()) {
            text.append(header.getKey());
            text.append(": ");
            text.append(header.getValue());
            text.append("\n");
        }

        // blank line separating headers from the entity, then the entity itself
        text.append("\n");
        text.append(entity);

        return text.toString();
    }
}
// http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponseAction.java ---------------------------------------------------------------------- diff --git
a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponseAction.java b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponseAction.java deleted file mode 100644 index 28615d0..0000000 --- a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponseAction.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.manager.testutils; - -/** - * Wraps a HttpResponse with a time-delay. When the action is applied, the - * currently executing thread sleeps for the given delay before returning the - * response to the caller. - * - * This class is good for simulating network latency. 
- * - * @author unattributed - */ -public class HttpResponseAction { - - private final HttpResponse response; - - private final int waitTimeMs; - - public HttpResponseAction(final HttpResponse response) { - this(response, 0); - } - - public HttpResponseAction(final HttpResponse response, final int waitTimeMs) { - this.response = response; - this.waitTimeMs = waitTimeMs; - } - - public HttpResponse apply() { - try { - Thread.sleep(waitTimeMs); - } catch (final InterruptedException ie) { - throw new RuntimeException(ie); - } - - return response; - } - - public HttpResponse getResponse() { - return response; - } - - public int getWaitTimeMs() { - return waitTimeMs; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpServer.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpServer.java b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpServer.java deleted file mode 100644 index f17a66c..0000000 --- a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpServer.java +++ /dev/null @@ -1,240 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.apache.nifi.cluster.manager.testutils;

import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.io.Reader;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.cluster.manager.testutils.HttpRequest.HttpRequestBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A simple HTTP web server that allows clients to register canned-responses to
 * respond to received requests. Each accepted connection is served one request:
 * the request is parsed, its checked headers and parameters are verified, and
 * the next queued response is written back before the connection is closed.
 *
 * @author unattributed
 */
public class HttpServer {

    private static final Logger logger = LoggerFactory.getLogger(HttpServer.class);

    private final ExecutorService executorService;
    private final ServerSocket serverSocket;
    private final Queue<HttpResponseAction> responseQueue = new ConcurrentLinkedQueue<>();
    private final Map<String, String> checkedHeaders = new HashMap<>();
    private final Map<String, List<String>> checkedParameters = new HashMap<>();
    private final int port;

    /**
     * Binds the server socket; connections are not accepted until
     * {@link #start()} is called.
     *
     * @param numThreads the size of the request-handling thread pool
     * @param port the port to bind
     * @throws IOException if the server socket cannot be opened
     */
    public HttpServer(int numThreads, int port) throws IOException {
        this.port = port;
        executorService = Executors.newFixedThreadPool(numThreads);
        serverSocket = new ServerSocket(port);
    }

    /**
     * Starts a thread that accepts connections until the server is stopped and
     * hands each connection to the thread pool.
     */
    public void start() {

        new Thread() {
            @Override
            public void run() {
                while (isRunning()) {
                    try {
                        final Socket conn = serverSocket.accept();
                        executorService.execute(new Runnable() {
                            @Override
                            public void run() {
                                // close in finally so the socket is released even
                                // when request handling throws
                                try {
                                    handleRequest(conn);
                                } finally {
                                    closeQuietly(conn);
                                }
                            }
                        });
                    } catch (final SocketException se) {
                        /* ignored: thrown by accept() once stop() has closed the server socket */
                    } catch (final IOException ioe) {
                        logger.warn("Failed to accept client connection", ioe);
                    }
                }
            }
        }.start();
    }

    // Closes the connection if it is still open; a failure to close is only logged.
    private static void closeQuietly(final Socket conn) {
        if (conn.isClosed()) {
            return;
        }
        try {
            conn.close();
        } catch (final IOException ioe) {
            logger.debug("Failed to close client connection", ioe);
        }
    }

    /**
     * @return true until {@link #stop()} has shut down the thread pool
     */
    public boolean isRunning() {
        return executorService.isShutdown() == false;
    }

    /**
     * Closes the server socket, shuts down the thread pool and waits up to three
     * seconds for in-flight requests to finish.
     *
     * @throws RuntimeException wrapping any failure; if the waiting thread is
     * interrupted, its interrupt status is restored first
     */
    public void stop() {
        // shutdown server socket
        try {
            if (serverSocket.isClosed() == false) {
                serverSocket.close();
            }
        } catch (final Exception ex) {
            throw new RuntimeException(ex);
        }

        // shutdown executor service
        try {
            executorService.shutdown();
            executorService.awaitTermination(3, TimeUnit.SECONDS);
        } catch (final Exception ex) {
            if (ex instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException(ex);
        }
    }

    /**
     * @return the port the server socket reports while the server is running,
     * otherwise the port passed to the constructor
     */
    public int getPort() {
        if (isRunning()) {
            return serverSocket.getLocalPort();
        } else {
            return port;
        }
    }

    /**
     * Queues a response; responses are consumed in FIFO order, one per request.
     *
     * @param response the response action to queue
     * @return the live response queue
     */
    public Queue<HttpResponseAction> addResponseAction(final HttpResponseAction response) {
        responseQueue.add(response);
        return responseQueue;
    }

    /**
     * Registers headers that every request must carry with the given value.
     *
     * @param headers header names mapped to their required values
     */
    public void addCheckedHeaders(final Map<String, String> headers) {
        checkedHeaders.putAll(headers);
    }

    /**
     * Registers parameters that every request must carry with exactly the given
     * list of values.
     *
     * @param parameters parameter names mapped to their required values
     */
    public void addCheckedParameters(final Map<String, List<String>> parameters) {
        checkedParameters.putAll(parameters);
    }

    // Serves a single request on the connection. The caller closes the
    // connection afterwards, so every early return simply drops the request.
    private void handleRequest(final Socket conn) {
        try {

            final HttpRequest httpRequest = buildRequest(conn.getInputStream());

            if (logger.isDebugEnabled()) {
                logger.debug("\n" + httpRequest);
            }

            if (!hasCheckedHeaders(httpRequest) || !hasCheckedParameters(httpRequest)) {
                return;
            }

            // apply the next response; poll() rather than remove() so that an
            // exhausted queue is reported instead of throwing
            final HttpResponseAction response = responseQueue.poll();
            if (response == null) {
                logger.error("No response action is queued for the received request.");
                return;
            }
            response.apply();

            // send the response to client
            final PrintWriter pw = new PrintWriter(conn.getOutputStream(), true);

            if (logger.isDebugEnabled()) {
                logger.debug("\n" + response.getResponse());
            }

            pw.print(response.getResponse());
            pw.flush();

        } catch (IOException ioe) {
            // best effort: the client simply sees the connection close
            logger.debug("I/O failure while handling request", ioe);
        }
    }

    // Verifies that every checked header is present (name compared ignoring
    // case) and carries the expected value (compared ignoring surrounding
    // whitespace, since the request keeps the text after the colon verbatim).
    private boolean hasCheckedHeaders(final HttpRequest httpRequest) {
        for (final Map.Entry<String, String> entry : checkedHeaders.entrySet()) {
            final String actual = httpRequest.getHeaderValue(entry.getKey());
            if (actual == null) {
                logger.error("Missing checked header: " + entry.getKey());
                return false;
            }
            final String expected = entry.getValue();
            if (expected == null || expected.trim().equals(actual.trim()) == false) {
                logger.error("Incorrect HTTP request header value received for checked header: " + entry.getKey());
                return false;
            }
        }
        return true;
    }

    // Verifies that every checked parameter is present with exactly the expected values.
    private boolean hasCheckedParameters(final HttpRequest httpRequest) {
        final Map<String, List<String>> reqParams = httpRequest.getParameters();
        for (final Map.Entry<String, List<String>> entry : checkedParameters.entrySet()) {
            if (reqParams.containsKey(entry.getKey()) == false) {
                logger.error("Missing checked parameter: " + entry.getKey());
                return false;
            }
            if (entry.getValue().equals(reqParams.get(entry.getKey())) == false) {
                logger.error("Incorrect HTTP request parameter values received for checked parameter: " + entry.getKey());
                return false;
            }
        }
        return true;
    }

    private HttpRequest buildRequest(final InputStream requestIs) throws IOException {
        return new HttpRequestReader().read(new InputStreamReader(requestIs));
    }

    // reads an HTTP request from the given reader
    private static class HttpRequestReader {

        /**
         * Reads the request line, the header lines up to the first empty line,
         * and then the body.
         *
         * @param reader the source of the request text
         * @return the parsed request
         * @throws IOException if reading fails or the stream ends before a
         * request line was received
         */
        public HttpRequest read(final Reader reader) throws IOException {

            HttpRequestBuilder builder = null;
            String line;
            while ((line = readLine(reader)).isEmpty() == false) {
                if (builder == null) {
                    // the first non-empty line is the request line
                    builder = HttpRequest.createFromRequestLine(line);
                } else {
                    builder.addHeader(line);
                }
            }

            if (builder == null) {
                throw new IOException("No HTTP request line was received.");
            }

            builder.addBody(reader);
            return builder.build();
        }

        // Reads up to (not including) the next '\n', dropping '\r'. Also stops
        // at end of stream, returning whatever was read so far.
        private String readLine(final Reader reader) throws IOException {

            /* read character at time to prevent blocking */
            final StringBuilder strb = new StringBuilder();
            int c;
            // test for -1 before narrowing to char, otherwise EOF is never detected
            while ((c = reader.read()) != -1 && c != '\n') {
                if (c != '\r') {
                    strb.append((char) c);
                }
            }
            return strb.toString();
        }
    }
}
// http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java deleted file mode 100644 index 96943c2..0000000 --- a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java +++ /dev/null @@ -1,133 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.apache.nifi.cluster.protocol.impl;

import java.io.IOException;
import java.net.InetAddress;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.ProtocolContext;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.ProtocolHandler;
import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
import org.apache.nifi.cluster.protocol.message.PingMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.io.socket.ServerSocketConfiguration;
import org.apache.nifi.io.socket.SocketConfiguration;
import org.junit.After;
import static org.junit.Assert.*;
import org.junit.Before;
import org.junit.Test;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.*;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/**
 * Exercises ClusterManagerProtocolSenderImpl.requestFlow against a
 * SocketProtocolListener whose handler is a Mockito mock.
 *
 * @author unattributed
 */
public class ClusterManagerProtocolSenderImplTest {

    private InetAddress address;

    private int port;

    private SocketProtocolListener listener;

    private ClusterManagerProtocolSenderImpl sender;

    private ProtocolHandler mockHandler;

    /**
     * Starts a listener (constructed with port argument 0) backed by the mock
     * handler and creates the sender under test; the listener's reported port
     * is remembered for building requests.
     */
    @Before
    public void setup() throws IOException {

        address = InetAddress.getLocalHost();
        final ServerSocketConfiguration listenerConfig = new ServerSocketConfiguration();

        mockHandler = mock(ProtocolHandler.class);

        final ProtocolContext context = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);

        listener = new SocketProtocolListener(5, 0, listenerConfig, context);
        listener.addHandler(mockHandler);
        listener.start();

        port = listener.getPort();

        final SocketConfiguration senderConfig = new SocketConfiguration();
        sender = new ClusterManagerProtocolSenderImpl(senderConfig, context);
    }

    @After
    public void teardown() throws IOException {
        if (!listener.isRunning()) {
            return;
        }
        listener.stop();
    }

    /** A flow response from the handler is returned to the caller. */
    @Test
    public void testRequestFlow() throws Exception {

        when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
        when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new FlowResponseMessage());

        final FlowResponseMessage response = sender.requestFlow(newFlowRequest());
        assertNotNull(response);
    }

    /** A response of the wrong message type is reported as a ProtocolException. */
    @Test
    public void testRequestFlowWithBadResponseMessage() throws Exception {

        when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
        when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new PingMessage());

        assertRequestFlowFails(newFlowRequest());
    }

    /**
     * A handler that answers after three times the sender's socket timeout
     * makes the request fail with a ProtocolException.
     */
    @Test
    public void testRequestFlowDelayedResponse() throws Exception {

        final int time = 250;
        sender.getSocketConfiguration().setSocketTimeout(time);

        when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
        when(mockHandler.handle(any(ProtocolMessage.class))).thenAnswer(new Answer<FlowResponseMessage>() {
            @Override
            public FlowResponseMessage answer(InvocationOnMock invocation) throws Throwable {
                Thread.sleep(time * 3);
                return new FlowResponseMessage();
            }
        });

        assertRequestFlowFails(newFlowRequest());
    }

    // Builds a flow request addressed to the listener started in setup().
    private FlowRequestMessage newFlowRequest() {
        final FlowRequestMessage request = new FlowRequestMessage();
        request.setNodeId(new NodeIdentifier("id", "api-address", 1, address.getHostAddress(), port));
        return request;
    }

    // Sends the request and fails the test unless a ProtocolException is thrown.
    private void assertRequestFlowFails(final FlowRequestMessage request) throws Exception {
        try {
            sender.requestFlow(request);
        } catch (final ProtocolException expected) {
            return;
        }
        fail("failed to throw exception");
    }

}
// http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocatorTest.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocatorTest.java b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocatorTest.java deleted file mode 100644 index 4a69571..0000000 --- a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocatorTest.java +++ /dev/null @@ -1,119 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.apache.nifi.cluster.protocol.impl;

import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.io.socket.multicast.DiscoverableService;
import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
import static org.junit.Assert.*;
import org.junit.Before;
import org.junit.Test;
import static org.mockito.Mockito.*;
import org.mockito.stubbing.OngoingStubbing;

/**
 * Tests ClusterServiceLocator in its three configurations: backed by service
 * discovery, backed by service discovery with a fixed port, and backed by a
 * fixed service.
 */
public class ClusterServiceLocatorTest {

    private ClusterServiceDiscovery mockServiceDiscovery;

    private int fixedPort;

    private DiscoverableService fixedService;

    private ClusterServiceLocator serviceDiscoveryLocator;

    private ClusterServiceLocator serviceDiscoveryFixedPortLocator;

    private ClusterServiceLocator fixedServiceLocator;

    @Before
    public void setup() throws Exception {

        fixedPort = 1;
        mockServiceDiscovery = mock(ClusterServiceDiscovery.class);
        fixedService = new DiscoverableServiceImpl("some-service", InetSocketAddress.createUnresolved("some-host", 20));

        serviceDiscoveryLocator = new ClusterServiceLocator(mockServiceDiscovery);
        serviceDiscoveryFixedPortLocator = new ClusterServiceLocator(mockServiceDiscovery, fixedPort);
        fixedServiceLocator = new ClusterServiceLocator(fixedService);

    }

    @Test
    public void getServiceWhenServiceDiscoveryNotStarted() {
        assertNull(serviceDiscoveryLocator.getService());
    }

    @Test
    public void getServiceWhenServiceDiscoveryFixedPortNotStarted() {
        // exercise the fixed-port locator; this test previously repeated the
        // assertion of getServiceWhenServiceDiscoveryNotStarted
        assertNull(serviceDiscoveryFixedPortLocator.getService());
    }

    @Test
    public void getServiceWhenFixedServiceNotStarted() {
        assertEquals(fixedService, fixedServiceLocator.getService());
    }

    @Test
    public void getServiceNotOnFirstAttempt() {

        final ClusterServiceLocator.AttemptsConfig config = newTwoAttemptsConfig();
        serviceDiscoveryLocator.setAttemptsConfig(config);
        stubServiceFoundOnLastAttempt(config);

        assertEquals(fixedService, serviceDiscoveryLocator.getService());

    }

    @Test
    public void getServiceNotOnFirstAttemptWithFixedPort() {

        final ClusterServiceLocator.AttemptsConfig config = newTwoAttemptsConfig();
        serviceDiscoveryFixedPortLocator.setAttemptsConfig(config);
        stubServiceFoundOnLastAttempt(config);

        // expected: the discovered service's name and host combined with the fixed port
        InetSocketAddress resultAddress = InetSocketAddress.createUnresolved(fixedService.getServiceAddress().getHostName(), fixedPort);
        DiscoverableService resultService = new DiscoverableServiceImpl(fixedService.getServiceName(), resultAddress);
        assertEquals(resultService, serviceDiscoveryFixedPortLocator.getService());
    }

    // Two attempts, one second apart.
    private static ClusterServiceLocator.AttemptsConfig newTwoAttemptsConfig() {
        final ClusterServiceLocator.AttemptsConfig config = new ClusterServiceLocator.AttemptsConfig();
        config.setNumAttempts(2);
        config.setTimeBetweenAttempsUnit(TimeUnit.SECONDS);
        config.setTimeBetweenAttempts(1);
        return config;
    }

    // Stubs the mocked discovery to return null for every attempt but the last,
    // and fixedService for the last one.
    private void stubServiceFoundOnLastAttempt(final ClusterServiceLocator.AttemptsConfig config) {
        OngoingStubbing<DiscoverableService> stubbing = when(mockServiceDiscovery.getService());
        for (int i = 0; i < config.getNumAttempts() - 1; i++) {
            stubbing = stubbing.thenReturn(null);
        }
        stubbing.thenReturn(fixedService);
    }
}
// http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcasterTest.java
---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcasterTest.java b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcasterTest.java deleted file mode 100644 index 4d85d1a..0000000 --- a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcasterTest.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.cluster.protocol.impl; - -import java.net.InetSocketAddress; -import org.apache.nifi.cluster.protocol.ProtocolContext; -import org.apache.nifi.cluster.protocol.ProtocolException; -import org.apache.nifi.cluster.protocol.ProtocolHandler; -import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext; -import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils; -import org.apache.nifi.cluster.protocol.message.ProtocolMessage; -import org.apache.nifi.cluster.protocol.message.ServiceBroadcastMessage; -import org.apache.nifi.io.socket.multicast.DiscoverableService; -import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl; -import org.apache.nifi.io.socket.multicast.MulticastConfiguration; -import org.junit.After; -import static org.junit.Assert.*; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; - -/** - * @author unattributed - */ -public class ClusterServicesBroadcasterTest { - - private ClusterServicesBroadcaster broadcaster; - - private MulticastProtocolListener listener; - - private DummyProtocolHandler handler; - - private InetSocketAddress multicastAddress; - - private DiscoverableService broadcastedService; - - private ProtocolContext protocolContext; - - private MulticastConfiguration configuration; - - @Before - public void setup() throws Exception { - - broadcastedService = new DiscoverableServiceImpl("some-service", new InetSocketAddress("localhost", 11111)); - - multicastAddress = new InetSocketAddress("225.1.1.1", 22222); - - configuration = new MulticastConfiguration(); - - protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT); - - broadcaster = new ClusterServicesBroadcaster(multicastAddress, configuration, protocolContext, "500 ms"); - broadcaster.addService(broadcastedService); - - handler = new DummyProtocolHandler(); - listener = new MulticastProtocolListener(5, multicastAddress, configuration, protocolContext); - listener.addHandler(handler); - } - 
- @After - public void teardown() { - - if (broadcaster.isRunning()) { - broadcaster.stop(); - } - - try { - if (listener.isRunning()) { - listener.stop(); - } - } catch (Exception ex) { - ex.printStackTrace(System.out); - } - - } - - @Test - @Ignore - public void testBroadcastReceived() throws Exception { - - broadcaster.start(); - listener.start(); - - Thread.sleep(1000); - - listener.stop(); - - assertNotNull(handler.getProtocolMessage()); - assertEquals(ProtocolMessage.MessageType.SERVICE_BROADCAST, handler.getProtocolMessage().getType()); - final ServiceBroadcastMessage msg = (ServiceBroadcastMessage) handler.getProtocolMessage(); - assertEquals(broadcastedService.getServiceName(), msg.getServiceName()); - assertEquals(broadcastedService.getServiceAddress().getHostName(), msg.getAddress()); - assertEquals(broadcastedService.getServiceAddress().getPort(), msg.getPort()); - } - - private class DummyProtocolHandler implements ProtocolHandler { - - private ProtocolMessage protocolMessage; - - @Override - public boolean canHandle(ProtocolMessage msg) { - return true; - } - - @Override - public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException { - this.protocolMessage = msg; - return null; - } - - public ProtocolMessage getProtocolMessage() { - return protocolMessage; - } - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListenerTest.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListenerTest.java b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListenerTest.java deleted file mode 100644 index 6c79b90..0000000 --- 
a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListenerTest.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol.impl; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.net.DatagramPacket; -import java.net.InetSocketAddress; -import java.net.MulticastSocket; -import java.util.ArrayList; -import java.util.List; -import org.apache.nifi.cluster.protocol.ProtocolContext; -import org.apache.nifi.cluster.protocol.ProtocolException; -import org.apache.nifi.cluster.protocol.ProtocolHandler; -import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller; -import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext; -import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils; -import org.apache.nifi.cluster.protocol.message.MulticastProtocolMessage; -import org.apache.nifi.cluster.protocol.message.PingMessage; -import org.apache.nifi.cluster.protocol.message.ProtocolMessage; -import org.apache.nifi.io.socket.multicast.MulticastConfiguration; -import org.apache.nifi.io.socket.multicast.MulticastUtils; -import org.junit.After; 
-import static org.junit.Assert.*; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; - -/** - * @author unattributed - */ -public class MulticastProtocolListenerTest { - - private MulticastProtocolListener listener; - - private MulticastSocket socket; - - private InetSocketAddress address; - - private MulticastConfiguration configuration; - - private ProtocolContext protocolContext; - - @Before - public void setup() throws Exception { - - address = new InetSocketAddress("226.1.1.1", 60000); - configuration = new MulticastConfiguration(); - - protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT); - - listener = new MulticastProtocolListener(5, address, configuration, protocolContext); - listener.start(); - - socket = MulticastUtils.createMulticastSocket(address.getPort(), configuration); - } - - @After - public void teardown() throws IOException { - try { - if (listener.isRunning()) { - listener.stop(); - } - } finally { - MulticastUtils.closeQuietly(socket); - } - } - - @Ignore("This test must be reworked. 
Requires an active network connection") - @Test - public void testBadRequest() throws Exception { - DelayedProtocolHandler handler = new DelayedProtocolHandler(0); - listener.addHandler(handler); - DatagramPacket packet = new DatagramPacket(new byte[]{5}, 1, address); - socket.send(packet); - Thread.sleep(250); - assertEquals(0, handler.getMessages().size()); - } - - @Test - @Ignore - public void testRequest() throws Exception { - - ReflexiveProtocolHandler handler = new ReflexiveProtocolHandler(); - listener.addHandler(handler); - - ProtocolMessage msg = new PingMessage(); - MulticastProtocolMessage multicastMsg = new MulticastProtocolMessage("some-id", msg); - - // marshal message to output stream - ProtocolMessageMarshaller marshaller = protocolContext.createMarshaller(); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - marshaller.marshal(multicastMsg, baos); - byte[] requestPacketBytes = baos.toByteArray(); - DatagramPacket packet = new DatagramPacket(requestPacketBytes, requestPacketBytes.length, address); - socket.send(packet); - - Thread.sleep(250); - assertEquals(1, handler.getMessages().size()); - assertEquals(msg.getType(), handler.getMessages().get(0).getType()); - - } - - private class ReflexiveProtocolHandler implements ProtocolHandler { - - private List<ProtocolMessage> messages = new ArrayList<>(); - - @Override - public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException { - messages.add(msg); - return msg; - } - - @Override - public boolean canHandle(ProtocolMessage msg) { - return true; - } - - public List<ProtocolMessage> getMessages() { - return messages; - } - - } - - private class DelayedProtocolHandler implements ProtocolHandler { - - private int delay = 0; - - private List<ProtocolMessage> messages = new ArrayList<>(); - - public DelayedProtocolHandler(int delay) { - this.delay = delay; - } - - @Override - public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException { - try { - messages.add(msg); 
- Thread.sleep(delay); - return null; - } catch (final InterruptedException ie) { - throw new ProtocolException(ie); - } - - } - - @Override - public boolean canHandle(ProtocolMessage msg) { - return true; - } - - public List<ProtocolMessage> getMessages() { - return messages; - } - - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java deleted file mode 100644 index 7c62d2f..0000000 --- a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java +++ /dev/null @@ -1,201 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.cluster.protocol.impl; - -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.UUID; - -import org.apache.nifi.cluster.HeartbeatPayload; -import org.apache.nifi.cluster.protocol.ConnectionRequest; -import org.apache.nifi.cluster.protocol.ConnectionResponse; -import org.apache.nifi.cluster.protocol.Heartbeat; -import org.apache.nifi.cluster.protocol.NodeIdentifier; -import org.apache.nifi.cluster.protocol.ProtocolContext; -import org.apache.nifi.cluster.protocol.ProtocolException; -import org.apache.nifi.cluster.protocol.ProtocolHandler; -import org.apache.nifi.cluster.protocol.StandardDataFlow; -import org.apache.nifi.cluster.protocol.UnknownServiceAddressException; -import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext; -import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils; -import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage; -import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage; -import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage; -import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; -import org.apache.nifi.cluster.protocol.message.PingMessage; -import org.apache.nifi.cluster.protocol.message.ProtocolMessage; -import org.apache.nifi.io.socket.ServerSocketConfiguration; -import org.apache.nifi.io.socket.SocketConfiguration; -import org.apache.nifi.io.socket.multicast.DiscoverableService; -import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -/** - * @author unattributed - */ -public class 
NodeProtocolSenderImplTest { - - private SocketProtocolListener listener; - - private NodeProtocolSenderImpl sender; - - private DiscoverableService service; - - private ServerSocketConfiguration serverSocketConfiguration; - - private ClusterServiceLocator mockServiceLocator; - - private ProtocolHandler mockHandler; - - private NodeIdentifier nodeIdentifier; - - @Before - public void setup() throws IOException { - - serverSocketConfiguration = new ServerSocketConfiguration(); - - mockServiceLocator = mock(ClusterServiceLocator.class); - mockHandler = mock(ProtocolHandler.class); - - nodeIdentifier = new NodeIdentifier("1", "localhost", 1234, "localhost", 5678); - - ProtocolContext protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT); - - listener = new SocketProtocolListener(5, 0, serverSocketConfiguration, protocolContext); - listener.setShutdownListenerSeconds(3); - listener.addHandler(mockHandler); - listener.start(); - - service = new DiscoverableServiceImpl("some-service", new InetSocketAddress("localhost", listener.getPort())); - - SocketConfiguration socketConfiguration = new SocketConfiguration(); - socketConfiguration.setReuseAddress(true); - sender = new NodeProtocolSenderImpl(mockServiceLocator, socketConfiguration, protocolContext); - } - - @After - public void teardown() throws IOException { - if (listener.isRunning()) { - listener.stop(); - } - } - - @Test - public void testConnect() throws Exception { - - when(mockServiceLocator.getService()).thenReturn(service); - when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE); - ConnectionResponseMessage mockMessage = new ConnectionResponseMessage(); - mockMessage.setConnectionResponse(new ConnectionResponse(nodeIdentifier, new StandardDataFlow("flow".getBytes("UTF-8"), new byte[0], new byte[0]), false, null, null, UUID.randomUUID().toString())); - when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(mockMessage); - - ConnectionRequestMessage request 
= new ConnectionRequestMessage(); - request.setConnectionRequest(new ConnectionRequest(nodeIdentifier)); - ConnectionResponseMessage response = sender.requestConnection(request); - assertNotNull(response); - } - - @Test(expected = UnknownServiceAddressException.class) - public void testConnectNoClusterManagerAddress() throws Exception { - - when(mockServiceLocator.getService()).thenReturn(null); - when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE); - when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new ConnectionResponseMessage()); - - ConnectionRequestMessage request = new ConnectionRequestMessage(); - request.setConnectionRequest(new ConnectionRequest(nodeIdentifier)); - - sender.requestConnection(request); - fail("failed to throw exception"); - } - - @Test(expected = ProtocolException.class) - public void testConnectBadResponse() throws Exception { - - when(mockServiceLocator.getService()).thenReturn(service); - when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE); - when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new PingMessage()); - - ConnectionRequestMessage request = new ConnectionRequestMessage(); - request.setConnectionRequest(new ConnectionRequest(nodeIdentifier)); - - sender.requestConnection(request); - fail("failed to throw exception"); - - } - - @Test(expected = ProtocolException.class) - public void testConnectDelayedResponse() throws Exception { - - final int time = 250; - sender.getSocketConfiguration().setSocketTimeout(time); - when(mockServiceLocator.getService()).thenReturn(service); - when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE); - when(mockHandler.handle(any(ProtocolMessage.class))).thenAnswer(new Answer<ConnectionResponseMessage>() { - @Override - public ConnectionResponseMessage answer(InvocationOnMock invocation) throws Throwable { - Thread.sleep(time * 3); - return new ConnectionResponseMessage(); - } - }); - 
ConnectionRequestMessage request = new ConnectionRequestMessage(); - request.setConnectionRequest(new ConnectionRequest(nodeIdentifier)); - - sender.requestConnection(request); - fail("failed to throw exception"); - - } - - @Test - public void testHeartbeat() throws Exception { - - when(mockServiceLocator.getService()).thenReturn(service); - when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE); - when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(null); - - HeartbeatMessage msg = new HeartbeatMessage(); - HeartbeatPayload hbPayload = new HeartbeatPayload(); - Heartbeat hb = new Heartbeat(new NodeIdentifier("id", "localhost", 3, "localhost", 4), false, false, hbPayload.marshal()); - msg.setHeartbeat(hb); - sender.heartbeat(msg); - } - - @Test - public void testNotifyControllerStartupFailure() throws Exception { - - when(mockServiceLocator.getService()).thenReturn(service); - when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE); - when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(null); - - ControllerStartupFailureMessage msg = new ControllerStartupFailureMessage(); - msg.setNodeId(new NodeIdentifier("some-id", "some-addr", 1, "some-addr", 1)); - msg.setExceptionMessage("some exception"); - sender.notifyControllerStartupFailure(msg); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListenerTest.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListenerTest.java b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListenerTest.java deleted file mode 100644 index 92a7d2a..0000000 --- 
a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListenerTest.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol.impl; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.SocketTimeoutException; -import org.apache.nifi.cluster.protocol.ProtocolContext; -import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller; -import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller; -import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext; -import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils; -import org.apache.nifi.cluster.protocol.message.PingMessage; -import org.apache.nifi.cluster.protocol.message.ProtocolMessage; -import org.apache.nifi.cluster.protocol.testutils.DelayedProtocolHandler; -import org.apache.nifi.cluster.protocol.testutils.ReflexiveProtocolHandler; -import org.apache.nifi.io.socket.ServerSocketConfiguration; -import org.apache.nifi.io.socket.SocketConfiguration; -import org.apache.nifi.io.socket.SocketUtils; -import org.junit.After; -import static org.junit.Assert.*; 
-import org.junit.Before; -import org.junit.Test; - -/** - * @author unattributed - */ -public class SocketProtocolListenerTest { - - private SocketProtocolListener listener; - - private Socket socket; - - private ProtocolMessageMarshaller<ProtocolMessage> marshaller; - - private ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller; - - @Before - public void setup() throws Exception { - - final ProtocolContext protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT); - marshaller = protocolContext.createMarshaller(); - unmarshaller = protocolContext.createUnmarshaller(); - - ServerSocketConfiguration configuration = new ServerSocketConfiguration(); - configuration.setSocketTimeout(1000); - - listener = new SocketProtocolListener(5, 0, configuration, protocolContext); - listener.start(); - - int port = listener.getPort(); - - SocketConfiguration config = new SocketConfiguration(); - config.setReuseAddress(true); - config.setSocketTimeout(1000); - socket = SocketUtils.createSocket(new InetSocketAddress("localhost", port), config); - } - - @After - public void teardown() throws IOException { - try { - if (listener.isRunning()) { - listener.stop(); - } - } finally { - SocketUtils.closeQuietly(socket); - } - } - - @Test - public void testBadRequest() throws Exception { - DelayedProtocolHandler handler = new DelayedProtocolHandler(0); - listener.addHandler(handler); - socket.getOutputStream().write(5); - Thread.sleep(250); - assertEquals(0, handler.getMessages().size()); - } - - @Test - public void testRequest() throws Exception { - ProtocolMessage msg = new PingMessage(); - - ReflexiveProtocolHandler handler = new ReflexiveProtocolHandler(); - listener.addHandler(handler); - - // marshal message to output stream - marshaller.marshal(msg, socket.getOutputStream()); - - // unmarshall response and return - ProtocolMessage response = unmarshaller.unmarshal(socket.getInputStream()); - assertEquals(msg.getType(), response.getType()); - - assertEquals(1, 
handler.getMessages().size()); - assertEquals(msg.getType(), handler.getMessages().get(0).getType()); - } - - @Test - public void testDelayedRequest() throws Exception { - ProtocolMessage msg = new PingMessage(); - - DelayedProtocolHandler handler = new DelayedProtocolHandler(2000); - listener.addHandler(handler); - - // marshal message to output stream - marshaller.marshal(msg, socket.getOutputStream()); - - try { - socket.getInputStream().read(); - fail("Socket timeout not received."); - } catch (SocketTimeoutException ste) { - } - - assertEquals(1, handler.getMessages().size()); - assertEquals(msg.getType(), handler.getMessages().get(0).getType()); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/DelayedProtocolHandler.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/DelayedProtocolHandler.java b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/DelayedProtocolHandler.java deleted file mode 100644 index 2f16777..0000000 --- a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/DelayedProtocolHandler.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol.testutils; - -import java.util.ArrayList; -import java.util.List; -import org.apache.nifi.cluster.protocol.ProtocolException; -import org.apache.nifi.cluster.protocol.ProtocolHandler; -import org.apache.nifi.cluster.protocol.message.ProtocolMessage; - -/** - * @author unattributed - */ -public class DelayedProtocolHandler implements ProtocolHandler { - - private int delay = 0; - private List<ProtocolMessage> messages = new ArrayList<>(); - - public DelayedProtocolHandler(int delay) { - this.delay = delay; - } - - @Override - public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException { - try { - messages.add(msg); - Thread.sleep(delay); - return null; - } catch (final InterruptedException ie) { - throw new ProtocolException(ie); - } - - } - - @Override - public boolean canHandle(ProtocolMessage msg) { - return true; - } - - public List<ProtocolMessage> getMessages() { - return messages; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/ReflexiveProtocolHandler.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/ReflexiveProtocolHandler.java b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/ReflexiveProtocolHandler.java deleted file mode 100644 index e80f52c..0000000 --- 
a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/ReflexiveProtocolHandler.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol.testutils; - -import java.util.ArrayList; -import java.util.List; -import org.apache.nifi.cluster.protocol.ProtocolException; -import org.apache.nifi.cluster.protocol.ProtocolHandler; -import org.apache.nifi.cluster.protocol.message.ProtocolMessage; - -/** - * @author unattributed - */ -public class ReflexiveProtocolHandler implements ProtocolHandler { - - private List<ProtocolMessage> messages = new ArrayList<>(); - - @Override - public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException { - messages.add(msg); - return msg; - } - - @Override - public boolean canHandle(ProtocolMessage msg) { - return true; - } - - public List<ProtocolMessage> getMessages() { - return messages; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git 
a/nar-bundles/framework-bundle/framework/cluster/src/test/resources/logback-test.xml b/nar-bundles/framework-bundle/framework/cluster/src/test/resources/logback-test.xml deleted file mode 100644 index 92eb78c..0000000 --- a/nar-bundles/framework-bundle/framework/cluster/src/test/resources/logback-test.xml +++ /dev/null @@ -1,48 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> - -<configuration scan="true" scanPeriod="30 seconds"> - <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> - <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> - <pattern>%-4r [%t] %-5p %c - %m%n</pattern> - </encoder> - </appender> - - <!-- valid logging levels: TRACE, DEBUG, INFO, WARN, ERROR --> - <logger name="org.apache.nifi" level="INFO"/> - - <!-- Logger for managing logging statements for nifi clusters. --> - <logger name="org.apache.nifi.cluster" level="INFO"/> - - <!-- - Logger for logging HTTP requests received by the web server. Setting - log level to 'debug' activates HTTP request logging. 
- --> - <logger name="org.apache.nifi.server.JettyServer" level="INFO"/> - - <!-- Logger for managing logging statements for jetty --> - <logger name="org.mortbay" level="INFO"/> - - <!-- Suppress non-error messages due to excessive logging by class --> - <logger name="com.sun.jersey.spi.container.servlet.WebComponent" level="ERROR"/> - - <logger name="org.apache.nifi.processors.standard" level="DEBUG"/> - - <root level="INFO"> - <appender-ref ref="CONSOLE"/> - </root> - -</configuration> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster/src/test/resources/org/apache/nifi/cluster/firewall/impl/empty.txt ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/resources/org/apache/nifi/cluster/firewall/impl/empty.txt b/nar-bundles/framework-bundle/framework/cluster/src/test/resources/org/apache/nifi/cluster/firewall/impl/empty.txt deleted file mode 100755 index e69de29..0000000 http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster/src/test/resources/org/apache/nifi/cluster/firewall/impl/ips.txt ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/resources/org/apache/nifi/cluster/firewall/impl/ips.txt b/nar-bundles/framework-bundle/framework/cluster/src/test/resources/org/apache/nifi/cluster/firewall/impl/ips.txt deleted file mode 100755 index e8e4c2b..0000000 --- a/nar-bundles/framework-bundle/framework/cluster/src/test/resources/org/apache/nifi/cluster/firewall/impl/ips.txt +++ /dev/null @@ -1,12 +0,0 @@ - -bad data should be skipped - -# this is a comment - 2.2.2.2 # this is another comment #### -3.3.3.3/8 - -4.4.4.4/24 - -5.5.5.255/31 - -more bad data \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core-api/.gitignore ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/.gitignore b/nar-bundles/framework-bundle/framework/core-api/.gitignore deleted file mode 100755 index ea8c4bf..0000000 --- a/nar-bundles/framework-bundle/framework/core-api/.gitignore +++ /dev/null @@ -1 +0,0 @@ -/target http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core-api/pom.xml ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/pom.xml b/nar-bundles/framework-bundle/framework/core-api/pom.xml deleted file mode 100644 index 63645f3..0000000 --- a/nar-bundles/framework-bundle/framework/core-api/pom.xml +++ /dev/null @@ -1,56 +0,0 @@ -<?xml version="1.0"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. 
---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-framework-parent</artifactId> - <version>0.0.1-SNAPSHOT</version> - </parent> - <artifactId>core-api</artifactId> - <version>0.0.1-SNAPSHOT</version> - <name>NiFi Core API</name> - <dependencies> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-nar</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-utils</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-runtime</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>client-dto</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-api</artifactId> - </dependency> - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-lang3</artifactId> - </dependency> - <dependency> - <groupId>org.quartz-scheduler</groupId> - <artifactId>quartz</artifactId> - </dependency> - </dependencies> -</project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/AdaptedNodeInformation.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/AdaptedNodeInformation.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/AdaptedNodeInformation.java deleted file mode 100644 index 0092f7a..0000000 --- a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/AdaptedNodeInformation.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed 
to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster; - -public class AdaptedNodeInformation { - - private String hostname; - private Integer siteToSitePort; - private int apiPort; - private boolean isSiteToSiteSecure; - private int totalFlowFiles; - - public String getHostname() { - return hostname; - } - - public void setHostname(String hostname) { - this.hostname = hostname; - } - - public Integer getSiteToSitePort() { - return siteToSitePort; - } - - public void setSiteToSitePort(Integer siteToSitePort) { - this.siteToSitePort = siteToSitePort; - } - - public int getApiPort() { - return apiPort; - } - - public void setApiPort(int apiPort) { - this.apiPort = apiPort; - } - - public boolean isSiteToSiteSecure() { - return isSiteToSiteSecure; - } - - public void setSiteToSiteSecure(boolean isSiteToSiteSecure) { - this.isSiteToSiteSecure = isSiteToSiteSecure; - } - - public int getTotalFlowFiles() { - return totalFlowFiles; - } - - public void setTotalFlowFiles(int totalFlowFiles) { - this.totalFlowFiles = totalFlowFiles; - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/ClusterNodeInformation.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/ClusterNodeInformation.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/ClusterNodeInformation.java deleted file mode 100644 index 5751c32..0000000 --- a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/ClusterNodeInformation.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.cluster; - -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Collection; - -import javax.xml.bind.JAXBContext; -import javax.xml.bind.JAXBException; -import javax.xml.bind.Marshaller; -import javax.xml.bind.Unmarshaller; -import javax.xml.bind.annotation.XmlRootElement; -import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; - -@XmlRootElement -public class ClusterNodeInformation { - - private Collection<NodeInformation> nodeInfo; - - private static final JAXBContext JAXB_CONTEXT; - - static { - try { - JAXB_CONTEXT = JAXBContext.newInstance(ClusterNodeInformation.class); - } catch (JAXBException e) { - throw new RuntimeException("Unable to create JAXBContext.", e); - } - } - - public ClusterNodeInformation() { - this.nodeInfo = null; - } - - public void setNodeInformation(final Collection<NodeInformation> nodeInfo) { - this.nodeInfo = nodeInfo; - } - - @XmlJavaTypeAdapter(NodeInformationAdapter.class) - public Collection<NodeInformation> getNodeInformation() { - return nodeInfo; - } - - public void marshal(final OutputStream os) throws JAXBException { - final Marshaller marshaller = JAXB_CONTEXT.createMarshaller(); - marshaller.marshal(this, os); - } - - public static ClusterNodeInformation unmarshal(final InputStream is) throws JAXBException { - final Unmarshaller unmarshaller = JAXB_CONTEXT.createUnmarshaller(); - return (ClusterNodeInformation) unmarshaller.unmarshal(is); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformant.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformant.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformant.java deleted file mode 100644 index 987ff65..0000000 --- 
a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformant.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster; - -public interface NodeInformant { - - ClusterNodeInformation getNodeInformation(); -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformation.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformation.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformation.java deleted file mode 100644 index 848eb7e..0000000 --- a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformation.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster; - -public class NodeInformation { - - private final String hostname; - private final Integer siteToSitePort; - private final int apiPort; - private final boolean isSiteToSiteSecure; - private final int totalFlowFiles; - - public NodeInformation(final String hostname, final Integer siteToSitePort, final int apiPort, - final boolean isSiteToSiteSecure, final int totalFlowFiles) { - this.hostname = hostname; - this.siteToSitePort = siteToSitePort; - this.apiPort = apiPort; - this.isSiteToSiteSecure = isSiteToSiteSecure; - this.totalFlowFiles = totalFlowFiles; - } - - public String getHostname() { - return hostname; - } - - public int getAPIPort() { - return apiPort; - } - - public Integer getSiteToSitePort() { - return siteToSitePort; - } - - public boolean isSiteToSiteSecure() { - return isSiteToSiteSecure; - } - - public int getTotalFlowFiles() { - return totalFlowFiles; - } - - @Override - public boolean equals(final Object obj) { - if (obj == this) { - return true; - } - if (obj == null) { - return false; - } - if (!(obj instanceof NodeInformation)) { - return false; - } - - final NodeInformation other = (NodeInformation) obj; - if (!hostname.equals(other.hostname)) { - return false; - } - if (siteToSitePort == null && other.siteToSitePort != null) { - return false; - } - if (siteToSitePort != null && other.siteToSitePort == null) { - return false; - 
} else if (siteToSitePort != null && siteToSitePort.intValue() != other.siteToSitePort.intValue()) { - return false; - } - if (apiPort != other.apiPort) { - return false; - } - if (isSiteToSiteSecure != other.isSiteToSiteSecure) { - return false; - } - return true; - } - - @Override - public int hashCode() { - return 83832 + hostname.hashCode() + (siteToSitePort == null ? 8 : siteToSitePort.hashCode()) + apiPort + (isSiteToSiteSecure ? 3829 : 0); - } - - @Override - public String toString() { - return "Node[" + hostname + ":" + apiPort + "]"; - } -}