Repository: nifi
Updated Branches:
  refs/heads/master c59087bc3 -> e5281f1fc


NIFI-1221: Support batching of Syslog messages

Signed-off-by: Bryan Bende <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e5281f1f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e5281f1f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e5281f1f

Branch: refs/heads/master
Commit: e5281f1fc1865c653f8e2147622229d55c7d9ab1
Parents: c59087b
Author: Mark Payne <[email protected]>
Authored: Wed Nov 25 17:21:00 2015 -0500
Committer: Bryan Bende <[email protected]>
Committed: Mon Nov 30 17:32:49 2015 -0500

----------------------------------------------------------------------
 .../nifi/processors/standard/ListenSyslog.java  | 397 ++++++++++++++-----
 .../nifi/processors/standard/ParseSyslog.java   | 150 +++++++
 .../nifi/processors/standard/PutSyslog.java     |   2 +
 .../org.apache.nifi.processor.Processor         |   1 +
 .../processors/standard/TestListenSyslog.java   | 227 +++++++----
 .../processors/standard/TestParseSyslog.java    |  61 +++
 6 files changed, 669 insertions(+), 169 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/e5281f1f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
index effaffc..fbe64ea 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
@@ -31,6 +31,7 @@ import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -47,16 +48,19 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.io.nio.BufferPool;
 import org.apache.nifi.logging.ProcessorLog;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
@@ -70,6 +74,7 @@ import org.apache.nifi.processors.standard.util.SyslogEvent;
 import org.apache.nifi.processors.standard.util.SyslogParser;
 import org.apache.nifi.stream.io.ByteArrayOutputStream;
 
+@SupportsBatching
 @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
 @Tags({"syslog", "listen", "udp", "tcp", "logs"})
 @CapabilityDescription("Listens for Syslog messages being sent to a given port 
over TCP or UDP. Incoming messages are checked against regular " +
@@ -92,7 +97,8 @@ import org.apache.nifi.stream.io.ByteArrayOutputStream;
                             "If this value is false, the other attributes will 
be empty and only the original message will be available in the content."),
                     @WritesAttribute(attribute="syslog.protocol", 
description="The protocol over which the Syslog message was received."),
                     @WritesAttribute(attribute="syslog.port", description="The 
port over which the Syslog message was received."),
-                    @WritesAttribute(attribute="mime.type", description="The 
mime.type of the FlowFile which will be text/plain for Syslog messages.")})
+                    @WritesAttribute(attribute = "mime.type", description = 
"The mime.type of the FlowFile which will be text/plain for Syslog messages.")})
+@SeeAlso({PutSyslog.class, ParseSyslog.class})
 public class ListenSyslog extends AbstractSyslogProcessor {
 
     public static final PropertyDescriptor RECV_BUFFER_SIZE = new 
PropertyDescriptor.Builder()
@@ -120,6 +126,31 @@ public class ListenSyslog extends AbstractSyslogProcessor {
             .defaultValue("2")
             .required(true)
             .build();
+    public static final PropertyDescriptor MAX_BATCH_SIZE = new 
PropertyDescriptor.Builder()
+        .name("Max Batch Size")
+        .description(
+            "The maximum number of Syslog events to add to a single FlowFile. 
If multiple events are available, they will be concatenated along with "
+                    + "the <Message Delimiter> up to this configured maximum 
number of messages")
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .defaultValue("1")
+        .required(true)
+        .build();
+    public static final PropertyDescriptor MESSAGE_DELIMITER = new 
PropertyDescriptor.Builder()
+        .name("Message Delimiter")
+        .description("Specifies the delimiter to place between Syslog messages 
when multiple messages are bundled together (see <Max Batch Size> property).")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .defaultValue("\\n")
+        .required(true)
+        .build();
+    public static final PropertyDescriptor PARSE_MESSAGES = new 
PropertyDescriptor.Builder()
+        .name("Parse Messages")
+        .description("Indicates if the processor should parse the Syslog 
messages. If set to false, each outgoing FlowFile will only " +
+            "contain the sender, protocol, and port, and no additional 
attributes.")
+        .allowableValues("true", "false")
+        .defaultValue("true")
+        .required(true)
+        .build();
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
@@ -133,11 +164,12 @@ public class ListenSyslog extends AbstractSyslogProcessor {
     private Set<Relationship> relationships;
     private List<PropertyDescriptor> descriptors;
 
-    private volatile BufferPool bufferPool;
     private volatile ChannelReader channelReader;
     private volatile SyslogParser parser;
-    private volatile BlockingQueue<SyslogEvent> syslogEvents;
-    private volatile BlockingQueue<SyslogEvent> errorEvents;
+    private volatile BlockingQueue<ByteBuffer> bufferPool;
+    private volatile BlockingQueue<RawSyslogEvent> syslogEvents = new 
LinkedBlockingQueue<>(10);
+    private volatile BlockingQueue<RawSyslogEvent> errorEvents = new 
LinkedBlockingQueue<>();
+    private volatile byte[] messageDemarcatorBytes; //it is only the array 
reference that is volatile - not the contents.
 
     @Override
     protected void init(final ProcessorInitializationContext context) {
@@ -147,6 +179,9 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         descriptors.add(RECV_BUFFER_SIZE);
         descriptors.add(MAX_SOCKET_BUFFER_SIZE);
         descriptors.add(MAX_CONNECTIONS);
+        descriptors.add(MAX_BATCH_SIZE);
+        descriptors.add(MESSAGE_DELIMITER);
+        descriptors.add(PARSE_MESSAGES);
         descriptors.add(CHARSET);
         this.descriptors = Collections.unmodifiableList(descriptors);
 
@@ -162,19 +197,32 @@ public class ListenSyslog extends AbstractSyslogProcessor {
     }
 
     @Override
-    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return descriptors;
     }
 
     @Override
     public void onPropertyModified(PropertyDescriptor descriptor, String 
oldValue, String newValue) {
-        // since properties were changed, clear any events that were queued
-        if (syslogEvents != null) {
-            syslogEvents.clear();
+        // if we are changing the protocol, the events that we may have queued 
up are no longer valid, as they
+        // were received using a different protocol and may be from a 
completely different source
+        if (PROTOCOL.equals(descriptor)) {
+            if (syslogEvents != null) {
+                syslogEvents.clear();
+            }
+            if (errorEvents != null) {
+                errorEvents.clear();
+            }
         }
-        if (errorEvents != null) {
-            errorEvents.clear();
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(final 
ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>();
+        if (validationContext.getProperty(MAX_BATCH_SIZE).asInteger() > 1 && 
validationContext.getProperty(PARSE_MESSAGES).asBoolean()) {
+            results.add(new ValidationResult.Builder().subject("Parse 
Messages").input("true").valid(false)
+                .explanation("Cannot set Parse Messages to 'true' if Batch 
Size is greater than 1").build());
         }
+        return results;
     }
 
     @OnScheduled
@@ -184,21 +232,26 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         final int maxChannelBufferSize = 
context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
         final String protocol = context.getProperty(PROTOCOL).getValue();
         final String charSet = context.getProperty(CHARSET).getValue();
-        final int maxConnections;
+        final String msgDemarcator = 
context.getProperty(MESSAGE_DELIMITER).getValue().replace("\\n", 
"\n").replace("\\r", "\r").replace("\\t", "\t");
+        final String charsetName = context.getProperty(CHARSET).getValue();
+        messageDemarcatorBytes = 
msgDemarcator.getBytes(Charset.forName(charsetName));
 
+        final int maxConnections;
         if (protocol.equals(UDP_VALUE.getValue())) {
             maxConnections = 1;
         } else {
             maxConnections = 
context.getProperty(MAX_CONNECTIONS).asLong().intValue();
         }
 
+        bufferPool = new LinkedBlockingQueue<>(maxConnections);
+        for (int i = 0; i < maxConnections; i++) {
+            bufferPool.offer(ByteBuffer.allocate(bufferSize));
+        }
+
         parser = new SyslogParser(Charset.forName(charSet));
-        bufferPool = new BufferPool(maxConnections, bufferSize, false, 
Integer.MAX_VALUE);
-        syslogEvents = new LinkedBlockingQueue<>(10);
-        errorEvents = new 
LinkedBlockingQueue<>(context.getMaxConcurrentTasks());
 
         // create either a UDP or TCP reader and call open() to bind to the 
given port
-        channelReader = createChannelReader(protocol, bufferPool, parser, 
syslogEvents, maxConnections);
+        channelReader = createChannelReader(protocol, bufferPool, 
syslogEvents, maxConnections);
         channelReader.open(port, maxChannelBufferSize);
 
         final Thread readerThread = new Thread(channelReader);
@@ -207,13 +260,19 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         readerThread.start();
     }
 
+    // visible for testing.
+    protected SyslogParser getParser() {
+        return parser;
+    }
+
     // visible for testing to be overridden and provide a mock ChannelReader 
if desired
-    protected ChannelReader createChannelReader(final String protocol, final 
BufferPool bufferPool, final SyslogParser syslogParser, final 
BlockingQueue<SyslogEvent> syslogEvents, int maxConnections)
+    protected ChannelReader createChannelReader(final String protocol, final 
BlockingQueue<ByteBuffer> bufferPool, final BlockingQueue<RawSyslogEvent> 
syslogEvents,
+        int maxConnections)
             throws IOException {
         if (protocol.equals(UDP_VALUE.getValue())) {
-            return new DatagramChannelReader(bufferPool, syslogParser, 
syslogEvents, getLogger());
+            return new DatagramChannelReader(bufferPool, syslogEvents, 
getLogger());
         } else {
-            return new SocketChannelReader(bufferPool, syslogParser, 
syslogEvents, getLogger(), maxConnections);
+            return new SocketChannelReader(bufferPool, syslogEvents, 
getLogger(), maxConnections);
         }
     }
 
@@ -230,66 +289,179 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         }
     }
 
+    protected RawSyslogEvent getMessage(final boolean longPoll, final boolean 
pollErrorQueue) {
+        RawSyslogEvent rawSyslogEvent = null;
+        if (pollErrorQueue) {
+            rawSyslogEvent = errorEvents.poll();
+        }
+
+        if (rawSyslogEvent == null) {
+            try {
+                if (longPoll) {
+                    rawSyslogEvent = syslogEvents.poll(100, 
TimeUnit.MILLISECONDS);
+                } else {
+                    rawSyslogEvent = syslogEvents.poll();
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                return null;
+            }
+        }
+
+        return rawSyslogEvent;
+    }
+
+    protected int getErrorQueueSize() {
+        return errorEvents.size();
+    }
+
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
-        // try to pull from the error queue first, if empty then pull from 
main queue
-        SyslogEvent initialEvent = errorEvents.poll();
-        if (initialEvent == null) {
-            initialEvent = syslogEvents.poll();
-        }
+        // poll the queue with a small timeout to avoid unnecessarily yielding 
below
+        RawSyslogEvent rawSyslogEvent = getMessage(true, true);
 
-        // if nothing in either queue then yield and return
-        if (initialEvent == null) {
+        // if nothing in the queue then yield and return
+        if (rawSyslogEvent == null) {
             context.yield();
             return;
         }
 
-        final SyslogEvent event = initialEvent;
+        final int maxBatchSize = 
context.getProperty(MAX_BATCH_SIZE).asInteger();
+
         final String port = context.getProperty(PORT).getValue();
         final String protocol = context.getProperty(PROTOCOL).getValue();
 
-        final Map<String,String> attributes = new HashMap<>();
-        attributes.put(SyslogAttributes.PRIORITY.key(), event.getPriority());
-        attributes.put(SyslogAttributes.SEVERITY.key(), event.getSeverity());
-        attributes.put(SyslogAttributes.FACILITY.key(), event.getFacility());
-        attributes.put(SyslogAttributes.VERSION.key(), event.getVersion());
-        attributes.put(SyslogAttributes.TIMESTAMP.key(), event.getTimeStamp());
-        attributes.put(SyslogAttributes.HOSTNAME.key(), event.getHostName());
-        attributes.put(SyslogAttributes.SENDER.key(), event.getSender());
-        attributes.put(SyslogAttributes.BODY.key(), event.getMsgBody());
-        attributes.put(SyslogAttributes.VALID.key(), 
String.valueOf(event.isValid()));
-        attributes.put(SyslogAttributes.PROTOCOL.key(), protocol);
-        attributes.put(SyslogAttributes.PORT.key(), port);
-        attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
-
-        FlowFile flowFile = session.create();
-        flowFile = session.putAllAttributes(flowFile, attributes);
-
-        final String transitUri = new 
StringBuilder().append(protocol).append("://").append(event.getSender())
-                .append(":").append(port).toString();
-
-        try {
-            // write the raw bytes of the message as the FlowFile content
-            flowFile = session.write(flowFile, new OutputStreamCallback() {
-                @Override
-                public void process(OutputStream out) throws IOException {
-                    out.write(event.getRawMessage());
+        final Map<String, String> defaultAttributes = new HashMap<>(4);
+        defaultAttributes.put(SyslogAttributes.PROTOCOL.key(), protocol);
+        defaultAttributes.put(SyslogAttributes.PORT.key(), port);
+        defaultAttributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
+
+
+        final int numAttributes = SyslogAttributes.values().length + 2;
+        final boolean shouldParse = 
context.getProperty(PARSE_MESSAGES).asBoolean();
+
+        final Map<String, FlowFile> flowFilePerSender = new HashMap<>();
+        final SyslogParser parser = getParser();
+
+        for (int i = 0; i < maxBatchSize; i++) {
+            SyslogEvent event = null;
+
+            // If this is our first iteration, we have already polled our 
queues. Otherwise, poll on each iteration.
+            if (i > 0) {
+                rawSyslogEvent = getMessage(false, false);
+
+                if (rawSyslogEvent == null) {
+                    break;
                 }
-            });
-
-            if (event.isValid()) {
-                getLogger().info("Transferring {} to success", new 
Object[]{flowFile});
-                session.transfer(flowFile, REL_SUCCESS);
-                session.getProvenanceReporter().receive(flowFile, transitUri);
-            } else {
-                getLogger().info("Transferring {} to invalid", new 
Object[]{flowFile});
-                session.transfer(flowFile, REL_INVALID);
             }
 
-        } catch (ProcessException e) {
-            getLogger().error("Error processing Syslog message", e);
-            errorEvents.offer(event);
-            session.remove(flowFile);
+            final String sender = rawSyslogEvent.getSender();
+            FlowFile flowFile = flowFilePerSender.get(sender);
+            if (flowFile == null) {
+                flowFile = session.create();
+                flowFilePerSender.put(sender, flowFile);
+            }
+
+            if (shouldParse) {
+                boolean valid = true;
+                try {
+                    event = parser.parseEvent(rawSyslogEvent.getRawMessage(), 
sender);
+                } catch (final ProcessException pe) {
+                    getLogger().warn("Failed to parse Syslog event; routing to 
invalid");
+                    valid = false;
+                }
+
+                // If the event is invalid, route it to 'invalid' and then 
stop.
+                // We create a separate FlowFile for this case instead of 
using 'flowFile',
+                // because the 'flowFile' object may already have data written 
to it.
+                if (!valid || !event.isValid()) {
+                    FlowFile invalidFlowFile = session.create();
+                    invalidFlowFile = 
session.putAllAttributes(invalidFlowFile, defaultAttributes);
+                    if (sender != null) {
+                        invalidFlowFile = 
session.putAttribute(invalidFlowFile, SyslogAttributes.SENDER.key(), sender);
+                    }
+
+                    try {
+                        final byte[] rawBytes = rawSyslogEvent.getRawMessage();
+                        invalidFlowFile = session.write(invalidFlowFile, new 
OutputStreamCallback() {
+                            @Override
+                            public void process(final OutputStream out) throws 
IOException {
+                                out.write(rawBytes);
+                            }
+                        });
+                    } catch (final Exception e) {
+                        getLogger().error("Failed to write contents of Syslog 
message to FlowFile due to {}; will re-queue message and try again", e);
+                        errorEvents.offer(rawSyslogEvent);
+                        session.remove(invalidFlowFile);
+                        break;
+                    }
+
+                    session.transfer(invalidFlowFile, REL_INVALID);
+                    break;
+                }
+
+                getLogger().trace(event.getFullMessage());
+
+                final Map<String, String> attributes = new 
HashMap<>(numAttributes);
+                attributes.put(SyslogAttributes.PRIORITY.key(), 
event.getPriority());
+                attributes.put(SyslogAttributes.SEVERITY.key(), 
event.getSeverity());
+                attributes.put(SyslogAttributes.FACILITY.key(), 
event.getFacility());
+                attributes.put(SyslogAttributes.VERSION.key(), 
event.getVersion());
+                attributes.put(SyslogAttributes.TIMESTAMP.key(), 
event.getTimeStamp());
+                attributes.put(SyslogAttributes.HOSTNAME.key(), 
event.getHostName());
+                attributes.put(SyslogAttributes.BODY.key(), 
event.getMsgBody());
+                attributes.put(SyslogAttributes.VALID.key(), 
String.valueOf(event.isValid()));
+
+                flowFile = session.putAllAttributes(flowFile, attributes);
+            }
+
+            // figure out if we should write the bytes from the raw event or 
parsed event
+            final boolean writeDemarcator = (i > 0);
+
+            try {
+                // write the raw bytes of the message as the FlowFile content
+                final byte[] rawMessage = (event == null) ? 
rawSyslogEvent.getRawMessage() : event.getRawMessage();
+                flowFile = session.append(flowFile, new OutputStreamCallback() 
{
+                    @Override
+                    public void process(final OutputStream out) throws 
IOException {
+                        if (writeDemarcator) {
+                            out.write(messageDemarcatorBytes);
+                        }
+
+                        out.write(rawMessage);
+                    }
+                });
+            } catch (final Exception e) {
+                getLogger().error("Failed to write contents of Syslog message 
to FlowFile due to {}; will re-queue message and try again", e);
+                errorEvents.offer(rawSyslogEvent);
+                break;
+            }
+
+            session.adjustCounter("Messages Received", 1L, false);
+            flowFilePerSender.put(sender, flowFile);
+        }
+
+
+        for (final Map.Entry<String, FlowFile> entry : 
flowFilePerSender.entrySet()) {
+            final String sender = entry.getKey();
+            FlowFile flowFile = entry.getValue();
+
+            if (flowFile.getSize() == 0L) {
+                session.remove(flowFile);
+                getLogger().debug("No data written to FlowFile from Sender {}; 
removing FlowFile", new Object[] {sender});
+                continue;
+            }
+
+            final Map<String, String> newAttributes = new 
HashMap<>(defaultAttributes.size() + 1);
+            newAttributes.putAll(defaultAttributes);
+            newAttributes.put(SyslogAttributes.SENDER.key(), sender);
+            flowFile = session.putAllAttributes(flowFile, newAttributes);
+
+            getLogger().debug("Transferring {} to success", new Object[] 
{flowFile});
+            session.transfer(flowFile, REL_SUCCESS);
+            final String senderHost = sender.startsWith("/") && 
sender.length() > 1 ? sender.substring(1) : sender;
+            final String transitUri = new 
StringBuilder().append(protocol.toLowerCase()).append("://").append(senderHost).append(":").append(port).toString();
+            session.getProvenanceReporter().receive(flowFile, transitUri);
         }
     }
 
@@ -313,18 +485,15 @@ public class ListenSyslog extends AbstractSyslogProcessor {
      */
     public static class DatagramChannelReader implements ChannelReader {
 
-        private final BufferPool bufferPool;
-        private final SyslogParser syslogParser;
-        private final BlockingQueue<SyslogEvent> syslogEvents;
+        private final BlockingQueue<ByteBuffer> bufferPool;
+        private final BlockingQueue<RawSyslogEvent> syslogEvents;
         private final ProcessorLog logger;
         private DatagramChannel datagramChannel;
         private volatile boolean stopped = false;
         private Selector selector;
 
-        public DatagramChannelReader(final BufferPool bufferPool, final 
SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents,
-                                     final ProcessorLog logger) {
+        public DatagramChannelReader(final BlockingQueue<ByteBuffer> 
bufferPool, final BlockingQueue<RawSyslogEvent> syslogEvents, final 
ProcessorLog logger) {
             this.bufferPool = bufferPool;
-            this.syslogParser = syslogParser;
             this.syslogEvents = syslogEvents;
             this.logger = logger;
         }
@@ -360,17 +529,22 @@ public class ListenSyslog extends AbstractSyslogProcessor {
                                 continue;
                             }
                             DatagramChannel channel = (DatagramChannel) 
key.channel();
-                            SocketAddress sender;
+                            SocketAddress socketAddress;
                             buffer.clear();
-                            while (!stopped && (sender = 
channel.receive(buffer)) != null) {
-                                final SyslogEvent event;
-                                if (sender instanceof InetSocketAddress) {
-                                    event = syslogParser.parseEvent(buffer, 
((InetSocketAddress)sender).getAddress().toString());
-                                } else {
-                                    event = syslogParser.parseEvent(buffer);
+                            while (!stopped && (socketAddress = 
channel.receive(buffer)) != null) {
+                                String sender = "";
+                                if (socketAddress instanceof 
InetSocketAddress) {
+                                    sender = ((InetSocketAddress) 
socketAddress).getAddress().toString();
                                 }
-                                logger.trace(event.getFullMessage());
-                                syslogEvents.put(event); // block until space 
is available
+
+                                // create a byte array from the buffer
+                                buffer.flip();
+                                byte bytes[] = new byte[buffer.limit()];
+                                buffer.get(bytes, 0, buffer.limit());
+
+                                // queue the raw message with the sender, 
block until space is available
+                                syslogEvents.put(new RawSyslogEvent(bytes, 
sender));
+                                buffer.clear();
                             }
                         }
                     }
@@ -380,8 +554,13 @@ public class ListenSyslog extends AbstractSyslogProcessor {
                     logger.error("Error reading from DatagramChannel", e);
                 }
             }
+
             if (buffer != null) {
-                bufferPool.returnBuffer(buffer, 0);
+                try {
+                    bufferPool.put(buffer);
+                } catch (InterruptedException e) {
+                    // nothing to do here
+                }
             }
         }
 
@@ -409,9 +588,8 @@ public class ListenSyslog extends AbstractSyslogProcessor {
      */
     public static class SocketChannelReader implements ChannelReader {
 
-        private final BufferPool bufferPool;
-        private final SyslogParser syslogParser;
-        private final BlockingQueue<SyslogEvent> syslogEvents;
+        private final BlockingQueue<ByteBuffer> bufferPool;
+        private final BlockingQueue<RawSyslogEvent> syslogEvents;
         private final ProcessorLog logger;
         private final ExecutorService executor;
         private volatile boolean stopped = false;
@@ -420,10 +598,8 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         private final int maxConnections;
         private final AtomicInteger currentConnections = new AtomicInteger(0);
 
-        public SocketChannelReader(final BufferPool bufferPool, final 
SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents,
-                                   final ProcessorLog logger, final int 
maxConnections) {
+        public SocketChannelReader(final BlockingQueue<ByteBuffer> bufferPool, 
final BlockingQueue<RawSyslogEvent> syslogEvents, final ProcessorLog logger, 
final int maxConnections) {
             this.bufferPool = bufferPool;
-            this.syslogParser = syslogParser;
             this.syslogEvents = syslogEvents;
             this.logger = logger;
             this.maxConnections = maxConnections;
@@ -486,8 +662,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
                                 // Clear out the operations the select is 
interested in until done reading
                                 key.interestOps(0);
                                 // Create and execute the read handler
-                                final SocketChannelHandler handler = new 
SocketChannelHandler(key, this,
-                                        syslogParser, syslogEvents, logger);
+                                final SocketChannelHandler handler = new 
SocketChannelHandler(key, this, syslogEvents, logger);
                                 // and launch the thread
                                 executor.execute(handler);
                             }
@@ -546,7 +721,11 @@ public class ListenSyslog extends AbstractSyslogProcessor {
 
         public void completeConnection(SelectionKey key) {
             // connection is done. Return the buffer to the pool
-            bufferPool.returnBuffer((ByteBuffer) key.attachment(), 0);
+            try {
+                bufferPool.put((ByteBuffer) key.attachment());
+            } catch (InterruptedException e) {
+                // nothing to do here
+            }
             currentConnections.decrementAndGet();
         }
 
@@ -565,16 +744,13 @@ public class ListenSyslog extends AbstractSyslogProcessor {
 
         private final SelectionKey key;
         private final SocketChannelReader dispatcher;
-        private final SyslogParser syslogParser;
-        private final BlockingQueue<SyslogEvent> syslogEvents;
+        private final BlockingQueue<RawSyslogEvent> syslogEvents;
         private final ProcessorLog logger;
         private final ByteArrayOutputStream currBytes = new 
ByteArrayOutputStream(4096);
 
-        public SocketChannelHandler(final SelectionKey key, final 
SocketChannelReader dispatcher, final SyslogParser syslogParser,
-                                    final BlockingQueue<SyslogEvent> 
syslogEvents, final ProcessorLog logger) {
+        public SocketChannelHandler(final SelectionKey key, final 
SocketChannelReader dispatcher, final BlockingQueue<RawSyslogEvent> 
syslogEvents, final ProcessorLog logger) {
             this.key = key;
             this.dispatcher = dispatcher;
-            this.syslogParser = syslogParser;
             this.syslogEvents = syslogEvents;
             this.logger = logger;
         }
@@ -609,11 +785,9 @@ public class ListenSyslog extends AbstractSyslogProcessor {
 
                         // check if at end of a message
                         if (currByte == '\n') {
-                            // parse an event, reset the buffer
-                            final SyslogEvent event = 
syslogParser.parseEvent(currBytes.toByteArray(),
-                                    
socketChannel.socket().getInetAddress().toString());
-                            logger.trace(event.getFullMessage());
-                            syslogEvents.put(event); // block until space is 
available
+                            String sender = 
socketChannel.socket().getInetAddress().toString();
+                            // queue the raw event blocking until space is 
available, reset the buffer
+                            syslogEvents.put(new 
RawSyslogEvent(currBytes.toByteArray(), sender));
                             currBytes.reset();
                             // Mark this as the start of the next message
                             socketBuffer.mark();
@@ -655,4 +829,25 @@ public class ListenSyslog extends AbstractSyslogProcessor {
                 + "maximum receive buffer");
     }
 
+    // Wrapper class to pass around the raw message and the host/ip that sent 
it
+    public static class RawSyslogEvent {
+
+        final byte[] rawMessage;
+        final String sender;
+
+        public RawSyslogEvent(byte[] rawMessage, String sender) {
+            this.rawMessage = rawMessage;
+            this.sender = sender;
+        }
+
+        public byte[] getRawMessage() {
+            return this.rawMessage;
+        }
+
+        public String getSender() {
+            return this.sender;
+        }
+
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e5281f1f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog.java
new file mode 100644
index 0000000..1490cc2
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import 
org.apache.nifi.processors.standard.AbstractSyslogProcessor.SyslogAttributes;
+import org.apache.nifi.processors.standard.util.SyslogEvent;
+import org.apache.nifi.processors.standard.util.SyslogParser;
+import org.apache.nifi.stream.io.StreamUtils;
+
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"logs", "syslog", "attributes", "system", "event", "message"})
+@CapabilityDescription("Parses the contents of a Syslog message and adds 
attributes to the FlowFile for each of the parts of the Syslog message")
+@WritesAttributes({@WritesAttribute(attribute = "syslog.priority", description 
= "The priority of the Syslog message."),
+    @WritesAttribute(attribute = "syslog.severity", description = "The 
severity of the Syslog message derived from the priority."),
+    @WritesAttribute(attribute = "syslog.facility", description = "The 
facility of the Syslog message derived from the priority."),
+    @WritesAttribute(attribute = "syslog.version", description = "The optional 
version from the Syslog message."),
+    @WritesAttribute(attribute = "syslog.timestamp", description = "The 
timestamp of the Syslog message."),
+    @WritesAttribute(attribute = "syslog.hostname", description = "The 
hostname of the Syslog message."),
+    @WritesAttribute(attribute = "syslog.sender", description = "The hostname 
of the Syslog server that sent the message."),
+    @WritesAttribute(attribute = "syslog.body", description = "The body of the 
Syslog message, everything after the hostname.")})
+@SeeAlso({ListenSyslog.class, PutSyslog.class})
+public class ParseSyslog extends AbstractProcessor {
+
+    public static final PropertyDescriptor CHARSET = new 
PropertyDescriptor.Builder()
+        .name("Character Set")
+        .description("Specifies which character set of the Syslog messages")
+        .required(true)
+        .defaultValue("UTF-8")
+        .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+        .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+        .name("failure")
+        .description("Any FlowFile that could not be parsed as a Syslog 
message will be transferred to this Relationship without any attributes being 
added")
+        .build();
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+        .name("success")
+        .description("Any FlowFile that is successfully parsed as a Syslog 
message will be to this Relationship.")
+        .build();
+
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>(1);
+        properties.add(CHARSET);
+        return properties;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_FAILURE);
+        relationships.add(REL_SUCCESS);
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final String charsetName = context.getProperty(CHARSET).getValue();
+        final SyslogParser parser = new 
SyslogParser(Charset.forName(charsetName));
+        final byte[] buffer = new byte[(int) flowFile.getSize()];
+        session.read(flowFile, new InputStreamCallback() {
+            @Override
+            public void process(final InputStream in) throws IOException {
+                StreamUtils.fillBuffer(in, buffer);
+            }
+        });
+
+        final SyslogEvent event;
+        try {
+            event = parser.parseEvent(buffer, null);
+        } catch (final ProcessException pe) {
+            getLogger().error("Failed to parse {} as a Syslog message due to 
{}; routing to failure", new Object[] {flowFile, pe});
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        if (!event.isValid()) {
+            getLogger().error("Failed to parse {} as a Syslog message: it does 
not conform to any of the RFC formats supported; routing to failure", new 
Object[] {flowFile});
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        final Map<String, String> attributes = new HashMap<>(8);
+        attributes.put(SyslogAttributes.PRIORITY.key(), event.getPriority());
+        attributes.put(SyslogAttributes.SEVERITY.key(), event.getSeverity());
+        attributes.put(SyslogAttributes.FACILITY.key(), event.getFacility());
+        attributes.put(SyslogAttributes.VERSION.key(), event.getVersion());
+        attributes.put(SyslogAttributes.TIMESTAMP.key(), event.getTimeStamp());
+        attributes.put(SyslogAttributes.HOSTNAME.key(), event.getHostName());
+        attributes.put(SyslogAttributes.BODY.key(), event.getMsgBody());
+
+        flowFile = session.putAllAttributes(flowFile, attributes);
+        session.transfer(flowFile, REL_SUCCESS);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/e5281f1f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
index 733c113..9cb6508 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
@@ -20,6 +20,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
@@ -64,6 +65,7 @@ import java.util.regex.Pattern;
         "or it can be an RFC3164 timestamp with a format of \"MMM d 
HH:mm:ss\". If a message is constructed that does not form a valid Syslog 
message according to the " +
         "above description, then it is routed to the invalid relationship. 
Valid messages are sent to the Syslog server and successes are routed to the 
success relationship, " +
         "failures routed to the failure relationship.")
+@SeeAlso({ListenSyslog.class, ParseSyslog.class})
 public class PutSyslog extends AbstractSyslogProcessor {
 
     public static final PropertyDescriptor HOSTNAME = new 
PropertyDescriptor.Builder()

http://git-wip-us.apache.org/repos/asf/nifi/blob/e5281f1f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 56265a9..befa5e7 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -53,6 +53,7 @@ org.apache.nifi.processors.standard.LogAttribute
 org.apache.nifi.processors.standard.MergeContent
 org.apache.nifi.processors.standard.ModifyBytes
 org.apache.nifi.processors.standard.MonitorActivity
+org.apache.nifi.processors.standard.ParseSyslog
 org.apache.nifi.processors.standard.PostHTTP
 org.apache.nifi.processors.standard.PutEmail
 org.apache.nifi.processors.standard.PutDistributedMapCache

http://git-wip-us.apache.org/repos/asf/nifi/blob/e5281f1f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
index f0eb345..d9dc8f0 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
@@ -16,34 +16,40 @@
  */
 package org.apache.nifi.processors.standard;
 
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
 import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.io.nio.BufferPool;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.exception.FlowFileAccessException;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.standard.ListenSyslog.RawSyslogEvent;
 import org.apache.nifi.processors.standard.util.SyslogEvent;
 import org.apache.nifi.processors.standard.util.SyslogParser;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.util.IntegerHolder;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Assert;
 import org.junit.Test;
-import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.DatagramChannel;
-import java.nio.channels.SocketChannel;
-import java.nio.charset.Charset;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-
 public class TestListenSyslog {
 
     static final Logger LOGGER = 
LoggerFactory.getLogger(TestListenSyslog.class);
@@ -100,8 +106,7 @@ public class TestListenSyslog {
 
             final ProvenanceEventRecord event = events.get(0);
             Assert.assertEquals(ProvenanceEventType.RECEIVE, 
event.getEventType());
-            Assert.assertEquals(ListenSyslog.UDP_VALUE.getValue() + "://" + 
flowFile.getAttribute(ListenSyslog.SyslogAttributes.SENDER.key()) + ":0",
-                    event.getTransitUri());
+            Assert.assertTrue("transit uri must be set and start with proper 
protocol", event.getTransitUri().toLowerCase().startsWith("udp"));
 
         } finally {
             // unschedule to close connections
@@ -151,8 +156,7 @@ public class TestListenSyslog {
 
             final ProvenanceEventRecord event = events.get(0);
             Assert.assertEquals(ProvenanceEventType.RECEIVE, 
event.getEventType());
-            Assert.assertEquals(ListenSyslog.TCP_VALUE.getValue() + "://" + 
flowFile.getAttribute(ListenSyslog.SyslogAttributes.SENDER.key()) + ":0",
-                    event.getTransitUri());
+            Assert.assertTrue("transit uri must be set and start with proper 
protocol", event.getTransitUri().toLowerCase().startsWith("tcp"));
 
         } finally {
             // unschedule to close connections
@@ -203,8 +207,7 @@ public class TestListenSyslog {
 
             final ProvenanceEventRecord event = events.get(0);
             Assert.assertEquals(ProvenanceEventType.RECEIVE, 
event.getEventType());
-            Assert.assertEquals(ListenSyslog.TCP_VALUE.getValue() + "://" + 
flowFile.getAttribute(ListenSyslog.SyslogAttributes.SENDER.key()) + ":0",
-                    event.getTransitUri());
+            Assert.assertTrue("transit uri must be set and start with proper 
protocol", event.getTransitUri().toLowerCase().startsWith("tcp"));
 
         } finally {
             // unschedule to close connections
@@ -213,6 +216,57 @@ public class TestListenSyslog {
     }
 
     @Test
+    public void testBatching() throws IOException, InterruptedException {
+        final ListenSyslog proc = new ListenSyslog();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(ListenSyslog.PROTOCOL, 
ListenSyslog.UDP_VALUE.getValue());
+        runner.setProperty(ListenSyslog.PORT, "0");
+        runner.setProperty(ListenSyslog.MAX_BATCH_SIZE, "25");
+        runner.setProperty(ListenSyslog.MESSAGE_DELIMITER, "|");
+        runner.setProperty(ListenSyslog.PARSE_MESSAGES, "false");
+
+        // schedule to start listening on a random port
+        final ProcessSessionFactory processSessionFactory = 
runner.getProcessSessionFactory();
+        final ProcessContext context = runner.getProcessContext();
+        proc.onScheduled(context);
+
+        final int numMessages = 20;
+        final int port = proc.getPort();
+        Assert.assertTrue(port > 0);
+
+        // write some UDP messages to the port in the background
+        final Thread sender = new Thread(new DatagramSender(port, numMessages, 
10, VALID_MESSAGE.replaceAll("\\n", "")));
+        sender.setDaemon(true);
+        sender.start();
+        sender.join();
+
+        try {
+            proc.onTrigger(context, processSessionFactory);
+            runner.assertAllFlowFilesTransferred(ListenSyslog.REL_SUCCESS, 1);
+
+            final MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
+            Assert.assertEquals("0", 
flowFile.getAttribute(ListenSyslog.SyslogAttributes.PORT.key()));
+            Assert.assertEquals(ListenSyslog.UDP_VALUE.getValue(), 
flowFile.getAttribute(ListenSyslog.SyslogAttributes.PROTOCOL.key()));
+            
Assert.assertTrue(!StringUtils.isBlank(flowFile.getAttribute(ListenSyslog.SyslogAttributes.SENDER.key())));
+
+            final String content = new String(flowFile.toByteArray(), 
StandardCharsets.UTF_8);
+            final String[] splits = content.split("\\|");
+            Assert.assertEquals(20, splits.length);
+
+            final List<ProvenanceEventRecord> events = 
runner.getProvenanceEvents();
+            Assert.assertNotNull(events);
+            Assert.assertEquals(1, events.size());
+
+            final ProvenanceEventRecord event = events.get(0);
+            Assert.assertEquals(ProvenanceEventType.RECEIVE, 
event.getEventType());
+            Assert.assertTrue("transit uri must be set and start with proper 
protocol", event.getTransitUri().toLowerCase().startsWith("udp"));
+        } finally {
+            // unschedule to close connections
+            proc.onUnscheduled();
+        }
+    }
+
+    @Test
     public void testInvalid() throws IOException, InterruptedException {
         final ListenSyslog proc = new ListenSyslog();
         final TestRunner runner = TestRunners.newTestRunner(proc);
@@ -254,25 +308,69 @@ public class TestListenSyslog {
     }
 
     @Test
-    public void testErrorQueue() {
-        final SyslogEvent event1 = Mockito.mock(SyslogEvent.class);
-        Mockito.when(event1.getRawMessage()).thenThrow(new 
ProcessException("ERROR"));
-
-        final SyslogEvent event2 = new SyslogEvent.Builder()
-                .facility("fac").severity("sev")
-                .fullMessage("abc").hostname("host")
-                .msgBody("body").timestamp("123").valid(true)
-                .rawMessage("abc".getBytes(Charset.forName("UTF-8")))
-                .build();
-
-        final MockProcessor proc = new MockProcessor(Arrays.asList(event1, 
event2));
+    public void testParsingError() throws IOException {
+        final FailParseProcessor proc = new FailParseProcessor();
         final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(ListenSyslog.PORT, "12345");
+        runner.setProperty(ListenSyslog.PROTOCOL, 
ListenSyslog.UDP_VALUE.getValue());
+        runner.setProperty(ListenSyslog.PORT, "0");
+
+        // schedule to start listening on a random port
+        final ProcessSessionFactory processSessionFactory = 
runner.getProcessSessionFactory();
+        final ProcessContext context = runner.getProcessContext();
+        proc.onScheduled(context);
+
+        try {
+            final int port = proc.getPort();
+            final DatagramSender sender = new DatagramSender(port, 1, 1, 
INVALID_MESSAGE);
+            sender.run();
+
+            // the parser always throws, so the message should be routed to invalid
+            proc.onTrigger(context, processSessionFactory);
+            runner.assertTransferCount(ListenSyslog.REL_INVALID, 1);
+            runner.assertTransferCount(ListenSyslog.REL_SUCCESS, 0);
+        } finally {
+            proc.onUnscheduled();
+        }
+    }
+
+    @Test
+    public void testErrorQueue() throws IOException {
+        final List<RawSyslogEvent> msgs = new ArrayList<>();
+        msgs.add(new RawSyslogEvent(VALID_MESSAGE.getBytes(), "sender-01"));
+        msgs.add(new RawSyslogEvent(VALID_MESSAGE.getBytes(), "sender-01"));
+
+        // Add message that will throw a FlowFileAccessException the first 
time that we attempt to read
+        // the contents but will succeed the second time.
+        final IntegerHolder getMessageAttempts = new IntegerHolder(0);
+        msgs.add(new RawSyslogEvent(VALID_MESSAGE.getBytes(), "sender-01") {
+            @Override
+            public byte[] getRawMessage() {
+                final int attempts = getMessageAttempts.incrementAndGet();
+                if (attempts == 1) {
+                    throw new FlowFileAccessException("Unit test failure");
+                } else {
+                    return VALID_MESSAGE.getBytes();
+                }
+            }
+        });
 
-        // should keep re-processing event1 from the error queue
-        runner.run(3);
-        runner.assertTransferCount(ListenSyslog.REL_INVALID, 0);
-        runner.assertTransferCount(ListenSyslog.REL_SUCCESS, 0);
+        final CannedMessageProcessor proc = new CannedMessageProcessor(msgs);
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(ListenSyslog.MAX_BATCH_SIZE, "5");
+        runner.setProperty(ListenSyslog.PROTOCOL, 
ListenSyslog.UDP_VALUE.getValue());
+        runner.setProperty(ListenSyslog.PORT, "0");
+        runner.setProperty(ListenSyslog.PARSE_MESSAGES, "false");
+
+        runner.run();
+        assertEquals(1, proc.getErrorQueueSize());
+        runner.assertAllFlowFilesTransferred(ListenSyslog.REL_SUCCESS, 1);
+        
runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0).assertContentEquals(VALID_MESSAGE
 + "\n" + VALID_MESSAGE);
+
+        // running again should pull from the error queue
+        runner.clearTransferState();
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ListenSyslog.REL_SUCCESS, 1);
+        
runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0).assertContentEquals(VALID_MESSAGE);
     }
 
 
@@ -420,46 +518,39 @@ public class TestListenSyslog {
     }
 
     // A mock version of ListenSyslog that will queue the provided events
-    private static class MockProcessor extends ListenSyslog {
-
-        private List<SyslogEvent> eventList;
-
-        public MockProcessor(List<SyslogEvent> eventList) {
-            this.eventList = eventList;
-        }
-
+    private static class FailParseProcessor extends ListenSyslog {
         @Override
-        protected ChannelReader createChannelReader(final String protocol, 
final BufferPool bufferPool, final SyslogParser syslogParser,
-                                                    final 
BlockingQueue<SyslogEvent> syslogEvents, int maxConnections) {
-            return new ChannelReader() {
-                @Override
-                public void open(int port, int maxBufferSize) throws 
IOException {
-
-                }
-
+        protected SyslogParser getParser() {
+            return new SyslogParser(StandardCharsets.UTF_8) {
                 @Override
-                public int getPort() {
-                    return 0;
+                public SyslogEvent parseEvent(byte[] bytes, String sender) {
+                    throw new ProcessException("Unit test intentionally 
failing");
                 }
+            };
+        }
+    }
 
-                @Override
-                public void stop() {
-
-                }
+    private static class CannedMessageProcessor extends ListenSyslog {
+        private final Iterator<RawSyslogEvent> eventItr;
 
-                @Override
-                public void close() {
+        public CannedMessageProcessor(final List<RawSyslogEvent> events) {
+            this.eventItr = events.iterator();
+        }
 
-                }
+        @Override
+        public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+            final List<PropertyDescriptor> properties = new 
ArrayList<>(super.getSupportedPropertyDescriptors());
+            properties.remove(PORT);
+            properties.add(new 
PropertyDescriptor.Builder().name(PORT.getName()).addValidator(Validator.VALID).build());
+            return properties;
+        }
 
-                @Override
-                public void run() {
-                    for (SyslogEvent event : eventList) {
-                        syslogEvents.offer(event);
-                    }
-                }
-            };
+        @Override
+        protected RawSyslogEvent getMessage(final boolean longPoll, final 
boolean pollErrorQueue) {
+            if (eventItr.hasNext()) {
+                return eventItr.next();
+            }
+            return super.getMessage(longPoll, pollErrorQueue);
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e5281f1f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestParseSyslog.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestParseSyslog.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestParseSyslog.java
new file mode 100644
index 0000000..a1a4d04
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestParseSyslog.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import 
org.apache.nifi.processors.standard.AbstractSyslogProcessor.SyslogAttributes;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+
+public class TestParseSyslog {
+    static final String PRI = "34";
+    static final String SEV = "2";
+    static final String FAC = "4";
+    static final String TIME = "Oct 13 15:43:23";
+    static final String HOST = "localhost.home";
+    static final String BODY = "some message";
+
+    static final String VALID_MESSAGE_RFC3164_0 = "<" + PRI + ">" + TIME + " " 
+ HOST + " " + BODY + "\n";
+
+    @Test
+    public void testSuccessfulParse3164() {
+        final TestRunner runner = TestRunners.newTestRunner(new ParseSyslog());
+        runner.enqueue(VALID_MESSAGE_RFC3164_0.getBytes());
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ParseSyslog.REL_SUCCESS, 1);
+        final MockFlowFile mff = 
runner.getFlowFilesForRelationship(ParseSyslog.REL_SUCCESS).get(0);
+        mff.assertAttributeEquals(SyslogAttributes.BODY.key(), BODY);
+        mff.assertAttributeEquals(SyslogAttributes.FACILITY.key(), FAC);
+        mff.assertAttributeEquals(SyslogAttributes.HOSTNAME.key(), HOST);
+        mff.assertAttributeEquals(SyslogAttributes.PRIORITY.key(), PRI);
+        mff.assertAttributeEquals(SyslogAttributes.SEVERITY.key(), SEV);
+        mff.assertAttributeEquals(SyslogAttributes.TIMESTAMP.key(), TIME);
+    }
+
+
+    @Test
+    public void testInvalidMessage() {
+        final TestRunner runner = TestRunners.newTestRunner(new ParseSyslog());
+        runner.enqueue("<hello> yesterday localhost\n".getBytes());
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ParseSyslog.REL_FAILURE, 1);
+    }
+}

Reply via email to