http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpRequest.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpRequest.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpRequest.java new file mode 100644 index 0000000..35380dd --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpRequest.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.nifi.cluster.manager.testutils;

import java.io.IOException;
import java.io.Reader;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.MediaType;
import org.apache.commons.lang3.StringUtils;

/**
 * Encapsulates an HTTP request. The toString method returns the raw request
 * text exactly as it was assembled by the builder (request line, header lines
 * and body, each line terminated by a single '\n').
 *
 * @author unattributed
 */
public class HttpRequest {

    private String method;
    private String uri;
    private String rawUri;
    private String version;
    private String body;
    private String rawRequest;
    private Map<String, String> headers = new HashMap<>();
    private Map<String, List<String>> parameters = new HashMap<>();

    /**
     * Starts building a request from its request line.
     *
     * @param requestLine a line of the form {@code METHOD URI VERSION}
     * @return a builder to which headers and a body can be added
     * @throws IllegalArgumentException if the line does not consist of
     * exactly three space-separated tokens
     */
    public static HttpRequestBuilder createFromRequestLine(final String requestLine) {
        return new HttpRequestBuilder(requestLine);
    }

    public String getBody() {
        return body;
    }

    /**
     * @return an unmodifiable view of the headers. Values are stored exactly
     * as they appeared after the first colon of the header line, so they
     * usually start with a space.
     */
    public Map<String, String> getHeaders() {
        return Collections.unmodifiableMap(headers);
    }

    /**
     * Looks up a header value, ignoring the case of the header name.
     *
     * @param header the header name
     * @return the stored (untrimmed) value or null if no such header exists
     */
    public String getHeaderValue(final String header) {
        for (final Map.Entry<String, String> entry : getHeaders().entrySet()) {
            if (entry.getKey().equalsIgnoreCase(header)) {
                return entry.getValue();
            }
        }
        return null;
    }

    public String getMethod() {
        return method;
    }

    /**
     * @return an unmodifiable copy of the parameter map whose value lists are
     * unmodifiable as well
     */
    public Map<String, List<String>> getParameters() {
        final Map<String, List<String>> result = new HashMap<>();
        for (final Map.Entry<String, List<String>> entry : parameters.entrySet()) {
            result.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));
        }
        return Collections.unmodifiableMap(result);
    }

    /**
     * @return the request URI without its query string
     */
    public String getUri() {
        return uri;
    }

    /**
     * @return the request URI exactly as it appeared on the request line
     */
    public String getRawUri() {
        return rawUri;
    }

    public String getVersion() {
        return version;
    }

    @Override
    public String toString() {
        return rawRequest;
    }

    /**
     * A builder for constructing basic HTTP requests. It handles only enough of
     * the HTTP specification to support basic unit testing, and it should not
     * be used otherwise.
     */
    public static class HttpRequestBuilder {

        private String method;
        private String uri;
        private String rawUri;
        private String version;
        private Map<String, String> headers = new HashMap<>();
        private Map<String, List<String>> parameters = new HashMap<>();
        private int contentLength = 0;
        private String contentType;
        private String body = "";
        private StringBuilder rawRequest = new StringBuilder();

        private HttpRequestBuilder(final String requestLine) {

            final String[] tokens = requestLine.split(" ");
            if (tokens.length != 3) {
                throw new IllegalArgumentException("Invalid HTTP Request Line: " + requestLine);
            }

            method = tokens[0];
            rawUri = tokens[1];
            version = tokens[2];

            // The path is recorded for every method; previously it was left
            // null for anything other than GET/HEAD/DELETE/OPTIONS.
            final int queryIndex = rawUri.indexOf("?");
            uri = (queryIndex > -1) ? rawUri.substring(0, queryIndex) : rawUri;

            // Query-string parameters are only extracted for the methods that
            // do not normally carry a body.
            if (queryIndex > -1 && readsParametersFromQueryString(method)) {
                addParameters(rawUri.substring(queryIndex + 1));
            }

            rawRequest.append(requestLine).append("\n");
        }

        // True for the methods whose parameters are taken from the URI query string.
        private static boolean readsParametersFromQueryString(final String method) {
            return HttpMethod.GET.equalsIgnoreCase(method)
                    || HttpMethod.HEAD.equalsIgnoreCase(method)
                    || HttpMethod.DELETE.equalsIgnoreCase(method)
                    || HttpMethod.OPTIONS.equalsIgnoreCase(method);
        }

        // True when the content type, ignoring any parameters such as
        // "; charset=UTF-8", is the form-urlencoded media type. Null-safe.
        private static boolean isFormUrlEncoded(final String contentType) {
            if (contentType == null) {
                return false;
            }
            final int paramIndex = contentType.indexOf(";");
            final String mediaType = (paramIndex < 0) ? contentType : contentType.substring(0, paramIndex);
            return MediaType.APPLICATION_FORM_URLENCODED.equalsIgnoreCase(mediaType.trim());
        }

        private void addHeader(final String key, final String value) {
            if (key.contains(" ")) {
                throw new IllegalArgumentException("Header key may not contain spaces.");
            } else if ("content-length".equalsIgnoreCase(key)) {
                contentLength = (StringUtils.isBlank(value.trim())) ? 0 : Integer.parseInt(value.trim());
            } else if ("content-type".equalsIgnoreCase(key)) {
                contentType = value.trim();
            }
            // the value is intentionally stored as received (not trimmed)
            headers.put(key, value);
        }

        /**
         * Adds a header given as a raw {@code name: value} line.
         *
         * @param header the header line
         * @throws IllegalArgumentException if the line contains no colon, the
         * name contains a space, or a Content-Length value is not an integer
         */
        public void addHeader(final String header) {
            final int firstColonIndex = header.indexOf(":");
            if (firstColonIndex < 0) {
                throw new IllegalArgumentException("Invalid HTTP Header line: " + header);
            }
            addHeader(header.substring(0, firstColonIndex), header.substring(firstColonIndex + 1));
            rawRequest.append(header).append("\n");
        }

        /**
         * Parses a URL-encoded query string (with or without a leading '?')
         * and adds every key/value pair it contains. A key without a value
         * ("a" or "a=") is added with an empty value and empty pairs (as in
         * "a=1&&b=2") are skipped.
         *
         * @param queryString the query string; blank input is ignored
         */
        // final because constructor calls it
        public final void addParameters(final String queryString) {

            if (StringUtils.isBlank(queryString)) {
                return;
            }

            final String normQueryString;
            if (queryString.startsWith("?")) {
                normQueryString = queryString.substring(1);
            } else {
                normQueryString = queryString;
            }

            for (final String keyValuePair : normQueryString.split("&")) {
                if (keyValuePair.isEmpty()) {
                    continue;
                }
                // split on the first '=' only so that values may contain '='
                final int equalsIndex = keyValuePair.indexOf("=");
                final String rawKey = (equalsIndex < 0) ? keyValuePair : keyValuePair.substring(0, equalsIndex);
                final String rawValue = (equalsIndex < 0) ? "" : keyValuePair.substring(equalsIndex + 1);
                try {
                    addParameter(
                            URLDecoder.decode(rawKey, "utf-8"),
                            URLDecoder.decode(rawValue, "utf-8")
                    );
                } catch (UnsupportedEncodingException use) {
                    throw new RuntimeException(use);
                }
            }
        }

        /**
         * Adds a single, already decoded parameter. Repeated keys accumulate
         * their values in insertion order.
         *
         * @param key the parameter name
         * @param value the parameter value
         * @throws IllegalArgumentException if the key contains a space
         */
        public void addParameter(final String key, final String value) {

            if (key.contains(" ")) {
                throw new IllegalArgumentException("Parameter key may not contain spaces: " + key);
            }

            final List<String> values;
            if (parameters.containsKey(key)) {
                values = parameters.get(key);
            } else {
                values = new ArrayList<>();
                parameters.put(key, values);
            }
            values.add(value);
        }

        /**
         * Reads the request body from the reader. Nothing is read unless a
         * positive Content-Length header was added beforehand.
         *
         * NOTE: Content-Length counts bytes while this method counts
         * characters, so the two only agree for single-byte encoded bodies.
         *
         * @param reader the reader positioned at the start of the body
         * @throws IOException if reading fails
         */
        public void addBody(final Reader reader) throws IOException {

            if (contentLength <= 0) {
                return;
            }

            final char[] buf = new char[contentLength];
            int offset = 0;
            int numRead;
            while (offset < buf.length && (numRead = reader.read(buf, offset, buf.length - offset)) >= 0) {
                offset += numRead;
            }
            // use only what was actually read; a short read must not pad the
            // body with NUL characters
            body = new String(buf, 0, offset);
            rawRequest.append("\n");
            rawRequest.append(body);
        }

        /**
         * Creates the request. For methods other than GET and HEAD a
         * form-urlencoded body is additionally parsed into parameters.
         *
         * @return the assembled request
         * @throws UnsupportedEncodingException declared for backward
         * compatibility with existing callers
         */
        public HttpRequest build() throws UnsupportedEncodingException {

            final boolean mayCarryFormBody = !HttpMethod.GET.equalsIgnoreCase(method) && !HttpMethod.HEAD.equalsIgnoreCase(method);
            // null-safe: a request without a Content-Type header is simply not a form post
            if (mayCarryFormBody && isFormUrlEncoded(contentType)) {
                addParameters(body);
            }

            final HttpRequest request = new HttpRequest();
            request.method = this.method;
            request.uri = this.uri;
            request.rawUri = this.rawUri;
            request.version = this.version;
            request.headers.putAll(this.headers);
            request.parameters.putAll(this.parameters);
            request.body = this.body;
            request.rawRequest = this.rawRequest.toString();

            return request;
        }

    }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponse.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponse.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponse.java new file mode 100644 index 0000000..3aa2931 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponse.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager.testutils; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import javax.ws.rs.core.Response.Status; + +/** + * Encapsulates an HTTP response. The toString method returns the + * specification-compliant response. 
+ * + * @author unattributed + */
public class HttpResponse {

    private final Status status;
    private final String entity;
    private final Map<String, String> headers = new HashMap<>();

    /**
     * Creates a response and records the entity's size in bytes (using the
     * platform default charset) under the "content-length" header.
     *
     * @param status the response status
     * @param entity the response body
     */
    public HttpResponse(final Status status, final String entity) {
        this.status = status;
        this.entity = entity;
        final int entityLength = entity.getBytes().length;
        headers.put("content-length", String.valueOf(entityLength));
    }

    public String getEntity() {
        return entity;
    }

    public Status getStatus() {
        return status;
    }

    /**
     * @return an unmodifiable view of the headers
     */
    public Map<String, String> getHeaders() {
        return Collections.unmodifiableMap(headers);
    }

    /**
     * Adds or replaces a header.
     *
     * @param key the header name
     * @param value the header value
     * @throws IllegalArgumentException if the name contains a space or names
     * the content-length header (in any case)
     */
    public void addHeader(final String key, final String value) {
        if (key.contains(" ")) {
            throw new IllegalArgumentException("Header key may not contain spaces.");
        }
        if ("content-length".equalsIgnoreCase(key)) {
            throw new IllegalArgumentException("Content-Length header is set automatically based on length of content.");
        }
        headers.put(key, value);
    }

    /**
     * Adds every header of the given map through {@link #addHeader}.
     *
     * @param headers the headers to add
     */
    public void addHeaders(final Map<String, String> headers) {
        for (final String key : headers.keySet()) {
            addHeader(key, headers.get(key));
        }
    }

    /**
     * @return the status line, one line per header, an empty line and the
     * entity, with '\n' as the line terminator
     */
    @Override
    public String toString() {

        // status line
        final StringBuilder text = new StringBuilder("HTTP/1.1 ");
        text.append(status.getStatusCode());
        text.append(" ");
        text.append(status.getReasonPhrase());
        text.append("\n");

        // header lines
        for (final Map.Entry<String, String> header : headers.entrySet()) {
            text.append(header.getKey() + ": " + header.getValue() + "\n");
        }

        // separator followed by the body
        text.append("\n").append(entity);

        return text.toString();
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponseAction.java ---------------------------------------------------------------------- diff --git
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponseAction.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponseAction.java new file mode 100644 index 0000000..28615d0 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponseAction.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager.testutils; + +/** + * Wraps a HttpResponse with a time-delay. When the action is applied, the + * currently executing thread sleeps for the given delay before returning the + * response to the caller. + * + * This class is good for simulating network latency. 
+ * + * @author unattributed + */
public class HttpResponseAction {

    private final HttpResponse response;

    private final int waitTimeMs;

    /**
     * Creates an action that returns the response without delay.
     *
     * @param response the response to return
     */
    public HttpResponseAction(final HttpResponse response) {
        this(response, 0);
    }

    /**
     * Creates an action that returns the response after a delay.
     *
     * @param response the response to return
     * @param waitTimeMs how long {@link #apply()} sleeps, in milliseconds
     */
    public HttpResponseAction(final HttpResponse response, final int waitTimeMs) {
        this.response = response;
        this.waitTimeMs = waitTimeMs;
    }

    /**
     * Sleeps for the configured wait time and then returns the response.
     *
     * @return the wrapped response
     * @throws RuntimeException if the sleeping thread is interrupted; the
     * thread's interrupt status is restored before the exception is thrown
     */
    public HttpResponse apply() {
        try {
            Thread.sleep(waitTimeMs);
        } catch (final InterruptedException ie) {
            // keep the interruption visible to the code that owns this thread
            Thread.currentThread().interrupt();
            throw new RuntimeException(ie);
        }

        return response;
    }

    public HttpResponse getResponse() {
        return response;
    }

    public int getWaitTimeMs() {
        return waitTimeMs;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpServer.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpServer.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpServer.java new file mode 100644 index 0000000..f17a66c --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpServer.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager.testutils; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.PrintWriter; +import java.io.Reader; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.apache.nifi.cluster.manager.testutils.HttpRequest.HttpRequestBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A simple HTTP web server that allows clients to register canned-responses to + * respond to received requests. 
+ * + * @author unattributed + */
public class HttpServer {

    private static final Logger logger = LoggerFactory.getLogger(HttpServer.class);

    private final ExecutorService executorService;
    private final ServerSocket serverSocket;
    private final Queue<HttpResponseAction> responseQueue = new ConcurrentLinkedQueue<>();
    private final Map<String, String> checkedHeaders = new HashMap<>();
    private final Map<String, List<String>> checkedParameters = new HashMap<>();
    private final int port;

    /**
     * Binds the server socket. Connections are not accepted until
     * {@link #start()} is called.
     *
     * @param numThreads the number of threads handling requests
     * @param port the port to bind to
     * @throws IOException if the server socket cannot be opened
     */
    public HttpServer(int numThreads, int port) throws IOException {
        this.port = port;
        // bind first so that no executor is created when binding fails
        serverSocket = new ServerSocket(port);
        executorService = Executors.newFixedThreadPool(numThreads);
    }

    /**
     * Starts a thread that accepts connections and hands each one to the
     * executor until the server is stopped.
     */
    public void start() {

        new Thread() {
            @Override
            public void run() {
                while (isRunning()) {
                    try {
                        final Socket conn = serverSocket.accept();
                        executorService.execute(new Runnable() {
                            @Override
                            public void run() {
                                try {
                                    handleRequest(conn);
                                } finally {
                                    // close even when handling fails with a runtime exception
                                    closeQuietly(conn);
                                }
                            }
                        });
                    } catch (final SocketException se) {
                        /* ignored */
                    } catch (final IOException ioe) {
                        if (logger.isDebugEnabled()) {
                            logger.warn("", ioe);
                        }
                    }
                }
            }
        }.start();
    }

    /**
     * @return true until {@link #stop()} has shut the executor down
     */
    public boolean isRunning() {
        return executorService.isShutdown() == false;
    }

    /**
     * Closes the server socket and shuts the executor down, waiting up to
     * three seconds for running handlers to finish.
     *
     * @throws RuntimeException if closing or waiting fails; when the wait is
     * interrupted the thread's interrupt status is restored first
     */
    public void stop() {
        // shutdown server socket
        try {
            if (serverSocket.isClosed() == false) {
                serverSocket.close();
            }
        } catch (final Exception ex) {
            throw new RuntimeException(ex);
        }

        // shutdown executor service
        try {
            executorService.shutdown();
            executorService.awaitTermination(3, TimeUnit.SECONDS);
        } catch (final InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(ie);
        } catch (final Exception ex) {
            throw new RuntimeException(ex);
        }
    }

    /**
     * @return the local port of the server socket while the server is
     * running, otherwise the port given to the constructor
     */
    public int getPort() {
        if (isRunning()) {
            return serverSocket.getLocalPort();
        } else {
            return port;
        }
    }

    /**
     * Queues a canned response. Each handled request consumes one response
     * in the order they were added.
     *
     * @param response the response action to queue
     * @return the queue of pending response actions
     */
    public Queue<HttpResponseAction> addResponseAction(final HttpResponseAction response) {
        responseQueue.add(response);
        return responseQueue;
    }

    /**
     * Registers headers that every request must carry with the given values.
     *
     * @param headers header names mapped to their expected values
     */
    public void addCheckedHeaders(final Map<String, String> headers) {
        checkedHeaders.putAll(headers);
    }

    /**
     * Registers parameters that every request must carry with the given
     * values.
     *
     * @param parameters parameter names mapped to their expected values
     */
    public void addCheckedParameters(final Map<String, List<String>> parameters) {
        checkedParameters.putAll(parameters);
    }

    // Closes the connection unless it is already closed; failures are ignored
    // because there is nothing left to do with the socket.
    private static void closeQuietly(final Socket conn) {
        if (conn.isClosed() == false) {
            try {
                conn.close();
            } catch (IOException ignored) {
            }
        }
    }

    // Reads one request, verifies the checked headers and parameters and
    // answers with the next queued response. A request failing verification
    // gets no response; its connection is closed instead.
    private void handleRequest(final Socket conn) {
        try {

            final HttpRequest httpRequest = buildRequest(conn.getInputStream());

            if (logger.isDebugEnabled()) {
                logger.debug("\n" + httpRequest);
            }

            // check headers: names are matched case-insensitively and values
            // are compared without surrounding whitespace because the request
            // stores header values untrimmed
            for (final Map.Entry<String, String> entry : checkedHeaders.entrySet()) {
                final String actualValue = httpRequest.getHeaderValue(entry.getKey());
                if (actualValue == null) {
                    logger.error("Missing checked header: " + entry.getKey());
                    conn.close();
                    return;
                }
                if (entry.getValue().trim().equals(actualValue.trim()) == false) {
                    logger.error("Incorrect HTTP request header value received for checked header: " + entry.getKey());
                    conn.close();
                    return;
                }
            }

            // check parameters
            final Map<String, List<String>> reqParams = httpRequest.getParameters();
            for (final Map.Entry<String, List<String>> entry : checkedParameters.entrySet()) {
                if (reqParams.containsKey(entry.getKey())) {
                    if (entry.getValue().equals(reqParams.get(entry.getKey())) == false) {
                        logger.error("Incorrect HTTP request parameter values received for checked parameter: " + entry.getKey());
                        conn.close();
                        return;
                    }
                } else {
                    logger.error("Missing checked parameter: " + entry.getKey());
                    conn.close();
                    return;
                }
            }

            // apply the next response; poll() instead of remove() so that an
            // exhausted queue is reported rather than killing the worker
            final HttpResponseAction response = responseQueue.poll();
            if (response == null) {
                logger.error("No response action registered for received request.");
                conn.close();
                return;
            }
            response.apply();

            // send the response to client
            final PrintWriter pw = new PrintWriter(conn.getOutputStream(), true);

            if (logger.isDebugEnabled()) {
                logger.debug("\n" + response.getResponse());
            }

            pw.print(response.getResponse());
            pw.flush();

        } catch (IOException ioe) {
            /* ignored apart from a debug trace */
            logger.debug("", ioe);
        }
    }

    private HttpRequest buildRequest(final InputStream requestIs) throws IOException {
        return new HttpRequestReader().read(new InputStreamReader(requestIs));
    }

    // reads an HTTP request from the given reader
    private static class HttpRequestReader {

        /**
         * Reads the request line, the headers up to the first empty line and
         * then the body.
         *
         * @param reader the reader to read the request from
         * @return the parsed request
         * @throws IOException if reading fails or the stream ends before a
         * request line was received
         */
        public HttpRequest read(final Reader reader) throws IOException {

            HttpRequestBuilder builder = null;
            String line;
            while ((line = readLine(reader)).isEmpty() == false) {
                if (builder == null) {
                    // the first non-empty line is the request line
                    builder = HttpRequest.createFromRequestLine(line);
                } else {
                    builder.addHeader(line);
                }
            }

            if (builder == null) {
                throw new IOException("No HTTP request line received.");
            }

            builder.addBody(reader);
            return builder.build();
        }

        // Returns the next line without its terminator ('\r' is dropped).
        // Stops at end of stream as well, returning what was read so far.
        private String readLine(final Reader reader) throws IOException {

            /* read character at time to prevent blocking */
            final StringBuilder strb = new StringBuilder();
            int c;
            // compare the int before casting: -1 (end of stream) cast to char
            // would never equal '\n' and loop forever
            while ((c = reader.read()) != -1 && c != '\n') {
                if (c != '\r') {
                    strb.append((char) c);
                }
            }
            return strb.toString();
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java new file mode 100644 index 0000000..96943c2 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor
license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.nifi.cluster.protocol.impl;

import java.io.IOException;
import java.net.InetAddress;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.ProtocolContext;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.ProtocolHandler;
import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
import org.apache.nifi.cluster.protocol.message.PingMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.io.socket.ServerSocketConfiguration;
import org.apache.nifi.io.socket.SocketConfiguration;
import org.junit.After;
import static org.junit.Assert.*;
import org.junit.Before;
import org.junit.Test;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.*;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/**
 * @author unattributed
 */
public class ClusterManagerProtocolSenderImplTest {

    private InetAddress address;

    private int port;

    private SocketProtocolListener listener;

    private ClusterManagerProtocolSenderImpl sender;

    private ProtocolHandler mockHandler;

    /**
     * Starts a socket listener backed by a mocked handler on an ephemeral
     * port and creates the sender under test.
     */
    @Before
    public void setup() throws IOException {

        address = InetAddress.getLocalHost();
        mockHandler = mock(ProtocolHandler.class);

        final ProtocolContext protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);

        // port 0 is passed to the listener; the port actually in use is read back below
        listener = new SocketProtocolListener(5, 0, new ServerSocketConfiguration(), protocolContext);
        listener.addHandler(mockHandler);
        listener.start();

        port = listener.getPort();

        sender = new ClusterManagerProtocolSenderImpl(new SocketConfiguration(), protocolContext);
    }

    @After
    public void teardown() throws IOException {
        if (listener.isRunning()) {
            listener.stop();
        }
    }

    // Builds a flow request addressed to the listener started in setup().
    private FlowRequestMessage newFlowRequest() {
        final FlowRequestMessage flowRequest = new FlowRequestMessage();
        flowRequest.setNodeId(new NodeIdentifier("id", "api-address", 1, address.getHostAddress(), port));
        return flowRequest;
    }

    /**
     * A flow response from the handler is returned to the caller.
     */
    @Test
    public void testRequestFlow() throws Exception {

        when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
        when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new FlowResponseMessage());

        final FlowResponseMessage flowResponse = sender.requestFlow(newFlowRequest());
        assertNotNull(flowResponse);
    }

    /**
     * A response of an unexpected message type is reported as a protocol
     * exception.
     */
    @Test
    public void testRequestFlowWithBadResponseMessage() throws Exception {

        when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
        when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new PingMessage());

        final FlowRequestMessage flowRequest = newFlowRequest();
        try {
            sender.requestFlow(flowRequest);
            fail("failed to throw exception");
        } catch (ProtocolException expected) {
            // the exception is the expected outcome
        }

    }

    /**
     * A response that takes three times the socket timeout is reported as a
     * protocol exception.
     */
    @Test
    public void testRequestFlowDelayedResponse() throws Exception {

        final int time = 250;
        sender.getSocketConfiguration().setSocketTimeout(time);

        when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
        when(mockHandler.handle(any(ProtocolMessage.class))).thenAnswer(new Answer<FlowResponseMessage>() {
            @Override
            public FlowResponseMessage answer(InvocationOnMock invocation) throws Throwable {
                Thread.sleep(time * 3);
                return new FlowResponseMessage();
            }
        });

        final FlowRequestMessage flowRequest = newFlowRequest();
        try {
            sender.requestFlow(flowRequest);
            fail("failed to throw exception");
        } catch (ProtocolException expected) {
            // the exception is the expected outcome
        }
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocatorTest.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocatorTest.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocatorTest.java new file mode 100644 index 0000000..4a69571 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocatorTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.nifi.cluster.protocol.impl;

import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.io.socket.multicast.DiscoverableService;
import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
import static org.junit.Assert.*;
import org.junit.Before;
import org.junit.Test;
import static org.mockito.Mockito.*;
import org.mockito.stubbing.OngoingStubbing;

/**
 * Tests the three ways a ClusterServiceLocator is constructed here: from a
 * service discovery, from a service discovery plus a fixed port, and from a
 * fixed service.
 */
public class ClusterServiceLocatorTest {

    private ClusterServiceDiscovery mockServiceDiscovery;

    private int fixedPort;

    private DiscoverableService fixedService;

    private ClusterServiceLocator serviceDiscoveryLocator;

    private ClusterServiceLocator serviceDiscoveryFixedPortLocator;

    private ClusterServiceLocator fixedServiceLocator;

    @Before
    public void setup() throws Exception {

        fixedPort = 1;
        mockServiceDiscovery = mock(ClusterServiceDiscovery.class);
        fixedService = new DiscoverableServiceImpl("some-service", InetSocketAddress.createUnresolved("some-host", 20));

        serviceDiscoveryLocator = new ClusterServiceLocator(mockServiceDiscovery);
        serviceDiscoveryFixedPortLocator = new ClusterServiceLocator(mockServiceDiscovery, fixedPort);
        fixedServiceLocator = new ClusterServiceLocator(fixedService);

    }

    // Builds an attempts configuration of two attempts one second apart.
    private ClusterServiceLocator.AttemptsConfig twoAttemptsOneSecondApart() {
        final ClusterServiceLocator.AttemptsConfig config = new ClusterServiceLocator.AttemptsConfig();
        config.setNumAttempts(2);
        config.setTimeBetweenAttempsUnit(TimeUnit.SECONDS);
        config.setTimeBetweenAttempts(1);
        return config;
    }

    // Stubs the mocked discovery to return null for every attempt but the
    // last one, which returns the fixed service.
    private void stubServiceAvailableOnLastAttempt(final int numAttempts) {
        OngoingStubbing<DiscoverableService> stubbing = when(mockServiceDiscovery.getService());
        for (int i = 0; i < numAttempts - 1; i++) {
            stubbing = stubbing.thenReturn(null);
        }
        stubbing.thenReturn(fixedService);
    }

    @Test
    public void getServiceWhenServiceDiscoveryNotStarted() {
        assertNull(serviceDiscoveryLocator.getService());
    }

    @Test
    public void getServiceWhenServiceDiscoveryFixedPortNotStarted() {
        // exercises the fixed-port locator; this test used to repeat the
        // previous one by querying serviceDiscoveryLocator
        assertNull(serviceDiscoveryFixedPortLocator.getService());
    }

    @Test
    public void getServiceWhenFixedServiceNotStarted() {
        assertEquals(fixedService, fixedServiceLocator.getService());
    }

    @Test
    public void getServiceNotOnFirstAttempt() {

        final ClusterServiceLocator.AttemptsConfig config = twoAttemptsOneSecondApart();
        serviceDiscoveryLocator.setAttemptsConfig(config);

        stubServiceAvailableOnLastAttempt(config.getNumAttempts());

        assertEquals(fixedService, serviceDiscoveryLocator.getService());

    }

    @Test
    public void getServiceNotOnFirstAttemptWithFixedPort() {

        final ClusterServiceLocator.AttemptsConfig config = twoAttemptsOneSecondApart();
        serviceDiscoveryFixedPortLocator.setAttemptsConfig(config);

        stubServiceAvailableOnLastAttempt(config.getNumAttempts());

        // the located service is expected to keep the discovered name and
        // host but carry the fixed port
        InetSocketAddress resultAddress = InetSocketAddress.createUnresolved(fixedService.getServiceAddress().getHostName(), fixedPort);
        DiscoverableService resultService = new DiscoverableServiceImpl(fixedService.getServiceName(), resultAddress);
        assertEquals(resultService, serviceDiscoveryFixedPortLocator.getService());
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcasterTest.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcasterTest.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcasterTest.java
new file mode 100644
index 0000000..4d85d1a
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcasterTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.impl;
+
+import java.net.InetSocketAddress;
+import org.apache.nifi.cluster.protocol.ProtocolContext;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.cluster.protocol.ProtocolHandler;
+import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
+import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+import org.apache.nifi.cluster.protocol.message.ServiceBroadcastMessage;
+import org.apache.nifi.io.socket.multicast.DiscoverableService;
+import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
+import org.apache.nifi.io.socket.multicast.MulticastConfiguration;
+import org.junit.After;
+import static org.junit.Assert.*;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Tests that a service registered with a {@link ClusterServicesBroadcaster}
+ * reaches a {@link MulticastProtocolListener} bound to the same multicast
+ * address.
+ *
+ * @author unattributed
+ */
+public class ClusterServicesBroadcasterTest {
+
+    private ClusterServicesBroadcaster broadcaster;
+
+    private MulticastProtocolListener listener;
+
+    private DummyProtocolHandler handler;
+
+    private InetSocketAddress multicastAddress;
+
+    private DiscoverableService broadcastedService;
+
+    private ProtocolContext protocolContext;
+
+    private MulticastConfiguration configuration;
+
+    /**
+     * Wires a broadcaster and a listener to the same multicast address; neither
+     * is started here.
+     */
+    @Before
+    public void setup() throws Exception {
+
+        broadcastedService = new DiscoverableServiceImpl("some-service", new InetSocketAddress("localhost", 11111));
+
+        multicastAddress = new InetSocketAddress("225.1.1.1", 22222);
+
+        configuration = new MulticastConfiguration();
+
+        protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);
+
+        broadcaster = new ClusterServicesBroadcaster(multicastAddress, configuration, protocolContext, "500 ms");
+        broadcaster.addService(broadcastedService);
+
+        handler = new DummyProtocolHandler();
+        listener = new MulticastProtocolListener(5, multicastAddress, configuration, protocolContext);
+        listener.addHandler(handler);
+    }
+
+    /**
+     * Stops whichever of the broadcaster and listener is still running. The
+     * listener is stopped in a {@code finally} block so that a failure while
+     * stopping the broadcaster cannot leave the listener running, and any
+     * failure is reported to the test runner instead of being printed.
+     */
+    @After
+    public void teardown() throws Exception {
+        try {
+            if (broadcaster.isRunning()) {
+                broadcaster.stop();
+            }
+        } finally {
+            if (listener.isRunning()) {
+                listener.stop();
+            }
+        }
+    }
+
+    /**
+     * Starts both ends, waits one second, and checks that the handler saw a
+     * service broadcast describing {@link #broadcastedService}.
+     */
+    @Test
+    @Ignore
+    public void testBroadcastReceived() throws Exception {
+
+        broadcaster.start();
+        listener.start();
+
+        // the broadcaster was configured with a "500 ms" delay in setup()
+        Thread.sleep(1000);
+
+        listener.stop();
+
+        assertNotNull(handler.getProtocolMessage());
+        assertEquals(ProtocolMessage.MessageType.SERVICE_BROADCAST, handler.getProtocolMessage().getType());
+        final ServiceBroadcastMessage msg = (ServiceBroadcastMessage) handler.getProtocolMessage();
+        assertEquals(broadcastedService.getServiceName(), msg.getServiceName());
+        assertEquals(broadcastedService.getServiceAddress().getHostName(), msg.getAddress());
+        assertEquals(broadcastedService.getServiceAddress().getPort(), msg.getPort());
+    }
+
+    /**
+     * Handler that accepts every message and remembers the most recent one.
+     */
+    private static class DummyProtocolHandler implements ProtocolHandler {
+
+        private ProtocolMessage protocolMessage;
+
+        @Override
+        public boolean canHandle(ProtocolMessage msg) {
+            return true;
+        }
+
+        @Override
+        public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException {
+            this.protocolMessage = msg;
+            return null;
+        }
+
+        public ProtocolMessage getProtocolMessage() {
+            return protocolMessage;
+        }
+
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListenerTest.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListenerTest.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListenerTest.java
new file mode
100644
index 0000000..6c79b90
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListenerTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.impl;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.InetSocketAddress;
+import java.net.MulticastSocket;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.nifi.cluster.protocol.ProtocolContext;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.cluster.protocol.ProtocolHandler;
+import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
+import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
+import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
+import org.apache.nifi.cluster.protocol.message.MulticastProtocolMessage;
+import org.apache.nifi.cluster.protocol.message.PingMessage;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+import org.apache.nifi.io.socket.multicast.MulticastConfiguration;
+import org.apache.nifi.io.socket.multicast.MulticastUtils;
+import org.junit.After;
+import static org.junit.Assert.*;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Sends raw datagrams to a running {@link MulticastProtocolListener} and
+ * checks which of them reach the registered handler.
+ *
+ * @author unattributed
+ */
+public class MulticastProtocolListenerTest {
+
+    private MulticastProtocolListener listener;
+
+    private MulticastSocket socket;
+
+    private InetSocketAddress address;
+
+    private MulticastConfiguration configuration;
+
+    private ProtocolContext protocolContext;
+
+    /**
+     * Starts a listener on the multicast address and opens the socket the
+     * tests send from.
+     */
+    @Before
+    public void setup() throws Exception {
+
+        address = new InetSocketAddress("226.1.1.1", 60000);
+        configuration = new MulticastConfiguration();
+
+        protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);
+
+        listener = new MulticastProtocolListener(5, address, configuration, protocolContext);
+        listener.start();
+
+        socket = MulticastUtils.createMulticastSocket(address.getPort(), configuration);
+    }
+
+    /**
+     * Stops the listener if it is running and always closes the sending socket.
+     */
+    @After
+    public void teardown() throws IOException {
+        try {
+            if (listener.isRunning()) {
+                listener.stop();
+            }
+        } finally {
+            MulticastUtils.closeQuietly(socket);
+        }
+    }
+
+    /**
+     * A single junk byte must not be delivered to the handler.
+     */
+    @Ignore("This test must be reworked.  Requires an active network connection")
+    @Test
+    public void testBadRequest() throws Exception {
+        DelayedProtocolHandler handler = new DelayedProtocolHandler(0);
+        listener.addHandler(handler);
+        DatagramPacket packet = new DatagramPacket(new byte[]{5}, 1, address);
+        socket.send(packet);
+        // give the listener time to receive and reject the packet
+        Thread.sleep(250);
+        assertEquals(0, handler.getMessages().size());
+    }
+
+    /**
+     * A marshalled ping wrapped in a multicast message must be delivered to
+     * the handler exactly once, as a ping.
+     */
+    @Test
+    @Ignore
+    public void testRequest() throws Exception {
+
+        ReflexiveProtocolHandler handler = new ReflexiveProtocolHandler();
+        listener.addHandler(handler);
+
+        ProtocolMessage msg = new PingMessage();
+        MulticastProtocolMessage multicastMsg = new MulticastProtocolMessage("some-id", msg);
+
+        // marshal message to output stream
+        ProtocolMessageMarshaller marshaller = protocolContext.createMarshaller();
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        marshaller.marshal(multicastMsg, baos);
+        byte[] requestPacketBytes = baos.toByteArray();
+        DatagramPacket packet = new DatagramPacket(requestPacketBytes, requestPacketBytes.length, address);
+        socket.send(packet);
+
+        // give the listener time to receive and dispatch the packet
+        Thread.sleep(250);
+        assertEquals(1, handler.getMessages().size());
+        assertEquals(msg.getType(), handler.getMessages().get(0).getType());
+
+    }
+
+    /**
+     * Handler that records every message and returns it as the response.
+     */
+    private static class ReflexiveProtocolHandler implements ProtocolHandler {
+
+        private final List<ProtocolMessage> messages = new ArrayList<>();
+
+        @Override
+        public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException {
+            messages.add(msg);
+            return msg;
+        }
+
+        @Override
+        public boolean canHandle(ProtocolMessage msg) {
+            return true;
+        }
+
+        public List<ProtocolMessage> getMessages() {
+            return messages;
+        }
+
+    }
+
+    /**
+     * Handler that records every message, sleeps for a fixed number of
+     * milliseconds, and produces no response.
+     */
+    private static class DelayedProtocolHandler implements ProtocolHandler {
+
+        private final int delay;
+
+        private final List<ProtocolMessage> messages = new ArrayList<>();
+
+        public DelayedProtocolHandler(int delay) {
+            this.delay = delay;
+        }
+
+        @Override
+        public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException {
+            // record first so the message is visible even while sleeping
+            messages.add(msg);
+            try {
+                Thread.sleep(delay);
+            } catch (final InterruptedException ie) {
+                // restore the flag so the calling thread can still observe the interrupt
+                Thread.currentThread().interrupt();
+                throw new ProtocolException(ie);
+            }
+            return null;
+        }
+
+        @Override
+        public boolean canHandle(ProtocolMessage msg) {
+            return true;
+        }
+
+        public List<ProtocolMessage> getMessages() {
+            return messages;
+        }
+
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java
new file mode 100644
index 0000000..7c62d2f
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.impl;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.UUID;
+
+import org.apache.nifi.cluster.HeartbeatPayload;
+import org.apache.nifi.cluster.protocol.ConnectionRequest;
+import org.apache.nifi.cluster.protocol.ConnectionResponse;
+import org.apache.nifi.cluster.protocol.Heartbeat;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.cluster.protocol.ProtocolContext;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.cluster.protocol.ProtocolHandler;
+import org.apache.nifi.cluster.protocol.StandardDataFlow;
+import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
+import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
+import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
+import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
+import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
+import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
+import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
+import org.apache.nifi.cluster.protocol.message.PingMessage;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+import org.apache.nifi.io.socket.ServerSocketConfiguration;
+import org.apache.nifi.io.socket.SocketConfiguration;
+import org.apache.nifi.io.socket.multicast.DiscoverableService;
+import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Drives a {@link NodeProtocolSenderImpl} against a local
+ * {@link SocketProtocolListener} whose only handler is a Mockito mock, so each
+ * test decides what the "cluster manager" side answers.
+ *
+ * @author unattributed
+ */
+public class NodeProtocolSenderImplTest {
+
+    private SocketProtocolListener listener;
+
+    private NodeProtocolSenderImpl sender;
+
+    private DiscoverableService service;
+
+    private ServerSocketConfiguration serverSocketConfiguration;
+
+    private ClusterServiceLocator mockServiceLocator;
+
+    private ProtocolHandler mockHandler;
+
+    private NodeIdentifier nodeIdentifier;
+
+    /**
+     * Starts the listener (constructed with port 0) and builds a sender whose
+     * service locator is a mock; {@link #service} points at the listener's
+     * actual port.
+     */
+    @Before
+    public void setup() throws IOException {
+
+        serverSocketConfiguration = new ServerSocketConfiguration();
+
+        mockServiceLocator = mock(ClusterServiceLocator.class);
+        mockHandler = mock(ProtocolHandler.class);
+
+        nodeIdentifier = new NodeIdentifier("1", "localhost", 1234, "localhost", 5678);
+
+        final ProtocolContext context = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);
+
+        listener = new SocketProtocolListener(5, 0, serverSocketConfiguration, context);
+        listener.setShutdownListenerSeconds(3);
+        listener.addHandler(mockHandler);
+        listener.start();
+
+        final InetSocketAddress listenerAddress = new InetSocketAddress("localhost", listener.getPort());
+        service = new DiscoverableServiceImpl("some-service", listenerAddress);
+
+        final SocketConfiguration senderSocketConfiguration = new SocketConfiguration();
+        senderSocketConfiguration.setReuseAddress(true);
+        sender = new NodeProtocolSenderImpl(mockServiceLocator, senderSocketConfiguration, context);
+    }
+
+    /**
+     * Stops the listener if a test left it running.
+     */
+    @After
+    public void teardown() throws IOException {
+        if (listener.isRunning()) {
+            listener.stop();
+        }
+    }
+
+    /**
+     * A well-formed connection response yields a non-null result.
+     */
+    @Test
+    public void testConnect() throws Exception {
+
+        final ConnectionResponseMessage reply = new ConnectionResponseMessage();
+        reply.setConnectionResponse(new ConnectionResponse(nodeIdentifier, new StandardDataFlow("flow".getBytes("UTF-8"), new byte[0], new byte[0]), false, null, null, UUID.randomUUID().toString()));
+
+        when(mockServiceLocator.getService()).thenReturn(service);
+        stubHandlerReply(reply);
+
+        final ConnectionResponseMessage response = sender.requestConnection(newConnectionRequest());
+        assertNotNull(response);
+    }
+
+    /**
+     * With no service available from the locator, the sender must throw
+     * {@link UnknownServiceAddressException}.
+     */
+    @Test(expected = UnknownServiceAddressException.class)
+    public void testConnectNoClusterManagerAddress() throws Exception {
+
+        when(mockServiceLocator.getService()).thenReturn(null);
+        stubHandlerReply(new ConnectionResponseMessage());
+
+        sender.requestConnection(newConnectionRequest());
+        fail("failed to throw exception");
+    }
+
+    /**
+     * A reply of the wrong message type must surface as a {@link ProtocolException}.
+     */
+    @Test(expected = ProtocolException.class)
+    public void testConnectBadResponse() throws Exception {
+
+        when(mockServiceLocator.getService()).thenReturn(service);
+        stubHandlerReply(new PingMessage());
+
+        sender.requestConnection(newConnectionRequest());
+        fail("failed to throw exception");
+
+    }
+
+    /**
+     * A reply that takes three times the sender's socket timeout must surface
+     * as a {@link ProtocolException}.
+     */
+    @Test(expected = ProtocolException.class)
+    public void testConnectDelayedResponse() throws Exception {
+
+        final int time = 250;
+        sender.getSocketConfiguration().setSocketTimeout(time);
+
+        when(mockServiceLocator.getService()).thenReturn(service);
+        when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
+        when(mockHandler.handle(any(ProtocolMessage.class))).thenAnswer(new Answer<ConnectionResponseMessage>() {
+            @Override
+            public ConnectionResponseMessage answer(InvocationOnMock invocation) throws Throwable {
+                // outlast the socket timeout configured above
+                Thread.sleep(time * 3);
+                return new ConnectionResponseMessage();
+            }
+        });
+
+        sender.requestConnection(newConnectionRequest());
+        fail("failed to throw exception");
+
+    }
+
+    /**
+     * Sending a heartbeat completes without an exception when the handler
+     * produces no response.
+     */
+    @Test
+    public void testHeartbeat() throws Exception {
+
+        when(mockServiceLocator.getService()).thenReturn(service);
+        stubHandlerReply(null);
+
+        final HeartbeatPayload payload = new HeartbeatPayload();
+        final NodeIdentifier heartbeatNode = new NodeIdentifier("id", "localhost", 3, "localhost", 4);
+        final HeartbeatMessage heartbeatMessage = new HeartbeatMessage();
+        heartbeatMessage.setHeartbeat(new Heartbeat(heartbeatNode, false, false, payload.marshal()));
+        sender.heartbeat(heartbeatMessage);
+    }
+
+    /**
+     * Sending a controller-startup-failure notification completes without an
+     * exception when the handler produces no response.
+     */
+    @Test
+    public void testNotifyControllerStartupFailure() throws Exception {
+
+        when(mockServiceLocator.getService()).thenReturn(service);
+        stubHandlerReply(null);
+
+        final ControllerStartupFailureMessage failure = new ControllerStartupFailureMessage();
+        failure.setNodeId(new NodeIdentifier("some-id", "some-addr", 1, "some-addr", 1));
+        failure.setExceptionMessage("some exception");
+        sender.notifyControllerStartupFailure(failure);
+    }
+
+    /**
+     * Makes the mock handler accept every message and answer with {@code reply}.
+     *
+     * @param reply the message to return from {@code handle}; may be {@code null}
+     */
+    private void stubHandlerReply(final ProtocolMessage reply) throws ProtocolException {
+        when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
+        when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(reply);
+    }
+
+    /**
+     * Builds a connection request message for {@link #nodeIdentifier}.
+     */
+    private ConnectionRequestMessage newConnectionRequest() {
+        final ConnectionRequestMessage message = new ConnectionRequestMessage();
+        message.setConnectionRequest(new ConnectionRequest(nodeIdentifier));
+        return message;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListenerTest.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListenerTest.java
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListenerTest.java
new file mode 100644
index 0000000..92a7d2a
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListenerTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import org.apache.nifi.cluster.protocol.ProtocolContext;
+import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
+import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
+import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
+import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
+import org.apache.nifi.cluster.protocol.message.PingMessage;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+import org.apache.nifi.cluster.protocol.testutils.DelayedProtocolHandler;
+import org.apache.nifi.cluster.protocol.testutils.ReflexiveProtocolHandler;
+import org.apache.nifi.io.socket.ServerSocketConfiguration;
+import org.apache.nifi.io.socket.SocketConfiguration;
+import org.apache.nifi.io.socket.SocketUtils;
+import org.junit.After;
+import static org.junit.Assert.*;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Talks to a running {@link SocketProtocolListener} over a plain client
+ * socket and checks what reaches the registered handler and what comes back.
+ *
+ * @author unattributed
+ */
+public class SocketProtocolListenerTest {
+
+    /** Timeout applied to both the server-side and client-side socket configuration. */
+    private static final int SOCKET_TIMEOUT_MILLIS = 1000;
+
+    private SocketProtocolListener listener;
+
+    private Socket socket;
+
+    private ProtocolMessageMarshaller<ProtocolMessage> marshaller;
+
+    private ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller;
+
+    /**
+     * Starts a listener (constructed with port 0) and connects a client socket
+     * to the port the listener reports.
+     */
+    @Before
+    public void setup() throws Exception {
+
+        final ProtocolContext context = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);
+        marshaller = context.createMarshaller();
+        unmarshaller = context.createUnmarshaller();
+
+        final ServerSocketConfiguration serverConfig = new ServerSocketConfiguration();
+        serverConfig.setSocketTimeout(SOCKET_TIMEOUT_MILLIS);
+
+        listener = new SocketProtocolListener(5, 0, serverConfig, context);
+        listener.start();
+
+        final SocketConfiguration clientConfig = new SocketConfiguration();
+        clientConfig.setReuseAddress(true);
+        clientConfig.setSocketTimeout(SOCKET_TIMEOUT_MILLIS);
+
+        final InetSocketAddress listenerAddress = new InetSocketAddress("localhost", listener.getPort());
+        socket = SocketUtils.createSocket(listenerAddress, clientConfig);
+    }
+
+    /**
+     * Stops the listener if it is running and always closes the client socket.
+     */
+    @After
+    public void teardown() throws IOException {
+        try {
+            if (listener.isRunning()) {
+                listener.stop();
+            }
+        } finally {
+            SocketUtils.closeQuietly(socket);
+        }
+    }
+
+    /**
+     * A single junk byte must not be delivered to the handler.
+     */
+    @Test
+    public void testBadRequest() throws Exception {
+        final DelayedProtocolHandler recorder = new DelayedProtocolHandler(0);
+        listener.addHandler(recorder);
+
+        socket.getOutputStream().write(5);
+        // give the listener time to read and reject the byte
+        Thread.sleep(250);
+
+        assertEquals(0, recorder.getMessages().size());
+    }
+
+    /**
+     * A ping is delivered to the handler once and echoed back to the client.
+     */
+    @Test
+    public void testRequest() throws Exception {
+        final ReflexiveProtocolHandler echo = new ReflexiveProtocolHandler();
+        listener.addHandler(echo);
+
+        final ProtocolMessage ping = new PingMessage();
+
+        // marshal message to output stream
+        marshaller.marshal(ping, socket.getOutputStream());
+
+        // unmarshal the response
+        final ProtocolMessage reply = unmarshaller.unmarshal(socket.getInputStream());
+        assertEquals(ping.getType(), reply.getType());
+
+        assertEquals(1, echo.getMessages().size());
+        assertEquals(ping.getType(), echo.getMessages().get(0).getType());
+    }
+
+    /**
+     * When the handler takes longer than the client's socket timeout, the
+     * client read times out, yet the handler has still received the ping.
+     */
+    @Test
+    public void testDelayedRequest() throws Exception {
+        final DelayedProtocolHandler slow = new DelayedProtocolHandler(2000);
+        listener.addHandler(slow);
+
+        final ProtocolMessage ping = new PingMessage();
+
+        // marshal message to output stream
+        marshaller.marshal(ping, socket.getOutputStream());
+
+        try {
+            socket.getInputStream().read();
+            fail("Socket timeout not received.");
+        } catch (SocketTimeoutException expected) {
+            // the timeout is the outcome under test; nothing to handle
+        }
+
+        assertEquals(1, slow.getMessages().size());
+        assertEquals(ping.getType(), slow.getMessages().get(0).getType());
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/DelayedProtocolHandler.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/DelayedProtocolHandler.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/DelayedProtocolHandler.java
new file mode 100644
index 0000000..2f16777
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/DelayedProtocolHandler.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.testutils;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.cluster.protocol.ProtocolHandler;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+
+/**
+ * Test handler that accepts every message, records it, sleeps for a fixed
+ * number of milliseconds, and produces no response.
+ *
+ * @author unattributed
+ */
+public class DelayedProtocolHandler implements ProtocolHandler {
+
+    private final int delay;
+    private final List<ProtocolMessage> messages = new ArrayList<>();
+
+    /**
+     * @param delay how long {@link #handle(ProtocolMessage)} sleeps, in milliseconds
+     */
+    public DelayedProtocolHandler(int delay) {
+        this.delay = delay;
+    }
+
+    /**
+     * Records the message, then sleeps for the configured delay.
+     *
+     * @param msg the message being handled
+     * @return always {@code null}
+     * @throws ProtocolException if the sleep is interrupted; the thread's
+     * interrupt flag is restored before throwing
+     */
+    @Override
+    public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException {
+        // record first so the message is visible even while sleeping
+        messages.add(msg);
+        try {
+            Thread.sleep(delay);
+        } catch (final InterruptedException ie) {
+            // restore the flag so the calling thread can still observe the interrupt
+            Thread.currentThread().interrupt();
+            throw new ProtocolException(ie);
+        }
+        return null;
+    }
+
+    /**
+     * @return always {@code true}
+     */
+    @Override
+    public boolean canHandle(ProtocolMessage msg) {
+        return true;
+    }
+
+    /**
+     * @return the live list of messages recorded so far, in arrival order
+     */
+    public List<ProtocolMessage> getMessages() {
+        return messages;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/ReflexiveProtocolHandler.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/ReflexiveProtocolHandler.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/ReflexiveProtocolHandler.java
new file mode 100644
index 0000000..e80f52c
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/ReflexiveProtocolHandler.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.testutils;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.cluster.protocol.ProtocolHandler;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+
+/**
+ * Test handler that accepts every message, records it, and answers with the
+ * very message it was given.
+ *
+ * @author unattributed
+ */
+public class ReflexiveProtocolHandler implements ProtocolHandler {
+
+    private final List<ProtocolMessage> messages = new ArrayList<>();
+
+    /**
+     * @return always {@code true}
+     */
+    @Override
+    public boolean canHandle(ProtocolMessage msg) {
+        return true;
+    }
+
+    /**
+     * Records the message and echoes it back.
+     *
+     * @param msg the message being handled
+     * @return {@code msg} itself
+     */
+    @Override
+    public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException {
+        this.messages.add(msg);
+        return msg;
+    }
+
+    /**
+     * @return the live list of messages recorded so far, in arrival order
+     */
+    public List<ProtocolMessage> getMessages() {
+        return this.messages;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/logback-test.xml b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/logback-test.xml
new file mode 100644
index
0000000..92eb78c --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/logback-test.xml @@ -0,0 +1,48 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<configuration scan="true" scanPeriod="30 seconds"> + <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> + <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> + <pattern>%-4r [%t] %-5p %c - %m%n</pattern> + </encoder> + </appender> + + <!-- valid logging levels: TRACE, DEBUG, INFO, WARN, ERROR --> + <logger name="org.apache.nifi" level="INFO"/> + + <!-- Logger for managing logging statements for nifi clusters. --> + <logger name="org.apache.nifi.cluster" level="INFO"/> + + <!-- + Logger for logging HTTP requests received by the web server. Setting + log level to 'debug' activates HTTP request logging. 
+ --> + <logger name="org.apache.nifi.server.JettyServer" level="INFO"/> + + <!-- Logger for managing logging statements for jetty --> + <logger name="org.mortbay" level="INFO"/> + + <!-- Suppress non-error messages due to excessive logging by class --> + <logger name="com.sun.jersey.spi.container.servlet.WebComponent" level="ERROR"/> + + <logger name="org.apache.nifi.processors.standard" level="DEBUG"/> + + <root level="INFO"> + <appender-ref ref="CONSOLE"/> + </root> + +</configuration> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/org/apache/nifi/cluster/firewall/impl/empty.txt ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/org/apache/nifi/cluster/firewall/impl/empty.txt b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/org/apache/nifi/cluster/firewall/impl/empty.txt new file mode 100755 index 0000000..e69de29 http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/org/apache/nifi/cluster/firewall/impl/ips.txt ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/org/apache/nifi/cluster/firewall/impl/ips.txt b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/org/apache/nifi/cluster/firewall/impl/ips.txt new file mode 100755 index 0000000..e8e4c2b --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/org/apache/nifi/cluster/firewall/impl/ips.txt @@ -0,0 +1,12 @@ + +bad data should be skipped + +# 
this is a comment + 2.2.2.2 # this is another comment #### +3.3.3.3/8 + +4.4.4.4/24 + +5.5.5.255/31 + +more bad data \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml index bb53aff..f2bf6c9 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml @@ -64,8 +64,8 @@ <artifactId>nifi-client-dto</artifactId> </dependency> <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-site-to-site-client</artifactId> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-site-to-site-client</artifactId> </dependency> <dependency> <groupId>org.quartz-scheduler</groupId> @@ -121,4 +121,19 @@ <scope>test</scope> </dependency> </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <configuration> + <excludes> + <exclude>src/test/resources/conf/0bytes.xml</exclude> + <exclude>src/test/resources/conf/termination-only.xml</exclude> + <exclude>src/test/resources/hello.txt</exclude> + </excludes> + </configuration> + </plugin> + </plugins> + </build> </project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/pom.xml b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/pom.xml index 
60eda8a..6b587fd 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/pom.xml +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/pom.xml @@ -588,6 +588,19 @@ </packagingIncludes> </configuration> </plugin> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <configuration> + <excludes> + <exclude>src/main/webapp/js/json2.js</exclude> + <exclude>src/main/webapp/js/jquery/</exclude> + <exclude>src/main/webapp/js/d3/d3.min.js</exclude> + <exclude>src/main/webapp/js/codemirror/</exclude> + <exclude>src/main/webapp/css/reset.css</exclude> + </excludes> + </configuration> + </plugin> </plugins> </build> </profile> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/pom.xml b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/pom.xml index 544dd77..2aaa045 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/pom.xml +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/pom.xml @@ -30,9 +30,9 @@ <module>nifi-security</module> <module>nifi-site-to-site</module> <module>nifi-framework-core</module> - <module>nifi-cluster-protocol</module> - <module>nifi-cluster-web</module> - <module>nifi-cluster</module> + <module>nifi-framework-cluster-protocol</module> + <module>nifi-framework-cluster-web</module> + <module>nifi-framework-cluster</module> <module>nifi-file-authorization-provider</module> <module>nifi-cluster-authorization-provider</module> <module>nifi-user-actions</module> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml ---------------------------------------------------------------------- diff --git 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index 2043a0d..170a95e 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -48,7 +48,7 @@ <groupId>org.apache.nifi</groupId> <artifactId>nifi-distributed-cache-client-service-api</artifactId> </dependency> - <dependency> + <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-http-context-map-api</artifactId> </dependency> @@ -169,4 +169,88 @@ <version>1.7</version> </dependency> </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <configuration> + <excludes> + <exclude>src/test/resources/hello.txt</exclude> + <exclude>src/test/resources/CharacterSetConversionSamples/Converted.txt</exclude> + <exclude>src/test/resources/CharacterSetConversionSamples/Original.txt</exclude> + <exclude>src/test/resources/CompressedData/SampleFile.txt</exclude> + <exclude>src/test/resources/CompressedData/SampleFileConcat.txt</exclude> + <exclude>src/test/resources/ExecuteCommand/1000bytes.txt</exclude> + <exclude>src/test/resources/ExecuteCommand/test.txt</exclude> + <exclude>src/test/resources/ScanAttribute/dictionary-with-empty-new-lines</exclude> + <exclude>src/test/resources/ScanAttribute/dictionary-with-extra-info</exclude> + <exclude>src/test/resources/ScanAttribute/dictionary1</exclude> + <exclude>src/test/resources/TestIdentifyMimeType/1.txt</exclude> + <exclude>src/test/resources/TestJson/json-sample.json</exclude> + <exclude>src/test/resources/TestMergeContent/demarcate</exclude> + <exclude>src/test/resources/TestMergeContent/foot</exclude> + <exclude>src/test/resources/TestMergeContent/head</exclude> + <exclude>src/test/resources/TestModifyBytes/noFooter.txt</exclude> + 
<exclude>src/test/resources/TestModifyBytes/noFooter_noHeader.txt</exclude> + <exclude>src/test/resources/TestModifyBytes/noHeader.txt</exclude> + <exclude>src/test/resources/TestModifyBytes/testFile.txt</exclude> + <exclude>src/test/resources/TestReplaceTextLineByLine/$1$1.txt</exclude> + <exclude>src/test/resources/TestReplaceTextLineByLine/BRue_cRue_RiRey.txt</exclude> + <exclude>src/test/resources/TestReplaceTextLineByLine/Blu$2e_clu$2e.txt</exclude> + <exclude>src/test/resources/TestReplaceTextLineByLine/D$d_h$d.txt</exclude> + <exclude>src/test/resources/TestReplaceTextLineByLine/Good.txt</exclude> + <exclude>src/test/resources/TestReplaceTextLineByLine/Spider.txt</exclude> + <exclude>src/test/resources/TestReplaceTextLineByLine/[DODO].txt</exclude> + <exclude>src/test/resources/TestReplaceTextLineByLine/cu[$1]_Po[$1].txt</exclude> + <exclude>src/test/resources/TestReplaceTextLineByLine/cu_Po.txt</exclude> + <exclude>src/test/resources/TestReplaceTextLineByLine/food.txt</exclude> + <exclude>src/test/resources/TestReplaceTextLineByLine/testFile.txt</exclude> + <exclude>src/test/resources/TestReplaceTextWithMapping/color-fruit-backreference-mapping.txt</exclude> + <exclude>src/test/resources/TestReplaceTextWithMapping/color-fruit-blank-mapping.txt</exclude> + <exclude>src/test/resources/TestReplaceTextWithMapping/color-fruit-escaped-dollar-mapping.txt</exclude> + <exclude>src/test/resources/TestReplaceTextWithMapping/color-fruit-excessive-backreference-mapping-simple.txt</exclude> + <exclude>src/test/resources/TestReplaceTextWithMapping/color-fruit-excessive-backreference-mapping.txt</exclude> + <exclude>src/test/resources/TestReplaceTextWithMapping/color-fruit-invalid-backreference-mapping.txt</exclude> + <exclude>src/test/resources/TestReplaceTextWithMapping/color-fruit-mapping.txt</exclude> + <exclude>src/test/resources/TestReplaceTextWithMapping/color-fruit-no-match-mapping.txt</exclude> + 
<exclude>src/test/resources/TestReplaceTextWithMapping/color-fruit-space-mapping.txt</exclude> + <exclude>src/test/resources/TestReplaceTextWithMapping/colors-without-dashes.txt</exclude> + <exclude>src/test/resources/TestReplaceTextWithMapping/colors.txt</exclude> + <exclude>src/test/resources/TestScanContent/helloWorld</exclude> + <exclude>src/test/resources/TestScanContent/wellthengood-bye</exclude> + <exclude>src/test/resources/TestSplitText/1.txt</exclude> + <exclude>src/test/resources/TestSplitText/2.txt</exclude> + <exclude>src/test/resources/TestSplitText/3.txt</exclude> + <exclude>src/test/resources/TestSplitText/4.txt</exclude> + <exclude>src/test/resources/TestSplitText/5.txt</exclude> + <exclude>src/test/resources/TestSplitText/6.txt</exclude> + <exclude>src/test/resources/TestSplitText/original.txt</exclude> + <exclude>src/test/resources/TestTransformXml/math.html</exclude> + <exclude>src/test/resources/TestTransformXml/tokens.csv</exclude> + <exclude>src/test/resources/TestTransformXml/tokens.xml</exclude> + <exclude>src/test/resources/TestUnpackContent/folder/cal.txt</exclude> + <exclude>src/test/resources/TestUnpackContent/folder/date.txt</exclude> + <exclude>src/test/resources/TestXml/xml-bundle-1</exclude> + <exclude>src/test/resources/CompressedData/SampleFile.txt.bz2</exclude> + <exclude>src/test/resources/CompressedData/SampleFile.txt.gz</exclude> + <exclude>src/test/resources/CompressedData/SampleFile1.txt.bz2</exclude> + <exclude>src/test/resources/CompressedData/SampleFile1.txt.gz</exclude> + <exclude>src/test/resources/CompressedData/SampleFileConcat.txt.bz2</exclude> + <exclude>src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar</exclude> + <exclude>src/test/resources/ExecuteCommand/TestSuccess.jar</exclude> + <exclude>src/test/resources/TestIdentifyMimeType/1.jar</exclude> + <exclude>src/test/resources/TestIdentifyMimeType/1.tar</exclude> + <exclude>src/test/resources/TestIdentifyMimeType/1.tar.gz</exclude> + 
<exclude>src/test/resources/TestIdentifyMimeType/1.txt.bz2</exclude> + <exclude>src/test/resources/TestIdentifyMimeType/1.txt.gz</exclude> + <exclude>src/test/resources/TestIdentifyMimeType/1.zip</exclude> + <exclude>src/test/resources/TestIdentifyMimeType/flowfilev1.tar</exclude> + <exclude>src/test/resources/TestUnpackContent/data.tar</exclude> + <exclude>src/test/resources/TestUnpackContent/data.zip</exclude> + </excludes> + </configuration> + </plugin> + </plugins> + </build> </project>