Updated Branches: refs/heads/trunk f54148a3f -> 30293ea1e
FLUME-1385. Add a multiport syslog source (Mike Percy via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/30293ea1 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/30293ea1 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/30293ea1 Branch: refs/heads/trunk Commit: 30293ea1e9421cc4684d8e997770fe6f171970eb Parents: f54148a Author: Hari Shreedharan <[email protected]> Authored: Fri Sep 14 14:12:51 2012 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Fri Sep 14 14:12:51 2012 -0700 ---------------------------------------------------------------------- flume-ng-core/pom.xml | 5 + .../flume/source/MultiportSyslogTCPSource.java | 539 +++++++++++++++ .../java/org/apache/flume/source/SyslogParser.java | 323 +++++++++ .../source/SyslogSourceConfigurationConstants.java | 31 + .../flume/source/TestMultiportSyslogTCPSource.java | 382 ++++++++++ .../org/apache/flume/source/TestSyslogParser.java | 85 +++ pom.xml | 5 + 7 files changed, 1370 insertions(+), 0 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/30293ea1/flume-ng-core/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-core/pom.xml b/flume-ng-core/pom.xml index 9600dfc..4592a9d 100644 --- a/flume-ng-core/pom.xml +++ b/flume-ng-core/pom.xml @@ -183,6 +183,11 @@ limitations under the License. 
<scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.mina</groupId> + <artifactId>mina-core</artifactId> + </dependency> + </dependencies> </project> http://git-wip-us.apache.org/repos/asf/flume/blob/30293ea1/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java new file mode 100644 index 0000000..884fd62 --- /dev/null +++ b/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java @@ -0,0 +1,539 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.flume.source; + +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.EventDrivenSource; +import org.apache.flume.channel.ChannelProcessor; +import org.apache.flume.conf.Configurable; +import org.apache.flume.event.EventBuilder; +import org.apache.flume.instrumentation.SourceCounter; +import org.apache.mina.core.buffer.IoBuffer; +import org.apache.mina.core.service.IoHandlerAdapter; +import org.apache.mina.core.session.IdleStatus; +import org.apache.mina.core.session.IoSession; +import org.apache.mina.transport.socket.nio.NioSocketAcceptor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +public class MultiportSyslogTCPSource extends AbstractSource implements + EventDrivenSource, Configurable { + + public static final Logger logger = LoggerFactory.getLogger( + MultiportSyslogTCPSource.class); + + private final ConcurrentMap<Integer, ThreadSafeDecoder> portCharsets; + + private List<Integer> ports = Lists.newArrayList(); + private String host; + private NioSocketAcceptor acceptor; + private Integer numProcessors; + private int maxEventSize; + private int batchSize; + private int readBufferSize; + private String portHeader; + private SourceCounter sourceCounter = null; + private Charset defaultCharset; + private ThreadSafeDecoder defaultDecoder; + + public MultiportSyslogTCPSource() { + portCharsets = new ConcurrentHashMap<Integer, ThreadSafeDecoder>(); + } + + @Override + public void configure(Context context) { + String portsStr 
= context.getString( + SyslogSourceConfigurationConstants.CONFIG_PORTS); + + Preconditions.checkNotNull(portsStr, "Must define config " + + "parameter for MultiportSyslogTCPSource: ports"); + + for (String portStr : portsStr.split("\\s+")) { + Integer port = Integer.parseInt(portStr); + ports.add(port); + } + + host = context.getString(SyslogSourceConfigurationConstants.CONFIG_HOST); + + numProcessors = context.getInteger( + SyslogSourceConfigurationConstants.CONFIG_NUMPROCESSORS); + + maxEventSize = context.getInteger( + SyslogSourceConfigurationConstants.CONFIG_EVENTSIZE, + SyslogUtils.DEFAULT_SIZE); + + String defaultCharsetStr = context.getString( + SyslogSourceConfigurationConstants.CONFIG_CHARSET, + SyslogSourceConfigurationConstants.DEFAULT_CHARSET); + try { + defaultCharset = Charset.forName(defaultCharsetStr); + } catch (Exception ex) { + throw new IllegalArgumentException("Unable to parse charset " + + "string (" + defaultCharsetStr + ") from port configuration.", ex); + + } + + defaultDecoder = new ThreadSafeDecoder(defaultCharset); + + // clear any previous charset configuration and reconfigure it + portCharsets.clear(); + { + ImmutableMap<String, String> portCharsetCfg = context.getSubProperties( + SyslogSourceConfigurationConstants.CONFIG_PORT_CHARSET_PREFIX); + for (Map.Entry<String, String> entry : portCharsetCfg.entrySet()) { + String portStr = entry.getKey(); + String charsetStr = entry.getValue(); + Integer port = Integer.parseInt(portStr); + Preconditions.checkNotNull(port, "Invalid port number in config"); + try { + Charset charset = Charset.forName(charsetStr); + portCharsets.put(port, new ThreadSafeDecoder(charset)); + } catch (Exception ex) { + throw new IllegalArgumentException("Unable to parse charset " + + "string (" + charsetStr + ") from port configuration.", ex); + } + } + } + + batchSize = context.getInteger( + SyslogSourceConfigurationConstants.CONFIG_BATCHSIZE, + SyslogSourceConfigurationConstants.DEFAULT_BATCHSIZE); + + portHeader 
= context.getString( + SyslogSourceConfigurationConstants.CONFIG_PORT_HEADER); + + readBufferSize = context.getInteger( + SyslogSourceConfigurationConstants.CONFIG_READBUF_SIZE, + SyslogSourceConfigurationConstants.DEFAULT_READBUF_SIZE); + + if (sourceCounter == null) { + sourceCounter = new SourceCounter(getName()); + } + } + + @Override + public void start() { + logger.info("Starting {}...", this); + + // allow user to specify number of processors to use for thread pool + if (numProcessors != null) { + acceptor = new NioSocketAcceptor(numProcessors); + } else { + acceptor = new NioSocketAcceptor(); + } + acceptor.setReuseAddress(true); + acceptor.getSessionConfig().setReadBufferSize(readBufferSize); + acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10); + + acceptor.setHandler(new MultiportSyslogHandler(maxEventSize, batchSize, + getChannelProcessor(), sourceCounter, portHeader, defaultDecoder, + portCharsets)); + + for (int port : ports) { + InetSocketAddress addr; + if (host != null) { + addr = new InetSocketAddress(host, port); + } else { + addr = new InetSocketAddress(port); + } + try { + //Not using the one that takes an array because we won't want one bind + //error affecting the next. + acceptor.bind(addr); + } catch (IOException ex) { + logger.error("Could not bind to address: " + String.valueOf(addr), ex); + } + } + + sourceCounter.start(); + super.start(); + + logger.info("{} started.", this); + } + + @Override + public void stop() { + logger.info("Stopping {}...", this); + + acceptor.unbind(); + acceptor.dispose(); + + sourceCounter.stop(); + super.stop(); + + logger.info("{} stopped. 
Metrics: {}", this, sourceCounter); + } + + @Override + public String toString() { + return "Multiport Syslog TCP source " + getName(); + } + + static class MultiportSyslogHandler extends IoHandlerAdapter { + + private static final String SAVED_BUF = "savedBuffer"; + private final ChannelProcessor channelProcessor; + private final int maxEventSize; + private final int batchSize; + private final SourceCounter sourceCounter; + private final String portHeader; + private final SyslogParser syslogParser; + private final LineSplitter lineSplitter; + private final ThreadSafeDecoder defaultDecoder; + private final ConcurrentMap<Integer, ThreadSafeDecoder> portCharsets; + + public MultiportSyslogHandler(int maxEventSize, int batchSize, + ChannelProcessor cp, SourceCounter ctr, String portHeader, + ThreadSafeDecoder defaultDecoder, + ConcurrentMap<Integer, ThreadSafeDecoder> portCharsets) { + channelProcessor = cp; + sourceCounter = ctr; + this.maxEventSize = maxEventSize; + this.batchSize = batchSize; + this.portHeader = portHeader; + this.defaultDecoder = defaultDecoder; + this.portCharsets = portCharsets; + syslogParser = new SyslogParser(); + lineSplitter = new LineSplitter(maxEventSize); + } + + @Override + public void exceptionCaught(IoSession session, Throwable cause) + throws Exception { + logger.error("Error in syslog message handler", cause); + if (cause instanceof Error) { + Throwables.propagate(cause); + } + } + + @Override + public void sessionCreated(IoSession session) { + logger.info("Session created: {}", session); + + // Allocate saved buffer when session is created. + // This allows us to parse an incomplete message and use it on + // the next request. 
+ session.setAttribute(SAVED_BUF, IoBuffer.allocate(maxEventSize, false)); + } + + @Override + public void sessionOpened(IoSession session) { + // debug level so it isn't too spammy together w/ sessionCreated() + logger.debug("Session opened: {}", session); + } + + @Override + public void sessionClosed(IoSession session) { + logger.info("Session closed: {}", session); + } + + @Override + public void messageReceived(IoSession session, Object message) { + + IoBuffer buf = (IoBuffer) message; + IoBuffer savedBuf = (IoBuffer) session.getAttribute(SAVED_BUF); + + ParsedBuffer parsedLine = new ParsedBuffer(); + List<Event> events = Lists.newArrayList(); + + // the character set can be specified per-port + CharsetDecoder decoder = defaultDecoder.get(); + int port = + ((InetSocketAddress) session.getLocalAddress()).getPort(); + if (portCharsets.containsKey(port)) { + decoder = portCharsets.get(port).get(); + } + + // while the buffer is not empty + while (buf.hasRemaining()) { + events.clear(); + + // take number of events no greater than batchSize + for (int num = 0; num < batchSize && buf.hasRemaining(); num++) { + + if (lineSplitter.parseLine(buf, savedBuf, parsedLine)) { + Event event = parseEvent(parsedLine, decoder); + if (portHeader != null) { + event.getHeaders().put(portHeader, String.valueOf(port)); + } + events.add(event); + } else { + logger.trace("Parsed null event"); + } + + } + + // don't try to write anything if we didn't get any events somehow + if (events.isEmpty()) { + logger.trace("Empty set!"); + return; + } + + int numEvents = events.size(); + sourceCounter.addToEventReceivedCount(numEvents); + + // write the events to the downstream channel + try { + channelProcessor.processEventBatch(events); + sourceCounter.addToEventAcceptedCount(numEvents); + } catch (Throwable t) { + logger.error("Error writing to channel, event dropped", t); + if (t instanceof Error) { + Throwables.propagate(t); + } + } + } + + } + + /** + * Decodes a syslog-formatted 
ParsedLine into a Flume Event. + * @param parsedBuf Buffer containing characters to be parsed + * @param port Incoming port + * @return + */ + Event parseEvent(ParsedBuffer parsedBuf, CharsetDecoder decoder) { + String msg = null; + try { + msg = parsedBuf.buffer.getString(decoder); + } catch (Throwable t) { + logger.info("Error decoding line with charset (" + decoder.charset() + + "). Exception follows.", t); + + if (t instanceof Error) { + Throwables.propagate(t); + } + + // fall back to byte array + byte[] bytes = new byte[parsedBuf.buffer.remaining()]; + parsedBuf.buffer.get(bytes); + + Event event = EventBuilder.withBody(bytes); + event.getHeaders().put(SyslogUtils.EVENT_STATUS, + SyslogUtils.SyslogStatus.INVALID.getSyslogStatus()); + + return event; + } + + logger.trace("Seen raw event: {}", msg); + + Event event; + try { + event = syslogParser.parseMessage(msg, decoder.charset()); + if (parsedBuf.incomplete) { + event.getHeaders().put(SyslogUtils.EVENT_STATUS, + SyslogUtils.SyslogStatus.INCOMPLETE.getSyslogStatus()); + } + } catch (IllegalArgumentException ex) { + event = EventBuilder.withBody(msg, decoder.charset()); + event.getHeaders().put(SyslogUtils.EVENT_STATUS, + SyslogUtils.SyslogStatus.INVALID.getSyslogStatus()); + logger.debug("Error parsing syslog event", ex); + } + + return event; + } + } + + /** + * This class is designed to parse lines up to a maximum length. If the line + * exceeds the given length, it is cut off at that mark and an overflow flag + * is set for the line. If less than the specified length is parsed, and a + * newline is not found, then the parsed data is saved in a buffer provided + * for that purpose so that it can be used in the next round of parsing. 
+ */ + static class LineSplitter { + + private final static byte NEWLINE = '\n'; + private final int maxLineLength; + + public LineSplitter(int maxLineLength) { + this.maxLineLength = maxLineLength; + } + + /** + * Parse a line from the IoBuffer {@code buf} and store it into + * {@code parsedBuf} except for the trailing newline character. If a line + * is successfully parsed, returns {@code true}. + * <p/>If no newline is found, and + * the number of bytes traversed is less than {@code maxLineLength}, then + * the data read from {@code buf} is stored in {@code savedBuf} and this + * method returns {@code false}. + * <p/>If the number of characters traversed + * equals {@code maxLineLength}, but a newline was not found, then the + * {@code parsedBuf} variable will be populated, the {@code overflow} flag + * will be set in the {@code ParsedBuffer} object, and this function will + * return {@code true}. + */ + public boolean parseLine(IoBuffer buf, IoBuffer savedBuf, + ParsedBuffer parsedBuf) { + + // clear out passed-in ParsedBuffer object + parsedBuf.buffer = null; + parsedBuf.incomplete = false; + + byte curByte; + + buf.mark(); + int msgPos = savedBuf.position(); // carry on from previous buffer + boolean seenNewline = false; + while (!seenNewline && buf.hasRemaining() && msgPos < maxLineLength) { + curByte = buf.get(); + + // we are looking for newline delimiters between events + if (curByte == NEWLINE) { + seenNewline = true; + } + + msgPos++; + } + + // hit a newline? 
+ if (seenNewline) { + + int end = buf.position(); + buf.reset(); + int start = buf.position(); + + if (savedBuf.position() > 0) { + // complete the saved buffer + byte[] tmp = new byte[end - start]; + buf.get(tmp); + savedBuf.put(tmp); + int len = savedBuf.position() - 1; + savedBuf.flip(); + + parsedBuf.buffer = savedBuf.getSlice(len); + + savedBuf.clear(); + } else { + parsedBuf.buffer = buf.getSlice(end - start - 1); + + buf.get(); // throw away newline + } + + return true; + + // we either emptied our buffer or hit max msg size + } else { + + // exceeded max message size + if (msgPos == maxLineLength) { + + int end = buf.position(); + buf.reset(); + int start = buf.position(); + + if (savedBuf.position() > 0) { + // complete the saved buffer + byte[] tmp = new byte[end - start]; + buf.get(tmp); + savedBuf.put(tmp); + savedBuf.flip(); + parsedBuf.buffer = savedBuf.getSlice(msgPos); + savedBuf.clear(); + } else { + // no newline found + parsedBuf.buffer = buf.getSlice(msgPos); + } + + logger.warn("Event size larger than specified event size: {}. " + + "Consider increasing the max event size.", maxLineLength); + + parsedBuf.incomplete = true; + + return true; + + // message fragmentation; save in buffer for later + } else if (!buf.hasRemaining()) { + + int end = buf.position(); + buf.reset(); + int start = buf.position(); + byte[] tmp = new byte[end - start]; + buf.get(tmp); + savedBuf.put(tmp); + + return false; + + // this should never happen + } else { + + throw new IllegalStateException("unexpected buffer state: " + + "msgPos=" + msgPos + ", buf.hasRemaining=" + buf.hasRemaining() + + ", savedBuf.hasRemaining=" + savedBuf.hasRemaining() + + ", seenNewline=" + seenNewline + ", maxLen=" + maxLineLength); + + } + + } + } + + } + + /** + * Private struct to represent a simple text line parsed from a message. + */ + static class ParsedBuffer { + + /** + * The parsed line of text, without the newline character. 
+ */ + public IoBuffer buffer = null; + /** + * The incomplete flag is set if the source line length exceeds the maximum + * allowed line length. In that case, the returned line will have length + * equal to the maximum line length. + */ + public boolean incomplete = false; + } + + /** + * Package private only for unit testing + */ + static class ThreadSafeDecoder extends ThreadLocal<CharsetDecoder> { + private final Charset charset; + + public ThreadSafeDecoder(Charset charset) { + this.charset = charset; + } + + @Override + protected CharsetDecoder initialValue() { + return charset.newDecoder(); + } + } + +} http://git-wip-us.apache.org/repos/asf/flume/blob/30293ea1/flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java new file mode 100644 index 0000000..bf3305c --- /dev/null +++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java @@ -0,0 +1,323 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +/** + * To change this template, choose Tools | Templates + * and open the template in the editor. + */ + +package org.apache.flume.source; + +import com.google.common.base.Charsets; +import com.google.common.base.Preconditions; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.collect.Maps; +import java.nio.charset.Charset; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.regex.Pattern; +import org.apache.flume.Event; +import org.apache.flume.event.EventBuilder; +import org.joda.time.DateTime; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SyslogParser { + + private static final Logger logger = + LoggerFactory.getLogger(SyslogParser.class); + + private static final int TS_CACHE_MAX = 1000; // timestamp cache size limit + private static final Pattern TWO_SPACES = Pattern.compile(" "); + private static final DateTimeFormatter rfc3164Format = + DateTimeFormat.forPattern("MMM d HH:mm:ss").withZoneUTC(); + + private static final String timePat = "yyyy-MM-dd'T'HH:mm:ss"; + private static final int RFC3164_LEN = 15; + private static final int RFC5424_PREFIX_LEN = 19; + + private final DateTimeFormatter timeParser; + + private Cache<String, Long> timestampCache; + + public SyslogParser() { + timeParser = DateTimeFormat.forPattern(timePat).withZoneUTC(); + timestampCache = CacheBuilder.newBuilder().maximumSize(TS_CACHE_MAX).build( + new CacheLoader<String, Long>() { + + @Override + public Long load(String key) throws Exception { + return timeParser.parseMillis(key); + } + }); + } + + /** + * Parses a Flume Event out of a syslog message string. 
+ * @param msg Syslog message, not including the newline character + * @return Parsed Flume Event + * @throws IllegalArgumentException if unable to successfully parse message + */ + public Event parseMessage(String msg, Charset charset) { + Map<String, String> headers = Maps.newHashMap(); + + int msgLen = msg.length(); + + int curPos = 0; + + Preconditions.checkArgument(msg.charAt(curPos) == '<', + "Bad format: invalid priority: cannot find open bracket '<' (%s)", msg); + + int endBracketPos = msg.indexOf('>'); + Preconditions.checkArgument(endBracketPos > 0 && endBracketPos <= 6, + "Bad format: invalid priority: cannot find end bracket '>' (%s)", msg); + + String priority = msg.substring(1, endBracketPos); + int pri = Integer.parseInt(priority); + int facility = pri / 8; + int severity = pri % 8; + + // put fac / sev into header + headers.put(SyslogUtils.SYSLOG_FACILITY, String.valueOf(facility)); + headers.put(SyslogUtils.SYSLOG_SEVERITY, String.valueOf(severity)); + + Preconditions.checkArgument(msgLen > endBracketPos + 1, + "Bad format: no data except priority (%s)", msg); + + // update parsing position + curPos = endBracketPos + 1; + + // ignore version string + if (msgLen > curPos + 2 && "1 ".equals(msg.substring(curPos, curPos + 2))) { + curPos += 2; + } + + // now parse timestamp (handle different varieties) + + long ts; + char dateStartChar = msg.charAt(curPos); + + try { + + // no timestamp specified; use relay current time + if (dateStartChar == '-') { + ts = System.currentTimeMillis(); + if (msgLen <= curPos + 2) { + throw new IllegalArgumentException( + "bad syslog format (missing hostname)"); + } + curPos += 2; // assume we skip past a space to get to the hostname + + // rfc3164 imestamp + } else if (dateStartChar >= 'A' && dateStartChar <= 'Z') { + if (msgLen <= curPos + RFC3164_LEN) { + throw new IllegalArgumentException("bad timestamp format"); + } + ts = parseRfc3164Time( + msg.substring(curPos, curPos + RFC3164_LEN)); + curPos += RFC3164_LEN + 1; 
+ + // rfc 5424 timestamp + } else { + int nextSpace = msg.indexOf(' ', curPos); + if (nextSpace == -1) { + throw new IllegalArgumentException("bad timestamp format"); + } + ts = parseRfc5424Date(msg.substring(curPos, nextSpace)); + curPos = nextSpace + 1; + } + + } catch (IllegalArgumentException ex) { + throw new IllegalArgumentException("Unable to parse message: " + msg, ex); + } + + headers.put("timestamp", String.valueOf(ts)); + + // parse out hostname + int nextSpace = msg.indexOf(' ', curPos); + if (nextSpace == -1) { + throw new IllegalArgumentException( + "bad syslog format (missing hostname)"); + } + // copy the host string to avoid holding the message string in memory + // if using a memory-based queue + String hostname = new String(msg.substring(curPos, nextSpace)); + headers.put("host", hostname); + + // EventBuilder will do a copy of its own, so no defensive copy of the body + String data = ""; + if (msgLen > nextSpace + 1) { + curPos = nextSpace + 1; + data = msg.substring(curPos); + } + + Event event = EventBuilder.withBody(data, charset, headers); + + return event; + } + + /** + * Parse date in RFC 5424 format. Uses an LRU cache to speed up parsing for + * multiple messages that occur in the same second. 
+ * @param msg + * @return Typical (for Java) milliseconds since UNIX epoch + */ + protected long parseRfc5424Date(String msg) { + + Long ts = null; + int curPos = 0; + + int msgLen = msg.length(); + Preconditions.checkArgument(msgLen > RFC5424_PREFIX_LEN, + "Bad format: Not a valid RFC5424 timestamp: %s", msg); + String timestampPrefix = msg.substring(curPos, RFC5424_PREFIX_LEN); + + try { + ts = timestampCache.get(timestampPrefix); + } catch (ExecutionException ex) { + throw new IllegalArgumentException("bad timestamp format", ex); + } + + curPos += RFC5424_PREFIX_LEN; + + Preconditions.checkArgument(ts != null, "Parsing error: timestamp is null"); + + // look for the optional fractional seconds + if (msg.charAt(curPos) == '.') { + // figure out how many numeric digits + boolean foundEnd = false; + int endMillisPos = curPos + 1; + + if (msgLen <= endMillisPos) { + throw new IllegalArgumentException("bad timestamp format (no TZ)"); + } + + // FIXME: TODO: ensure we handle all bad formatting cases + while (!foundEnd) { + char curDigit = msg.charAt(endMillisPos); + if (curDigit >= '0' && curDigit <= '9') { + endMillisPos++; + } else { + foundEnd = true; + } + } + + // if they had a valid fractional second, append it rounded to millis + if (endMillisPos - (curPos + 1) > 0) { + float frac = Float.parseFloat(msg.substring(curPos, endMillisPos)); + long milliseconds = (long) (frac * 1000f); + ts += milliseconds; + } else { + throw new IllegalArgumentException( + "Bad format: Invalid timestamp (fractional portion): " + msg); + } + + curPos = endMillisPos; + } + + // look for timezone + char tzFirst = msg.charAt(curPos); + + // UTC + if (tzFirst == 'Z') { + // no-op + } else if (tzFirst == '+' || tzFirst == '-') { + + Preconditions.checkArgument(msgLen > curPos + 5, + "Bad format: Invalid timezone (%s)", msg); + + int polarity; + if (tzFirst == '+') { + polarity = +1; + } else { + polarity = -1; + } + + char[] h = new char[5]; + for (int i = 0; i < 5; i++) { + h[i] = 
msg.charAt(curPos + 1 + i); + } + + if (h[0] >= '0' && h[0] <= '9' + && h[1] >= '0' && h[1] <= '9' + && h[2] == ':' + && h[3] >= '0' && h[3] <= '9' + && h[4] >= '0' && h[4] <= '9') { + int hourOffset = Integer.parseInt(msg.substring(curPos + 1, curPos + 3)); + int minOffset = Integer.parseInt(msg.substring(curPos + 4, curPos + 6)); + ts -= polarity * ((hourOffset * 60) + minOffset) * 60000; + } else { + throw new IllegalArgumentException( + "Bad format: Invalid timezone: " + msg); + } + + } + + + return ts; + } + + /** + * Parse the RFC3164 date format. This is trickier than it sounds because this + * format does not specify a year so we get weird edge cases at year + * boundaries. This implementation tries to "do what I mean". + * @param ts RFC3164-compatible timestamp to be parsed + * @return Typical (for Java) milliseconds since the UNIX epoch + */ + protected long parseRfc3164Time(String ts) { + DateTime now = DateTime.now(); + int year = now.getYear(); + + ts = TWO_SPACES.matcher(ts).replaceFirst(" "); + + DateTime date; + try { + date = rfc3164Format.parseDateTime(ts); + } catch (IllegalArgumentException e) { + logger.debug("rfc3164 date parse failed on ("+ts+"): invalid format", e); + return 0; + } + + // try to deal with boundary cases, i.e. new year's eve. + // rfc3164 dates are really dumb. 
+ // NB: cannot handle replaying of old logs or going back to the future + if (date != null) { + DateTime fixed = date.withYear(year); + + // flume clock is ahead or there is some latency, and the year rolled + if (fixed.isAfter(now) && fixed.minusMonths(1).isAfter(now)) { + fixed = date.withYear(year - 1); + // flume clock is behind and the year rolled + } else if (fixed.isBefore(now) && fixed.plusMonths(1).isBefore(now)) { + fixed = date.withYear(year + 1); + } + date = fixed; + } + + if (date == null) { + return 0; + } + + return date.getMillis(); + } + + +} http://git-wip-us.apache.org/repos/asf/flume/blob/30293ea1/flume-ng-core/src/main/java/org/apache/flume/source/SyslogSourceConfigurationConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogSourceConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogSourceConfigurationConstants.java index 8c87215..5a73c88 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogSourceConfigurationConstants.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogSourceConfigurationConstants.java @@ -22,6 +22,11 @@ public final class SyslogSourceConfigurationConstants { public static final String CONFIG_PORT = "port"; + /** + * List of ports to listen to. + */ + public static final String CONFIG_PORTS = "ports"; + public static final String CONFIG_HOST = "host"; public static final String CONFIG_FORMAT_PREFIX = "format."; @@ -34,6 +39,32 @@ public final class SyslogSourceConfigurationConstants { public static final String CONFIG_DATEFORMAT = "dateFormat"; + /** + * Number of processors used to calculate number of threads to spawn. + */ + public static final String CONFIG_NUMPROCESSORS = "numProcessors"; + + /** + * Maximum allowable size of events. 
+ */ + public static final String CONFIG_EVENTSIZE = "eventSize"; + + public static final String CONFIG_BATCHSIZE = "batchSize"; + + public static final String CONFIG_CHARSET = "charset.default"; + + public static final String DEFAULT_CHARSET = "UTF-8"; + + public static final String CONFIG_PORT_CHARSET_PREFIX = "charset.port."; + + public static final int DEFAULT_BATCHSIZE = 100; + + public static final String CONFIG_PORT_HEADER = "portHeader"; + + public static final String DEFAULT_PORT_HEADER = "port"; + + public static final String CONFIG_READBUF_SIZE = "readBufferBytes"; + public static final int DEFAULT_READBUF_SIZE = 1024; private SyslogSourceConfigurationConstants() { // Disable explicit creation of objects. http://git-wip-us.apache.org/repos/asf/flume/blob/30293ea1/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java new file mode 100644 index 0000000..680e592 --- /dev/null +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java @@ -0,0 +1,382 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.source; + +import com.google.common.base.Charsets; +import com.google.common.collect.Lists; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.net.UnknownHostException; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; +import java.text.ParseException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.apache.flume.Channel; +import org.apache.flume.ChannelSelector; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.Transaction; +import org.apache.flume.channel.ChannelProcessor; +import org.apache.flume.channel.MemoryChannel; +import org.apache.flume.channel.ReplicatingChannelSelector; +import org.apache.flume.conf.Configurables; +import org.apache.flume.instrumentation.SourceCounter; +import org.apache.flume.source.MultiportSyslogTCPSource.LineSplitter; +import org.apache.flume.source.MultiportSyslogTCPSource.MultiportSyslogHandler; +import org.apache.flume.source.MultiportSyslogTCPSource.ParsedBuffer; +import org.apache.flume.source.MultiportSyslogTCPSource.ThreadSafeDecoder; +import org.apache.mina.core.buffer.IoBuffer; +import org.apache.mina.core.session.DefaultIoSessionDataStructureFactory; +import 
org.apache.mina.transport.socket.nio.NioSession; +import org.joda.time.DateTime; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import static org.mockito.Mockito.*; + +/** + * Tests for {@link MultiportSyslogTCPSource}: end-to-end delivery across many + * listening ports, reassembly of a line split over several buffers, and + * per-port character set decoding. + */ +public class TestMultiportSyslogTCPSource { + + private static final Logger logger = + LoggerFactory.getLogger(TestMultiportSyslogTCPSource.class); + + // First of the 1000 consecutive ports opened by testMultiplePorts. + private static final int BASE_TEST_SYSLOG_PORT = 14455; + private final DateTime time = new DateTime(); + private final String stamp1 = time.toString(); + private final String host1 = "localhost.localdomain"; + private final String data1 = "proc1 - some msg"; + + /** + * Helper function to generate a syslog message. + * @param counter number appended to the message text, after data1 + * @return the newline-terminated message, encoded with the platform default + * charset + */ + private byte[] getEvent(int counter) { + // NOTE(review): stamp1 is DateTime.toString() in the JVM default zone, so it + // ends in 'Z' only when that zone is UTC; otherwise it carries an offset. + String msg1 = "<10>" + stamp1 + " " + host1 + " " + data1 + " " + + String.valueOf(counter) + "\n"; + return msg1.getBytes(); + } + + /** + * Basic test to exercise multiple-port parsing. + * Sends one message to each of 1000 ports and checks the timestamp, host + * and body of every event that reaches the channel.
 + */ + @Test + public void testMultiplePorts() throws IOException, ParseException { + MultiportSyslogTCPSource source = new MultiportSyslogTCPSource(); + Channel channel = new MemoryChannel(); + + Context channelContext = new Context(); + channelContext.put("capacity", String.valueOf(2000)); + channelContext.put("transactionCapacity", String.valueOf(2000)); + Configurables.configure(channel, channelContext); + + List<Channel> channels = Lists.newArrayList(); + channels.add(channel); + + ChannelSelector rcs = new ReplicatingChannelSelector(); + rcs.setChannels(channels); + + source.setChannelProcessor(new ChannelProcessor(rcs)); + Context context = new Context(); + StringBuilder ports = new StringBuilder(); + for (int i = 0; i < 1000; i++) { + ports.append(String.valueOf(BASE_TEST_SYSLOG_PORT + i)).append(" "); + } + context.put(SyslogSourceConfigurationConstants.CONFIG_PORTS, + ports.toString().trim()); + source.configure(context); + source.start(); + + // One connection per port, each carrying a single message. + Socket syslogSocket; + for (int i = 0; i < 1000 ; i++) { + syslogSocket = new Socket( + InetAddress.getLocalHost(), BASE_TEST_SYSLOG_PORT + i); + syslogSocket.getOutputStream().write(getEvent(i)); + syslogSocket.close(); + } + + // Take all 1000 events in one transaction; a missing event fails fast. + List<Event> channelEvents = new ArrayList<Event>(); + Transaction txn = channel.getTransaction(); + txn.begin(); + for (int i = 0; i < 1000; i++) { + Event e = channel.take(); + if (e == null) { + throw new NullPointerException("Event is null"); + } + channelEvents.add(e); + } + // NOTE(review): a failed commit is rolled back but not rethrown, so it + // cannot fail the test. + try { + txn.commit(); + } catch (Throwable t) { + txn.rollback(); + } finally { + txn.close(); + } + //Since events can arrive out of order, search for each event in the array + // NOTE(review): the inner loop calls iter.remove() on every event, so the + // first pass of the outer loop drains channelEvents; later passes are no-ops. + for (int i = 0; i < 1000 ; i++) { + Iterator<Event> iter = channelEvents.iterator(); + while (iter.hasNext()) { + Event e = iter.next(); + Map<String, String> headers = e.getHeaders(); + // rely on port to figure out which event it is + Integer port = null; + if (headers.containsKey( + SyslogSourceConfigurationConstants.DEFAULT_PORT_HEADER))
 { + port = Integer.parseInt(headers.get( + SyslogSourceConfigurationConstants.DEFAULT_PORT_HEADER)); + } + iter.remove(); + + Assert.assertEquals("Timestamps must match", + String.valueOf(time.getMillis()), headers.get("timestamp")); + + String host2 = headers.get("host"); + Assert.assertEquals(host1, host2); + + // Without the port header the body check is skipped. + if (port != null) { + int num = port - BASE_TEST_SYSLOG_PORT; + Assert.assertEquals(data1 + " " + String.valueOf(num), + new String(e.getBody())); + } + } + } + source.stop(); + } + + /** + * Test the reassembly of a single line across multiple packets. + */ + @Test + public void testFragmented() throws CharacterCodingException { + final int maxLen = 100; + + IoBuffer savedBuf = IoBuffer.allocate(maxLen); + + String origMsg = "<1>- - blah blam foo\n"; + // Split the 21-char message into three fragments; only the last one + // contains the terminating newline. + IoBuffer buf1 = IoBuffer.wrap( + origMsg.substring(0, 11).getBytes(Charsets.UTF_8)); + IoBuffer buf2 = IoBuffer.wrap( + origMsg.substring(11, 16).getBytes(Charsets.UTF_8)); + IoBuffer buf3 = IoBuffer.wrap( + origMsg.substring(16, 21).getBytes(Charsets.UTF_8)); + + LineSplitter lineSplitter = new LineSplitter(maxLen); + ParsedBuffer parsedLine = new ParsedBuffer(); + + Assert.assertFalse("Incomplete line should not be parsed", + lineSplitter.parseLine(buf1, savedBuf, parsedLine)); + Assert.assertFalse("Incomplete line should not be parsed", + lineSplitter.parseLine(buf2, savedBuf, parsedLine)); + Assert.assertTrue("Completed line should be parsed", + lineSplitter.parseLine(buf3, savedBuf, parsedLine)); + + // the fragmented message should now be reconstructed + Assert.assertEquals(origMsg.trim(), + parsedLine.buffer.getString(Charsets.UTF_8.newDecoder())); + // getString() advanced the buffer position; rewind before parsing it again. + parsedLine.buffer.rewind(); + + // Only parseEvent() is exercised, so no channel processor or counter is + // needed (both passed as null). + MultiportSyslogTCPSource.MultiportSyslogHandler handler = + new MultiportSyslogTCPSource.MultiportSyslogHandler(maxLen, 100, null, + null, SyslogSourceConfigurationConstants.DEFAULT_PORT_HEADER, + new ThreadSafeDecoder(Charsets.UTF_8), + new ConcurrentHashMap<Integer, ThreadSafeDecoder>()); + + Event event =
 handler.parseEvent(parsedLine, Charsets.UTF_8.newDecoder()); + String body = new String(event.getBody(), Charsets.UTF_8); + // substring(7) strips the "<1>- - " prefix that precedes the message text. + Assert.assertEquals("Event body incorrect", + origMsg.trim().substring(7), body); + } + + /** + * Test parser handling of different character sets. + */ + @Test + public void testCharsetParsing() throws FileNotFoundException, IOException { + String header = "<10>2012-08-11T01:01:01Z localhost "; + String enBody = "Yarf yarf yarf"; + String enMsg = header + enBody; + String frBody = "Comment " + "\u00EA" + "tes-vous?"; + String frMsg = header + frBody; + String esBody = "¿Cómo estás?"; + String esMsg = header + esBody; + + // defaults to UTF-8 + MultiportSyslogHandler handler = new MultiportSyslogHandler( + 1000, 10, new ChannelProcessor(new ReplicatingChannelSelector()), + new SourceCounter("test"), "port", + new ThreadSafeDecoder(Charsets.UTF_8), + new ConcurrentHashMap<Integer, ThreadSafeDecoder>()); + + ParsedBuffer parsedBuf = new ParsedBuffer(); + parsedBuf.incomplete = false; + + // should be able to encode/decode any of these messages in UTF-8 or ISO + String[] bodies = { enBody, esBody, frBody }; + String[] msgs = { enMsg, esMsg, frMsg }; + Charset[] charsets = { Charsets.UTF_8, Charsets.ISO_8859_1 }; + for (Charset charset : charsets) { + for (int i = 0; i < msgs.length; i++) { + String msg = msgs[i]; + String body = bodies[i]; + parsedBuf.buffer = IoBuffer.wrap(msg.getBytes(charset)); + Event evt = handler.parseEvent(parsedBuf, charset.newDecoder()); + String result = new String(evt.getBody(), charset); + // NOTE(review): an earlier comment here said this check fails for non-UTF-8 + // charsets, yet the loop asserts it for ISO-8859-1 as well -- confirm. + Assert.assertEquals(charset + " parse error: " + msg, body, result); + Assert.assertNull( + evt.getHeaders().get(SyslogUtils.EVENT_STATUS)); + } + } + + // Construct an invalid UTF-8 sequence. + // The parser should still generate an Event, but mark it as INVALID.
 + byte[] badUtf8Seq = enMsg.getBytes(Charsets.ISO_8859_1); + int badMsgLen = badUtf8Seq.length; + badUtf8Seq[badMsgLen - 2] = (byte)0xFE; // valid ISO-8859-1, invalid UTF-8 + badUtf8Seq[badMsgLen - 1] = (byte)0xFF; // valid ISO-8859-1, invalid UTF-8 + parsedBuf.buffer = IoBuffer.wrap(badUtf8Seq); + Event evt = handler.parseEvent(parsedBuf, Charsets.UTF_8.newDecoder()); + Assert.assertEquals("event body: " + + new String(evt.getBody(), Charsets.ISO_8859_1) + + " and my default charset = " + Charset.defaultCharset() + + " with event = " + evt, + SyslogUtils.SyslogStatus.INVALID.getSyslogStatus(), + evt.getHeaders().get(SyslogUtils.EVENT_STATUS)); + Assert.assertArrayEquals("Raw message data should be kept in body of event", + badUtf8Seq, evt.getBody()); + + } + + // Takes one event from the channel in its own transaction and returns it + // (null if take() returned null). NOTE(review): no rollback if take() or + // commit() throws. + private static Event takeEvent(Channel channel) { + Transaction txn = channel.getTransaction(); + txn.begin(); + Event evt = channel.take(); + txn.commit(); + txn.close(); + return evt; + } + + /** + * Test that different charsets are parsed by different ports correctly. + * Uses mocked sessions on ports 10001 (ISO-8859-1) and 10002 (UTF-8).
 + */ + @Test + public void testPortCharsetHandling() throws UnknownHostException, Exception { + + /////////////////////////////////////////////////////// + // port setup + + InetAddress localAddr = InetAddress.getLocalHost(); + // Supplies a real attribute map to each mocked session below; presumably + // the handler keeps per-session state there (see sessionCreated). + DefaultIoSessionDataStructureFactory dsFactory = + new DefaultIoSessionDataStructureFactory(); + + + // one faker on port 10001 + int port1 = 10001; + NioSession session1 = mock(NioSession.class); + session1.setAttributeMap(dsFactory.getAttributeMap(session1)); + SocketAddress sockAddr1 = new InetSocketAddress(localAddr, port1); + when(session1.getLocalAddress()).thenReturn(sockAddr1); + + // another faker on port 10002 + int port2 = 10002; + NioSession session2 = mock(NioSession.class); + session2.setAttributeMap(dsFactory.getAttributeMap(session2)); + SocketAddress sockAddr2 = new InetSocketAddress(localAddr, port2); + when(session2.getLocalAddress()).thenReturn(sockAddr2); + + // set up expected charsets per port + ConcurrentMap<Integer, ThreadSafeDecoder> portCharsets = + new ConcurrentHashMap<Integer, ThreadSafeDecoder>(); + portCharsets.put(port1, new ThreadSafeDecoder(Charsets.ISO_8859_1)); + portCharsets.put(port2, new ThreadSafeDecoder(Charsets.UTF_8)); + + /////////////////////////////////////////////////////// + // channel / source setup + + // set up channel to receive events + MemoryChannel chan = new MemoryChannel(); + chan.configure(new Context()); + chan.start(); + ReplicatingChannelSelector sel = new ReplicatingChannelSelector(); + sel.setChannels(Lists.<Channel>newArrayList(chan)); + ChannelProcessor chanProc = new ChannelProcessor(sel); + + // defaults to UTF-8 + MultiportSyslogHandler handler = new MultiportSyslogHandler( + 1000, 10, chanProc, new SourceCounter("test"), "port", + new ThreadSafeDecoder(Charsets.UTF_8), portCharsets); + + // initialize buffers + handler.sessionCreated(session1); + handler.sessionCreated(session2); + + /////////////////////////////////////////////////////// + // event setup + + // Create
 events of varying charsets. + String header = "<10>2012-08-17T02:14:00-07:00 192.168.1.110 "; + + // These chars encode under ISO-8859-1 as illegal bytes under UTF-8. + String dangerousChars = "þÿÃÃ"; + + /////////////////////////////////////////////////////// + // encode and send them through the message handler + String msg; + IoBuffer buf; + Event evt; + + // valid ISO-8859-1 on the right (ISO-8859-1) port + msg = header + dangerousChars + "\n"; + buf = IoBuffer.wrap(msg.getBytes(Charsets.ISO_8859_1)); + handler.messageReceived(session1, buf); + evt = takeEvent(chan); + Assert.assertNotNull("Event vanished!", evt); + Assert.assertNull(evt.getHeaders().get(SyslogUtils.EVENT_STATUS)); + + // valid ISO-8859-1 on the wrong (UTF-8) port + msg = header + dangerousChars + "\n"; + buf = IoBuffer.wrap(msg.getBytes(Charsets.ISO_8859_1)); + handler.messageReceived(session2, buf); + evt = takeEvent(chan); + Assert.assertNotNull("Event vanished!", evt); + Assert.assertEquals("Expected invalid event due to character encoding", + SyslogUtils.SyslogStatus.INVALID.getSyslogStatus(), + evt.getHeaders().get(SyslogUtils.EVENT_STATUS)); + + // valid UTF-8 on the right (UTF-8) port + msg = header + dangerousChars + "\n"; + buf = IoBuffer.wrap(msg.getBytes(Charsets.UTF_8)); + handler.messageReceived(session2, buf); + evt = takeEvent(chan); + Assert.assertNotNull("Event vanished!", evt); + Assert.assertNull(evt.getHeaders().get(SyslogUtils.EVENT_STATUS)); + } + +} http://git-wip-us.apache.org/repos/asf/flume/blob/30293ea1/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogParser.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogParser.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogParser.java new file mode 100644 index 0000000..258c2f1 --- /dev/null +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogParser.java @@ -0,0 +1,85 @@ +/** + *
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flume.source; + +import com.google.common.base.Charsets; +import com.google.common.collect.Lists; +import java.nio.charset.Charset; +import java.util.List; +import org.apache.flume.Event; +import org.joda.time.format.DateTimeFormatter; +import org.joda.time.format.ISODateTimeFormat; +import org.junit.Assert; +import org.junit.Test; + +/** + * Unit tests for {@link SyslogParser}: RFC 5424 timestamp parsing and + * acceptance of well-formed syslog messages. + */ +public class TestSyslogParser { + + /** + * Checks that parseRfc5424Date agrees with Joda's ISO-8601 parser (both in + * epoch millis) on timestamps with varying fractional seconds and offsets. + */ + @Test + public void testRfc5424DateParsing() { + final String[] examples = { + "1985-04-12T23:20:50.52Z", "1985-04-12T19:20:50.52-04:00", + "2003-10-11T22:14:15.003Z", "2003-08-24T05:14:15.000003-07:00", + "2012-04-13T11:11:11-08:00", "2012-04-13T08:08:08.0001+00:00" + }; + + SyslogParser parser = new SyslogParser(); + DateTimeFormatter jodaParser = ISODateTimeFormat.dateTimeParser(); + + for (String ex : examples) { + Assert.assertEquals( + "Problem parsing date string: " + ex, + jodaParser.parseMillis(ex), + parser.parseRfc5424Date(ex)); + } + } + + /** + * Checks that known-good messages (BSD-style, RFC 5424, and common + * non-standard forms) parse without an EVENT_STATUS header being set. + */ + @Test + public void testMessageParsing() { + SyslogParser parser = new SyslogParser(); + Charset charset = Charsets.UTF_8; + List<String> messages = Lists.newArrayList(); + + // supported examples from RFC 3164 (BSD syslog) + messages.add("<34>Oct 11
 22:14:15 mymachine su: 'su root' failed for " + + "lonvick on /dev/pts/8"); + messages.add("<13>Feb 5 17:32:18 10.0.0.99 Use the BFG!"); + messages.add("<165>Aug 24 05:34:00 CST 1987 mymachine myproc[10]: %% " + + "It's time to make the do-nuts. %% Ingredients: Mix=OK, Jelly=OK # " + + "Devices: Mixer=OK, Jelly_Injector=OK, Frier=OK # Transport: " + + "Conveyer1=OK, Conveyer2=OK # %%"); + messages.add("<0>Oct 22 10:52:12 scapegoat 1990 Oct 22 10:52:01 TZ-6 " + + "scapegoat.dmz.example.org 10.1.2.3 sched[0]: That's All Folks!"); + + // supported examples from RFC 5424 + // NOTE(review): the literal text "BOM" presumably stands in for the UTF-8 + // byte-order mark shown in the RFC example -- confirm. + messages.add("<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - " + + "ID47 - BOM'su root' failed for lonvick on /dev/pts/8"); + messages.add("<165>1 2003-08-24T05:14:15.000003-07:00 192.0.2.1 myproc " + + "8710 - - %% It's time to make the do-nuts."); + + // non-standard (but common) messages (RFC3339 dates, no version digit) + messages.add("<13>2003-08-24T05:14:15Z localhost snarf?"); + messages.add("<13>2012-08-16T14:34:03-08:00 127.0.0.1 test shnap!"); + + for (String msg : messages) { + Event event = parser.parseMessage(msg, charset); + Assert.assertNull("Failure to parse known-good syslog message", + event.getHeaders().get(SyslogUtils.EVENT_STATUS)); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flume/blob/30293ea1/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index f3ac6d8..f34c808 100644 --- a/pom.xml +++ b/pom.xml @@ -876,6 +876,11 @@ limitations under the License. <version>1.4.1</version> </dependency> + <dependency> + <groupId>org.apache.mina</groupId> + <artifactId>mina-core</artifactId> + <version>2.0.4</version> + </dependency> <dependency> <groupId>org.hbase</groupId>
