Repository: nifi Updated Branches: refs/heads/master 4ce7b679e -> 6f5fb5947
http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java new file mode 100644 index 0000000..c79d593 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.standard; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.listen.AbstractListenEventProcessor; +import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; +import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher; +import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher; +import org.apache.nifi.processor.util.listen.event.EventFactory; +import org.apache.nifi.processor.util.listen.event.StandardEvent; +import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory; +import org.apache.nifi.processor.util.listen.handler.socket.SocketChannelHandlerFactory; +import org.apache.nifi.processor.util.listen.response.ChannelResponder; +import org.apache.nifi.security.util.SslContextFactory; +import org.apache.nifi.ssl.SSLContextService; + +import javax.net.ssl.SSLContext; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SelectableChannel; +import java.nio.channels.SocketChannel; +import java.nio.charset.Charset; +import java.util.ArrayList; +import 
java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +@SupportsBatching +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +@Tags({"listen", "tcp", "tls", "ssl"}) +@CapabilityDescription("Listens for incoming TCP connections and reads data from each connection using a line separator " + + "as the message demarcator. The default behavior is for each message to produce a single FlowFile, however this can " + + "be controlled by increasing the Batch Size to a larger value for higher throughput. The Receive Buffer Size must be " + + "set as large as the largest messages expected to be received, meaning if every 100kb there is a line separator, then " + + "the Receive Buffer Size must be greater than 100kb.") +@WritesAttributes({ + @WritesAttribute(attribute="tcp.sender", description="The sending host of the messages."), + @WritesAttribute(attribute="tcp.port", description="The sending port the messages were received.") +}) +public class ListenTCP extends AbstractListenEventProcessor<ListenTCP.TCPEvent> { + + public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() + .name("SSL Context Service") + .description("The Controller Service to use in order to obtain an SSL Context. If this property is set, " + + "messages will be received over a secure connection.") + .required(false) + .identifiesControllerService(SSLContextService.class) + .build(); + + public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder() + .name("Client Auth") + .description("The client authentication policy to use for the SSL Context. 
Only used if an SSL Context Service is provided.") + .required(false) + .allowableValues(SSLContextService.ClientAuth.values()) + .defaultValue(SSLContextService.ClientAuth.REQUIRED.name()) + .build(); + + // it is only the array reference that is volatile - not the contents. + private volatile byte[] messageDemarcatorBytes; + + @Override + protected List<PropertyDescriptor> getAdditionalProperties() { + return Arrays.asList( + MAX_CONNECTIONS, + MAX_BATCH_SIZE, + MESSAGE_DELIMITER, + SSL_CONTEXT_SERVICE, + CLIENT_AUTH + ); + } + + @Override + protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) { + final List<ValidationResult> results = new ArrayList<>(); + + final String clientAuth = validationContext.getProperty(CLIENT_AUTH).getValue(); + final SSLContextService sslContextService = validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + + if (sslContextService != null && StringUtils.isBlank(clientAuth)) { + results.add(new ValidationResult.Builder() + .explanation("Client Auth must be provided when using TLS/SSL") + .valid(false).subject("Client Auth").build()); + } + + return results; + } + + @Override + @OnScheduled + public void onScheduled(ProcessContext context) throws IOException { + super.onScheduled(context); + final String msgDemarcator = context.getProperty(MESSAGE_DELIMITER).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t"); + messageDemarcatorBytes = msgDemarcator.getBytes(charset); + } + + @Override + protected ChannelDispatcher createDispatcher(final ProcessContext context, final BlockingQueue<TCPEvent> events) + throws IOException { + + final int maxConnections = context.getProperty(MAX_CONNECTIONS).asInteger(); + final int bufferSize = context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); + final Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue()); + + // initialize the buffer pool based on max 
number of connections and the buffer size + final LinkedBlockingQueue<ByteBuffer> bufferPool = new LinkedBlockingQueue<>(maxConnections); + for (int i = 0; i < maxConnections; i++) { + bufferPool.offer(ByteBuffer.allocate(bufferSize)); + } + + final EventFactory<TCPEvent> eventFactory = new TCPEventFactory(); + + // if an SSLContextService was provided then create an SSLContext to pass down to the dispatcher + SSLContext sslContext = null; + SslContextFactory.ClientAuth clientAuth = null; + + final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + if (sslContextService != null) { + final String clientAuthValue = context.getProperty(CLIENT_AUTH).getValue(); + sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.valueOf(clientAuthValue)); + clientAuth = SslContextFactory.ClientAuth.valueOf(clientAuthValue); + } + + final ChannelHandlerFactory<TCPEvent<SocketChannel>, AsyncChannelDispatcher> handlerFactory = new SocketChannelHandlerFactory<>(); + return new SocketChannelDispatcher(eventFactory, handlerFactory, bufferPool, events, getLogger(), maxConnections, sslContext, clientAuth, charSet); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + final int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).asInteger(); + final Map<String,FlowFileEventBatch> batches = getBatches(session, maxBatchSize, messageDemarcatorBytes); + + // if the size is 0 then there was nothing to process so return + // we don't need to yield here because we have a long poll inside of getBatches + if (batches.size() == 0) { + return; + } + + for (Map.Entry<String,FlowFileEventBatch> entry : batches.entrySet()) { + FlowFile flowFile = entry.getValue().getFlowFile(); + final List<TCPEvent> events = entry.getValue().getEvents(); + + if (flowFile.getSize() == 0L || events.size() == 0) { + session.remove(flowFile); + 
getLogger().debug("No data written to FlowFile from batch {}; removing FlowFile", new Object[] {entry.getKey()}); + continue; + } + + // the sender will be the same for all events based on the batch key + final String sender = events.get(0).getSender(); + + final Map<String,String> attributes = new HashMap<>(3); + attributes.put("tcp.sender", sender); + attributes.put("tcp.port", String.valueOf(port)); + flowFile = session.putAllAttributes(flowFile, attributes); + + getLogger().debug("Transferring {} to success", new Object[] {flowFile}); + session.transfer(flowFile, REL_SUCCESS); + session.adjustCounter("FlowFiles Transferred to Success", 1L, false); + + // create a provenance receive event + final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender; + final String transitUri = new StringBuilder().append("tcp").append("://").append(senderHost).append(":") + .append(port).toString(); + session.getProvenanceReporter().receive(flowFile, transitUri); + } + } + + /** + * Event implementation for TCP. + */ + static class TCPEvent<C extends SelectableChannel> extends StandardEvent<C> { + + public TCPEvent(String sender, byte[] data, ChannelResponder<C> responder) { + super(sender, data, responder); + } + } + + /** + * Factory implementation for TCPEvents. 
+ */ + static final class TCPEventFactory implements EventFactory<TCPEvent> { + + @Override + public TCPEvent create(byte[] data, Map<String, String> metadata, ChannelResponder responder) { + String sender = null; + if (metadata != null && metadata.containsKey(EventFactory.SENDER_KEY)) { + sender = metadata.get(EventFactory.SENDER_KEY); + } + return new TCPEvent(sender, data, responder); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java index e555a0c..373e402 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java @@ -16,7 +16,6 @@ */ package org.apache.nifi.processors.standard; -import org.apache.commons.io.IOUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; import org.apache.nifi.annotation.documentation.CapabilityDescription; @@ -35,19 +34,17 @@ import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processor.util.put.sender.ChannelSender; +import org.apache.nifi.processor.util.put.sender.DatagramChannelSender; +import org.apache.nifi.processor.util.put.sender.SSLSocketChannelSender; +import 
org.apache.nifi.processor.util.put.sender.SocketChannelSender; import org.apache.nifi.processors.standard.syslog.SyslogParser; -import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.util.ObjectHolder; import org.apache.nifi.util.StopWatch; import javax.net.ssl.SSLContext; import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.DatagramChannel; -import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Collection; @@ -81,12 +78,13 @@ public class PutSyslog extends AbstractSyslogProcessor { .defaultValue("localhost") .required(true) .build(); - public static final PropertyDescriptor SEND_BUFFER_SIZE = new PropertyDescriptor.Builder() - .name("Send Buffer Size") - .description("The size of each buffer used to send a Syslog message. Adjust this value appropriately based on the expected size of the " + - "Syslog messages being produced. Messages larger than this buffer size will still be sent, but will not make use of the buffer pool.") + public static final PropertyDescriptor MAX_SOCKET_SEND_BUFFER_SIZE = new PropertyDescriptor.Builder() + .name("Max Size of Socket Send Buffer") + .description("The maximum size of the socket send buffer that should be used. This is a suggestion to the Operating System " + + "to indicate how big the socket buffer should be. 
If this value is set too low, the buffer may fill up before " + + "the data can be read, and incoming data will be dropped.") .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) - .defaultValue("2048 B") + .defaultValue("1 MB") .required(true) .build(); public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor @@ -164,8 +162,6 @@ public class PutSyslog extends AbstractSyslogProcessor { private Set<Relationship> relationships; private List<PropertyDescriptor> descriptors; - - private volatile BlockingQueue<ByteBuffer> bufferPool; private volatile BlockingQueue<ChannelSender> senderPool; @Override @@ -174,9 +170,10 @@ public class PutSyslog extends AbstractSyslogProcessor { descriptors.add(HOSTNAME); descriptors.add(PROTOCOL); descriptors.add(PORT); + descriptors.add(MAX_SOCKET_SEND_BUFFER_SIZE); descriptors.add(SSL_CONTEXT_SERVICE); descriptors.add(IDLE_EXPIRATION); - descriptors.add(SEND_BUFFER_SIZE); + descriptors.add(TIMEOUT); descriptors.add(BATCH_SIZE); descriptors.add(CHARSET); descriptors.add(MSG_PRIORITY); @@ -221,40 +218,40 @@ public class PutSyslog extends AbstractSyslogProcessor { @OnScheduled public void onScheduled(final ProcessContext context) throws IOException { - final int bufferSize = context.getProperty(SEND_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); - this.bufferPool = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks()); - for (int i=0; i < context.getMaxConcurrentTasks(); i++) { - this.bufferPool.offer(ByteBuffer.allocate(bufferSize)); - } - // initialize the queue of senders, one per task, senders will get created on the fly in onTrigger this.senderPool = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks()); } - protected ChannelSender createSender(final ProcessContext context, final BlockingQueue<ByteBuffer> bufferPool) throws IOException { + protected ChannelSender createSender(final ProcessContext context) throws IOException { final int port = context.getProperty(PORT).asInteger(); final String host = 
context.getProperty(HOSTNAME).getValue(); final String protocol = context.getProperty(PROTOCOL).getValue(); - final String charSet = context.getProperty(CHARSET).getValue(); + final int maxSendBuffer = context.getProperty(MAX_SOCKET_SEND_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); + final int timeout = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); - return createSender(sslContextService, protocol, host, port, Charset.forName(charSet), bufferPool); + return createSender(sslContextService, protocol, host, port, maxSendBuffer, timeout); } // visible for testing to override and provide a mock sender if desired - protected ChannelSender createSender(final SSLContextService sslContextService, final String protocol, final String host, final int port, - final Charset charset, final BlockingQueue<ByteBuffer> bufferPool) + protected ChannelSender createSender(final SSLContextService sslContextService, final String protocol, final String host, + final int port, final int maxSendBufferSize, final int timeout) throws IOException { + + ChannelSender sender; if (protocol.equals(UDP_VALUE.getValue())) { - return new DatagramChannelSender(host, port, bufferPool, charset); + sender = new DatagramChannelSender(host, port, maxSendBufferSize, getLogger()); } else { // if an SSLContextService is provided then we make a secure sender if (sslContextService != null) { final SSLContext sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED); - return new SSLSocketChannelSender(sslContext, host, port, bufferPool, charset); + sender = new SSLSocketChannelSender(host, port, maxSendBufferSize, sslContext, getLogger()); } else { - return new SocketChannelSender(host, port, bufferPool, charset); + sender = new SocketChannelSender(host, port, maxSendBufferSize, getLogger()); } } + 
sender.setTimeout(timeout); + sender.open(); + return sender; } @OnStopped @@ -275,7 +272,7 @@ public class PutSyslog extends AbstractSyslogProcessor { // if a connection hasn't been used with in the threshold then it gets closed ChannelSender sender; while ((sender = senderPool.poll()) != null) { - if (currentTime > (sender.lastUsed + idleThreshold)) { + if (currentTime > (sender.getLastUsed() + idleThreshold)) { getLogger().debug("Closing idle connection..."); sender.close(); } else { @@ -309,7 +306,7 @@ public class PutSyslog extends AbstractSyslogProcessor { if (sender == null) { try { getLogger().debug("No available connections, creating a new one..."); - sender = createSender(context, bufferPool); + sender = createSender(context); } catch (IOException e) { for (final FlowFile flowFile : flowFiles) { getLogger().error("No available connections, and unable to create a new one, transferring {} to failure", @@ -325,6 +322,7 @@ public class PutSyslog extends AbstractSyslogProcessor { final String host = context.getProperty(HOSTNAME).getValue(); final String transitUri = new StringBuilder().append(protocol).append("://").append(host).append(":").append(port).toString(); final ObjectHolder<IOException> exceptionHolder = new ObjectHolder<>(null); + final Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue()); try { for (FlowFile flowFile : flowFiles) { @@ -352,7 +350,7 @@ public class PutSyslog extends AbstractSyslogProcessor { messageBuilder.append('\n'); } - sender.send(messageBuilder.toString()); + sender.send(messageBuilder.toString(), charSet); timer.stop(); final long duration = timer.getDuration(TimeUnit.MILLISECONDS); @@ -396,157 +394,4 @@ public class PutSyslog extends AbstractSyslogProcessor { return false; } - /** - * Base class for sending messages over a channel. 
- */ - protected static abstract class ChannelSender { - - final int port; - final String host; - final BlockingQueue<ByteBuffer> bufferPool; - final Charset charset; - volatile long lastUsed; - - ChannelSender(final String host, final int port, final BlockingQueue<ByteBuffer> bufferPool, final Charset charset) throws IOException { - this.port = port; - this.host = host; - this.bufferPool = bufferPool; - this.charset = charset; - } - - public void send(final String message) throws IOException { - final byte[] bytes = message.getBytes(charset); - - boolean shouldReturn = true; - ByteBuffer buffer = bufferPool.poll(); - if (buffer == null) { - buffer = ByteBuffer.allocate(bytes.length); - shouldReturn = false; - } else if (buffer.limit() < bytes.length) { - // we need a large buffer so return the one we got and create a new bigger one - bufferPool.offer(buffer); - buffer = ByteBuffer.allocate(bytes.length); - shouldReturn = false; - } - - try { - buffer.clear(); - buffer.put(bytes); - buffer.flip(); - write(buffer); - lastUsed = System.currentTimeMillis(); - } finally { - if (shouldReturn) { - bufferPool.offer(buffer); - } - } - } - - // write the given buffer to the underlying channel - abstract void write(ByteBuffer buffer) throws IOException; - - // returns true if the underlying channel is connected - abstract boolean isConnected(); - - // close the underlying channel - abstract void close(); - } - - /** - * Sends messages over a DatagramChannel. 
- */ - private static class DatagramChannelSender extends ChannelSender { - - final DatagramChannel channel; - - DatagramChannelSender(final String host, final int port, final BlockingQueue<ByteBuffer> bufferPool, final Charset charset) throws IOException { - super(host, port, bufferPool, charset); - this.channel = DatagramChannel.open(); - this.channel.connect(new InetSocketAddress(InetAddress.getByName(host), port)); - } - - @Override - public void write(ByteBuffer buffer) throws IOException { - while (buffer.hasRemaining()) { - channel.write(buffer); - } - } - - @Override - boolean isConnected() { - return channel != null && channel.isConnected(); - } - - @Override - public void close() { - IOUtils.closeQuietly(channel); - } - } - - /** - * Sends messages over a SocketChannel. - */ - private static class SocketChannelSender extends ChannelSender { - - final SocketChannel channel; - - SocketChannelSender(final String host, final int port, final BlockingQueue<ByteBuffer> bufferPool, final Charset charset) throws IOException { - super(host, port, bufferPool, charset); - this.channel = SocketChannel.open(); - this.channel.connect(new InetSocketAddress(InetAddress.getByName(host), port)); - } - - @Override - public void write(ByteBuffer buffer) throws IOException { - while (buffer.hasRemaining()) { - channel.write(buffer); - } - } - - @Override - boolean isConnected() { - return channel != null && channel.isConnected(); - } - - @Override - public void close() { - IOUtils.closeQuietly(channel); - } - } - - /** - * Sends messages over an SSLSocketChannel. 
- */ - private static class SSLSocketChannelSender extends ChannelSender { - - final SSLSocketChannel channel; - - SSLSocketChannelSender(final SSLContext sslContext, final String host, final int port, final BlockingQueue<ByteBuffer> bufferPool, final Charset charset) throws IOException { - super(host, port, bufferPool, charset); - this.channel = new SSLSocketChannel(sslContext, host, port, true); - this.channel.connect(); - } - - @Override - public void send(final String message) throws IOException { - final byte[] bytes = message.getBytes(charset); - channel.write(bytes); - lastUsed = System.currentTimeMillis(); - } - - @Override - public void write(ByteBuffer buffer) throws IOException { - // nothing to do here since we are overriding send() above - } - - @Override - boolean isConnected() { - return channel != null && !channel.isClosed(); - } - - @Override - public void close() { - IOUtils.closeQuietly(channel); - } - } } http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 5781fad..d3d765e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -48,6 +48,7 @@ org.apache.nifi.processors.standard.ListFile org.apache.nifi.processors.standard.ListenHTTP org.apache.nifi.processors.standard.ListenRELP 
org.apache.nifi.processors.standard.ListenSyslog +org.apache.nifi.processors.standard.ListenTCP org.apache.nifi.processors.standard.ListenUDP org.apache.nifi.processors.standard.ListSFTP org.apache.nifi.processors.standard.LogAttribute http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenAndPutSyslog.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenAndPutSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenAndPutSyslog.java new file mode 100644 index 0000000..29fa690 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenAndPutSyslog.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.standard; + +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.ssl.StandardSSLContextService; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.charset.Charset; + +/** + * Tests PutSyslog sending messages to ListenSyslog to simulate a syslog server forwarding + * to ListenSyslog, or PutSyslog sending to a syslog server. + */ +public class TestListenAndPutSyslog { + + static final Logger LOGGER = LoggerFactory.getLogger(TestListenAndPutSyslog.class); + + private ListenSyslog listenSyslog; + private TestRunner listenSyslogRunner; + + private PutSyslog putSyslog; + private TestRunner putSyslogRunner; + + @Before + public void setup() { + this.listenSyslog = new ListenSyslog(); + this.listenSyslogRunner = TestRunners.newTestRunner(listenSyslog); + + this.putSyslog = new PutSyslog(); + this.putSyslogRunner = TestRunners.newTestRunner(putSyslog); + } + + @After + public void teardown() { + try { + putSyslog.onStopped(); + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); + } + try { + listenSyslog.onUnscheduled(); + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); + } + } + + @Test + public void testUDP() throws IOException, InterruptedException { + run(ListenSyslog.UDP_VALUE.getValue(), 5, 5); + } + + @Test + public void testTCP() throws IOException, InterruptedException { + run(ListenSyslog.TCP_VALUE.getValue(), 5, 5); + } + + @Test + public void testTLS() throws InitializationException, IOException, InterruptedException { + 
configureSSLContextService(listenSyslogRunner); + listenSyslogRunner.setProperty(ListenSyslog.SSL_CONTEXT_SERVICE, "ssl-context"); + + configureSSLContextService(putSyslogRunner); + putSyslogRunner.setProperty(PutSyslog.SSL_CONTEXT_SERVICE, "ssl-context"); + + run(ListenSyslog.TCP_VALUE.getValue(), 7, 7); + } + + @Test + public void testTLSListenerNoTLSPut() throws InitializationException, IOException, InterruptedException { + configureSSLContextService(listenSyslogRunner); + listenSyslogRunner.setProperty(ListenSyslog.SSL_CONTEXT_SERVICE, "ssl-context"); + + // send 7 but expect 0 because sender didn't use TLS + run(ListenSyslog.TCP_VALUE.getValue(), 7, 0); + } + + private SSLContextService configureSSLContextService(TestRunner runner) throws InitializationException { + final SSLContextService sslContextService = new StandardSSLContextService(); + runner.addControllerService("ssl-context", sslContextService); + runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE, "src/test/resources/localhost-ts.jks"); + runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest"); + runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_TYPE, "JKS"); + runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE, "src/test/resources/localhost-ks.jks"); + runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_PASSWORD, "localtest"); + runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_TYPE, "JKS"); + runner.enableControllerService(sslContextService); + return sslContextService; + } + + /** + * Sends numMessages from PutSyslog to ListenSyslog. 
+ */ + private void run(String protocol, int numMessages, int expectedMessages) throws IOException, InterruptedException { + // set the same protocol on both processors + putSyslogRunner.setProperty(PutSyslog.PROTOCOL, protocol); + listenSyslogRunner.setProperty(ListenSyslog.PROTOCOL, protocol); + + // set a listening port of 0 to get a random available port + listenSyslogRunner.setProperty(ListenSyslog.PORT, "0"); + + // call onScheduled to start ListenSyslog listening + final ProcessSessionFactory processSessionFactory = listenSyslogRunner.getProcessSessionFactory(); + final ProcessContext context = listenSyslogRunner.getProcessContext(); + listenSyslog.onScheduled(context); + + // get the real port it is listening on and set that in PutSyslog + final int listeningPort = listenSyslog.getPort(); + putSyslogRunner.setProperty(PutSyslog.PORT, String.valueOf(listeningPort)); + + // configure the message properties on PutSyslog + final String pri = "34"; + final String version = "1"; + final String stamp = "2016-02-05T22:14:15.003Z"; + final String host = "localhost"; + final String body = "some message"; + final String expectedMessage = "<" + pri + ">" + version + " " + stamp + " " + host + " " + body; + + putSyslogRunner.setProperty(PutSyslog.MSG_PRIORITY, pri); + putSyslogRunner.setProperty(PutSyslog.MSG_VERSION, version); + putSyslogRunner.setProperty(PutSyslog.MSG_TIMESTAMP, stamp); + putSyslogRunner.setProperty(PutSyslog.MSG_HOSTNAME, host); + putSyslogRunner.setProperty(PutSyslog.MSG_BODY, body); + + // send the messages + for (int i=0; i < numMessages; i++) { + putSyslogRunner.enqueue("incoming data".getBytes(Charset.forName("UTF-8"))); + } + putSyslogRunner.run(numMessages, false); + + // trigger ListenSyslog until we've seen all the messages + int numTransfered = 0; + long timeout = System.currentTimeMillis() + 30000; + + while (numTransfered < expectedMessages && System.currentTimeMillis() < timeout) { + Thread.sleep(10); + listenSyslog.onTrigger(context, 
processSessionFactory); + numTransfered = listenSyslogRunner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size(); + } + Assert.assertEquals("Did not process all the messages", expectedMessages, numTransfered); + + if (expectedMessages > 0) { + // check that one of flow files has the expected content + MockFlowFile mockFlowFile = listenSyslogRunner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0); + mockFlowFile.assertContentEquals(expectedMessage); + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java index 877a55a..b885e49 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java @@ -194,11 +194,24 @@ public class TestListenRELP { // send the frames to the port the processors is listening on sendFrames(frames, socket); - // call onTrigger until we processed all the frames, or a certain amount of time passes - long responseTimeout = 10000; - long startTime = System.currentTimeMillis(); - while (proc.responses.size() < expectedTransferred - && (System.currentTimeMillis() - startTime < responseTimeout)) { + long responseTimeout = 30000; + + // this first loop waits until the internal queue of the processor has the expected + // number of messages ready before proceeding, we want to guarantee they are all there + // before onTrigger 
gets a chance to run + long startTimeQueueSizeCheck = System.currentTimeMillis(); + while (proc.getQueueSize() < expectedResponses + && (System.currentTimeMillis() - startTimeQueueSizeCheck < responseTimeout)) { + Thread.sleep(100); + } + + // want to fail here if the queue size isn't what we expect + Assert.assertEquals(expectedResponses, proc.getQueueSize()); + + // call onTrigger until we get a response for all the frames, or a certain amount of time passes + long startTimeProcessing = System.currentTimeMillis(); + while (proc.responses.size() < expectedResponses + && (System.currentTimeMillis() - startTimeProcessing < responseTimeout)) { proc.onTrigger(context, processSessionFactory); Thread.sleep(100); } @@ -221,7 +234,6 @@ public class TestListenRELP { for (final RELPFrame frame : frames) { byte[] encodedFrame = encoder.encode(frame); socket.getOutputStream().write(encodedFrame); - Thread.sleep(1); } socket.getOutputStream().flush(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java index cd8621c..2743caf 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java @@ -62,7 +62,8 @@ public class TestListenSyslog { static final String HOST = "localhost.home"; static final String BODY = "some message"; - static final String VALID_MESSAGE = "<" + 
PRI + ">" + TIME + " " + HOST + " " + BODY + "\n"; + static final String VALID_MESSAGE = "<" + PRI + ">" + TIME + " " + HOST + " " + BODY ; + static final String VALID_MESSAGE_TCP = "<" + PRI + ">" + TIME + " " + HOST + " " + BODY + "\n"; static final String INVALID_MESSAGE = "this is not valid\n"; @Test @@ -135,7 +136,7 @@ public class TestListenSyslog { Assert.assertTrue(port > 0); // write some TCP messages to the port in the background - final Thread sender = new Thread(new SingleConnectionSocketSender(port, numMessages, 10, VALID_MESSAGE)); + final Thread sender = new Thread(new SingleConnectionSocketSender(port, numMessages, 10, VALID_MESSAGE_TCP)); sender.setDaemon(true); sender.start(); @@ -185,7 +186,7 @@ public class TestListenSyslog { Assert.assertTrue(port > 0); // send 3 messages as 1 - final String multipleMessages = VALID_MESSAGE + "\n" + VALID_MESSAGE + "\n" + VALID_MESSAGE; + final String multipleMessages = VALID_MESSAGE_TCP + "\n" + VALID_MESSAGE_TCP + "\n" + VALID_MESSAGE_TCP + "\n"; final Thread sender = new Thread(new SingleConnectionSocketSender(port, 1, 10, multipleMessages)); sender.setDaemon(true); sender.start(); @@ -237,7 +238,7 @@ public class TestListenSyslog { Assert.assertTrue(port > 0); // write some TCP messages to the port in the background - final Thread sender = new Thread(new MultiConnectionSocketSender(port, numMessages, 10, VALID_MESSAGE)); + final Thread sender = new Thread(new MultiConnectionSocketSender(port, numMessages, 10, VALID_MESSAGE_TCP)); sender.setDaemon(true); sender.start(); @@ -292,7 +293,7 @@ public class TestListenSyslog { Assert.assertTrue(port > 0); // write some UDP messages to the port in the background - final Thread sender = new Thread(new DatagramSender(port, numMessages, 10, VALID_MESSAGE.replaceAll("\\n", ""))); + final Thread sender = new Thread(new DatagramSender(port, numMessages, 10, VALID_MESSAGE)); sender.setDaemon(true); sender.start(); sender.join(); @@ -432,7 +433,7 @@ public class 
TestListenSyslog { private void checkFlowFile(final MockFlowFile flowFile, final int port, final String protocol) { - flowFile.assertContentEquals(VALID_MESSAGE); + flowFile.assertContentEquals(VALID_MESSAGE.replace("\n", "")); Assert.assertEquals(PRI, flowFile.getAttribute(SyslogAttributes.PRIORITY.key())); Assert.assertEquals(SEV, flowFile.getAttribute(SyslogAttributes.SEVERITY.key())); Assert.assertEquals(FAC, flowFile.getAttribute(SyslogAttributes.FACILITY.key())); http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCP.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCP.java new file mode 100644 index 0000000..ef05eab --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCP.java @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.commons.io.IOUtils; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.security.util.SslContextFactory; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.ssl.StandardSSLContextService; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import javax.net.ssl.SSLContext; +import java.io.IOException; +import java.net.Socket; +import java.nio.charset.StandardCharsets; +import java.security.KeyManagementException; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.UnrecoverableKeyException; +import java.security.cert.CertificateException; +import java.util.ArrayList; +import java.util.List; + +public class TestListenTCP { + + private ListenTCP proc; + private TestRunner runner; + + @Before + public void setup() { + proc = new ListenTCP(); + runner = TestRunners.newTestRunner(proc); + runner.setProperty(ListenTCP.PORT, "0"); + } + + @Test + public void testCustomValidate() throws InitializationException { + runner.setProperty(ListenTCP.PORT, "1"); + runner.assertValid(); + + configureProcessorSslContextService(); + runner.setProperty(ListenTCP.CLIENT_AUTH, ""); + runner.assertNotValid(); + + runner.setProperty(ListenTCP.CLIENT_AUTH, SslContextFactory.ClientAuth.REQUIRED.name()); + runner.assertValid(); + } + + @Test + public void testListenTCP() throws IOException, InterruptedException { + final List<String> messages = new ArrayList<>(); + messages.add("This is message 1\n"); + messages.add("This is message 2\n"); + 
messages.add("This is message 3\n"); + messages.add("This is message 4\n"); + messages.add("This is message 5\n"); + + runTCP(messages, messages.size(), null); + + List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCP.REL_SUCCESS); + for (int i=0; i < mockFlowFiles.size(); i++) { + mockFlowFiles.get(i).assertContentEquals("This is message " + (i + 1)); + } + } + + @Test + public void testListenTCPBatching() throws IOException, InterruptedException { + runner.setProperty(ListenTCP.MAX_BATCH_SIZE, "3"); + + final List<String> messages = new ArrayList<>(); + messages.add("This is message 1\n"); + messages.add("This is message 2\n"); + messages.add("This is message 3\n"); + messages.add("This is message 4\n"); + messages.add("This is message 5\n"); + + runTCP(messages, 2, null); + + List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCP.REL_SUCCESS); + + MockFlowFile mockFlowFile1 = mockFlowFiles.get(0); + mockFlowFile1.assertContentEquals("This is message 1\nThis is message 2\nThis is message 3"); + + MockFlowFile mockFlowFile2 = mockFlowFiles.get(1); + mockFlowFile2.assertContentEquals("This is message 4\nThis is message 5"); + } + + @Test + public void testTLSClienAuthRequiredAndClientCertProvided() throws InitializationException, IOException, InterruptedException, + UnrecoverableKeyException, CertificateException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException { + + runner.setProperty(ListenTCP.CLIENT_AUTH, SSLContextService.ClientAuth.REQUIRED.name()); + configureProcessorSslContextService(); + + final List<String> messages = new ArrayList<>(); + messages.add("This is message 1\n"); + messages.add("This is message 2\n"); + messages.add("This is message 3\n"); + messages.add("This is message 4\n"); + messages.add("This is message 5\n"); + + // Make an SSLContext with a key and trust store to send the test messages + final SSLContext clientSslContext = SslContextFactory.createSslContext( + 
"src/test/resources/localhost-ks.jks", + "localtest".toCharArray(), + "jks", + "src/test/resources/localhost-ts.jks", + "localtest".toCharArray(), + "jks", + org.apache.nifi.security.util.SslContextFactory.ClientAuth.valueOf("NONE"), + "TLS"); + + runTCP(messages, messages.size(), clientSslContext); + + List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCP.REL_SUCCESS); + for (int i=0; i < mockFlowFiles.size(); i++) { + mockFlowFiles.get(i).assertContentEquals("This is message " + (i + 1)); + } + } + + @Test + public void testTLSClienAuthRequiredAndClientCertNotProvided() throws InitializationException, IOException, InterruptedException, + UnrecoverableKeyException, CertificateException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException { + + runner.setProperty(ListenTCP.CLIENT_AUTH, SSLContextService.ClientAuth.REQUIRED.name()); + configureProcessorSslContextService(); + + final List<String> messages = new ArrayList<>(); + messages.add("This is message 1\n"); + messages.add("This is message 2\n"); + messages.add("This is message 3\n"); + messages.add("This is message 4\n"); + messages.add("This is message 5\n"); + + // Make an SSLContext that only has the trust store, this should not work since the processor has client auth REQUIRED + final SSLContext clientSslContext = SslContextFactory.createTrustSslContext( + "src/test/resources/localhost-ts.jks", + "localtest".toCharArray(), + "jks", + "TLS"); + + try { + runTCP(messages, messages.size(), clientSslContext); + Assert.fail("Should have thrown exception"); + } catch (Exception e) { + + } + } + + @Test + public void testTLSClienAuthNoneAndClientCertNotProvided() throws InitializationException, IOException, InterruptedException, + UnrecoverableKeyException, CertificateException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException { + + runner.setProperty(ListenTCP.CLIENT_AUTH, SSLContextService.ClientAuth.NONE.name()); + 
configureProcessorSslContextService(); + + final List<String> messages = new ArrayList<>(); + messages.add("This is message 1\n"); + messages.add("This is message 2\n"); + messages.add("This is message 3\n"); + messages.add("This is message 4\n"); + messages.add("This is message 5\n"); + + // Make an SSLContext that only has the trust store, this should work since the processor has client auth NONE + final SSLContext clientSslContext = SslContextFactory.createTrustSslContext( + "src/test/resources/localhost-ts.jks", + "localtest".toCharArray(), + "jks", + "TLS"); + + runTCP(messages, messages.size(), clientSslContext); + + List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCP.REL_SUCCESS); + for (int i=0; i < mockFlowFiles.size(); i++) { + mockFlowFiles.get(i).assertContentEquals("This is message " + (i + 1)); + } + } + + protected void runTCP(final List<String> messages, final int expectedTransferred, final SSLContext sslContext) + throws IOException, InterruptedException { + + Socket socket = null; + try { + // schedule to start listening on a random port + final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); + final ProcessContext context = runner.getProcessContext(); + proc.onScheduled(context); + + // create a client connection to the port the dispatcher is listening on + final int realPort = proc.getDispatcherPort(); + + // create either a regular socket or ssl socket based on context being passed in + if (sslContext != null) { + socket = sslContext.getSocketFactory().createSocket("localhost", realPort); + } else { + socket = new Socket("localhost", realPort); + } + Thread.sleep(100); + + // send the messages to the port the processor is listening on + for (final String message : messages) { + socket.getOutputStream().write(message.getBytes(StandardCharsets.UTF_8)); + Thread.sleep(1); + } + socket.getOutputStream().flush(); + + long responseTimeout = 10000; + + // this first loop waits until 
the internal queue of the processor has the expected + // number of messages ready before proceeding, we want to guarantee they are all there + // before onTrigger gets a chance to run + long startTimeQueueSizeCheck = System.currentTimeMillis(); + while (proc.getQueueSize() < messages.size() + && (System.currentTimeMillis() - startTimeQueueSizeCheck < responseTimeout)) { + Thread.sleep(100); + } + + // want to fail here if the queue size isn't what we expect + Assert.assertEquals(messages.size(), proc.getQueueSize()); + + // call onTrigger until we processed all the frames, or a certain amount of time passes + int numTransferred = 0; + long startTime = System.currentTimeMillis(); + while (numTransferred < expectedTransferred && (System.currentTimeMillis() - startTime < responseTimeout)) { + proc.onTrigger(context, processSessionFactory); + numTransferred = runner.getFlowFilesForRelationship(ListenTCP.REL_SUCCESS).size(); + Thread.sleep(100); + } + + // should have transferred the expected events + runner.assertTransferCount(ListenTCP.REL_SUCCESS, expectedTransferred); + } finally { + // unschedule to close connections + proc.onUnscheduled(); + IOUtils.closeQuietly(socket); + } + } + + private SSLContextService configureProcessorSslContextService() throws InitializationException { + final SSLContextService sslContextService = new StandardSSLContextService(); + runner.addControllerService("ssl-context", sslContextService); + runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE, "src/test/resources/localhost-ts.jks"); + runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest"); + runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_TYPE, "JKS"); + runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE, "src/test/resources/localhost-ks.jks"); + runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_PASSWORD, "localtest"); + 
runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_TYPE, "JKS"); + runner.enableControllerService(sslContextService); + + runner.setProperty(ListenTCP.SSL_CONTEXT_SERVICE, "ssl-context"); + return sslContextService; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java index c96d105..60fe37d 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java @@ -17,6 +17,7 @@ package org.apache.nifi.processors.standard; import org.apache.nifi.processor.Processor; +import org.apache.nifi.processor.util.put.sender.ChannelSender; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.ssl.SSLContextService; @@ -27,14 +28,11 @@ import org.junit.Before; import org.junit.Test; import java.io.IOException; -import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; public class TestPutSyslog { @@ -327,8 +325,9 @@ public class TestPutSyslog { } @Override - protected ChannelSender createSender(SSLContextService sslContextService, String protocol, String host, int port, - Charset charset, 
BlockingQueue<ByteBuffer> bufferPool) throws IOException { + protected ChannelSender createSender(SSLContextService sslContextService, String protocol, String host, + int port, int maxSendBuffer, int timeout) + throws IOException { return mockSender; } } @@ -346,8 +345,9 @@ public class TestPutSyslog { } @Override - protected ChannelSender createSender(SSLContextService sslContextService, String protocol, String host, int port, - Charset charset, BlockingQueue<ByteBuffer> bufferPool) throws IOException { + protected ChannelSender createSender(SSLContextService sslContextService, String protocol, String host, + int port, int maxSendBuffer, int timeout) + throws IOException { if (numSendersCreated >= numSendersAllowed) { throw new IOException("too many senders"); } @@ -357,61 +357,70 @@ public class TestPutSyslog { } // Mock sender that saves any messages passed to send() - static class MockCollectingSender extends PutSyslog.ChannelSender { + static class MockCollectingSender extends ChannelSender { List<String> messages = new ArrayList<>(); public MockCollectingSender() throws IOException { - super("myhost", 0, new LinkedBlockingQueue<ByteBuffer>(1), Charset.forName("UTF-8")); - this.bufferPool.offer(ByteBuffer.allocate(1024)); + super("myhost", 0, 0, null); } @Override - public void send(String message) throws IOException { + public void open() throws IOException { + + } + + @Override + public void send(String message, Charset charset) throws IOException { messages.add(message); - super.send(message); + super.send(message, charset); } @Override - void write(ByteBuffer buffer) throws IOException { + protected void write(byte[] buffer) throws IOException { } @Override - boolean isConnected() { + public boolean isConnected() { return true; } @Override - void close() { + public void close() { } } // Mock sender that throws IOException on calls to write() or send() - static class MockErrorSender extends PutSyslog.ChannelSender { + static class MockErrorSender extends 
ChannelSender { public MockErrorSender() throws IOException { - super(null, 0, null, null); + super(null, 0, 0, null); + } + + @Override + public void open() throws IOException { + } @Override - public void send(String message) throws IOException { + public void send(String message, Charset charset) throws IOException { throw new IOException("error"); } @Override - void write(ByteBuffer buffer) throws IOException { + protected void write(byte[] data) throws IOException { throw new IOException("error"); } @Override - boolean isConnected() { + public boolean isConnected() { return false; } @Override - void close() { + public void close() { } } http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-nar-bundles/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index e6df432..0639d5a 100644 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -54,6 +54,7 @@ <module>nifi-scripting-bundle</module> <module>nifi-elasticsearch-bundle</module> <module>nifi-amqp-bundle</module> + <module>nifi-splunk-bundle</module> </modules> <dependencyManagement> <dependencies> http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index ac29b01..6b20a7d 100644 --- a/pom.xml +++ b/pom.xml @@ -1064,6 +1064,12 @@ language governing permissions and limitations under the License. --> <groupId>org.apache.nifi</groupId> <artifactId>nifi-elasticsearch-nar</artifactId> <version>0.6.0-SNAPSHOT</version> + <type>nar</type> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-splunk-nar</artifactId> + <version>0.6.0-SNAPSHOT</version> <type>nar</type> </dependency> <dependency>