Repository: flume
Updated Branches:
  refs/heads/flume-1.7 a56282086 -> 7dbe6fef6
FLUME-2798. Malformed Syslog messages can lead to OutOfMemoryException (Phil D'Amore via Roshan Naik)

Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/7dbe6fef
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/7dbe6fef
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/7dbe6fef

Branch: refs/heads/flume-1.7
Commit: 7dbe6fef6226d3fc2d8ada711c583b29ede3cfbd
Parents: a562820
Author: Roshan Naik <[email protected]>
Authored: Fri Oct 2 16:07:12 2015 -0700
Committer: Roshan Naik <[email protected]>
Committed: Fri Oct 2 16:07:12 2015 -0700

----------------------------------------------------------------------
 .../apache/flume/source/SyslogTcpSource.java | 4 +
 .../apache/flume/source/SyslogUDPSource.java | 4 +
 .../org/apache/flume/source/SyslogUtils.java | 100 ++++++++++-------
 .../apache/flume/source/TestSyslogUtils.java | 46 +++++++++
 4 files changed, 109 insertions(+), 45 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flume/blob/7dbe6fef/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
index c117813..bd87151 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
@@ -93,6 +93,10 @@ implements EventDrivenSource, Configurable {
         } catch (ChannelException ex) {
           counterGroup.incrementAndGet("events.dropped");
           logger.error("Error writting to channel, event dropped", ex);
+        } catch (RuntimeException ex) {
+          counterGroup.incrementAndGet("events.dropped");
+          logger.error("Error parsing event from syslog stream, event dropped", ex);
+          return;
         }
       }
 
http://git-wip-us.apache.org/repos/asf/flume/blob/7dbe6fef/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
index 378d484..47993dd 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
@@ -85,6 +85,10 @@ public class SyslogUDPSource extends AbstractSource
         counterGroup.incrementAndGet("events.dropped");
         logger.error("Error writting to channel", ex);
         return;
+      } catch (RuntimeException ex) {
+        counterGroup.incrementAndGet("events.dropped");
+        logger.error("Error parsing event from syslog stream, event dropped", ex);
+        return;
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/7dbe6fef/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java
index 42e3f71..5a9f4c8 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java
@@ -285,55 +285,58 @@ public class SyslogUtils {
 
   // create the event from syslog data
   Event buildEvent() {
-    byte[] body;
-    int pri = 0;
-    int sev = 0;
-    int facility = 0;
-
-    if(!isBadEvent){
-      pri = Integer.parseInt(prio.toString());
-      sev = pri % 8;
-      facility = pri / 8;
-      formatHeaders();
-    }
+    try {
+      byte[] body;
+      int pri = 0;
+      int sev = 0;
+      int facility = 0;
+
+      if(!isBadEvent){
+        pri = Integer.parseInt(prio.toString());
+        sev = pri % 8;
+        facility = pri / 8;
+        formatHeaders();
+      }
 
-    Map <String, String> headers = new HashMap<String, String>();
-    headers.put(SYSLOG_FACILITY, String.valueOf(facility));
-    headers.put(SYSLOG_SEVERITY, String.valueOf(sev));
-    if ((priority != null) && (priority.length() > 0)) {
-      headers.put("priority", priority);
-    }
-    if ((version != null) && (version.length() > 0)) {
-      headers.put("version", version);
-    }
-    if ((timeStamp != null) && timeStamp.length() > 0) {
-      headers.put("timestamp", timeStamp);
-    }
-    if ((hostName != null) && (hostName.length() > 0)) {
-      headers.put("host", hostName);
-    }
-    if(isBadEvent){
-      logger.warn("Event created from Invalid Syslog data.");
-      headers.put(EVENT_STATUS, SyslogStatus.INVALID.getSyslogStatus());
-    } else if(isIncompleteEvent){
-      logger.warn("Event size larger than specified event size: {}. You should " +
-          "consider increasing your event size.", maxSize);
-      headers.put(EVENT_STATUS, SyslogStatus.INCOMPLETE.getSyslogStatus());
-    }
+      Map <String, String> headers = new HashMap<String, String>();
+      headers.put(SYSLOG_FACILITY, String.valueOf(facility));
+      headers.put(SYSLOG_SEVERITY, String.valueOf(sev));
+      if ((priority != null) && (priority.length() > 0)) {
+        headers.put("priority", priority);
+      }
+      if ((version != null) && (version.length() > 0)) {
+        headers.put("version", version);
+      }
+      if ((timeStamp != null) && timeStamp.length() > 0) {
+        headers.put("timestamp", timeStamp);
+      }
+      if ((hostName != null) && (hostName.length() > 0)) {
+        headers.put("host", hostName);
+      }
+      if(isBadEvent){
+        logger.warn("Event created from Invalid Syslog data.");
+        headers.put(EVENT_STATUS, SyslogStatus.INVALID.getSyslogStatus());
+      } else if(isIncompleteEvent){
+        logger.warn("Event size larger than specified event size: {}. You should " +
+            "consider increasing your event size.", maxSize);
+        headers.put(EVENT_STATUS, SyslogStatus.INCOMPLETE.getSyslogStatus());
+      }
 
-    if (!keepAllFields(keepFields)) {
-      if ((msgBody != null) && (msgBody.length() > 0)) {
-        body = msgBody.getBytes();
+      if (!keepAllFields(keepFields)) {
+        if ((msgBody != null) && (msgBody.length() > 0)) {
+          body = msgBody.getBytes();
+        } else {
+          // Parse failed.
+          body = baos.toByteArray();
+        }
       } else {
-        // Parse failed.
         body = baos.toByteArray();
       }
-    } else {
-      body = baos.toByteArray();
+      // format the message
+      return EventBuilder.withBody(body, headers);
+    } finally {
+      reset();
     }
-    reset();
-    // format the message
-    return EventBuilder.withBody(body, headers);
   }
 
   // Apply each known pattern to message
@@ -441,11 +444,18 @@ public class SyslogUtils {
         case PRIO:
           baos.write(b);
           if (b == '>') {
+            if (prio.length() == 0) {
+              isBadEvent = true;
+            }
             m = Mode.DATA;
           } else {
             char ch = (char) b;
             prio.append(ch);
-            if (!Character.isDigit(ch)) {
+            // Priority is max 3 digits per both RFC 3164 and 5424
+            // With this check there is basically no danger of
+            // boas.size() exceeding this.maxSize before getting to the
+            // DATA state where this is actually checked
+            if (!Character.isDigit(ch) || prio.length() > 3) {
               isBadEvent = true;
               //If we hit a bad priority, just write as if everything is data.
               m = Mode.DATA;
@@ -460,7 +470,7 @@ public class SyslogUtils {
           } else {
             baos.write(b);
           }
-          if(baos.size() == this.maxSize && !doneReading){
+          if(baos.size() == this.maxSize && !doneReading) {
             isIncompleteEvent = true;
             e = buildEvent();
             doneReading = true;

http://git-wip-us.apache.org/repos/asf/flume/blob/7dbe6fef/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java
index 6da1733..be4598e 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java
@@ -264,6 +264,52 @@ public class TestSyslogUtils {
   }
 
   /**
+   * Test bad event format 3: Empty priority - <>
+   */
+
+  @Test
+  public void testExtractBadEvent3() {
+    String badData1 = "<> bad bad data\n";
+    SyslogUtils util = new SyslogUtils(false);
+    ChannelBuffer buff = ChannelBuffers.buffer(100);
+    buff.writeBytes(badData1.getBytes());
+    Event e = util.extractEvent(buff);
+    if(e == null){
+      throw new NullPointerException("Event is null");
+    }
+    Map<String, String> headers = e.getHeaders();
+    Assert.assertEquals("0", headers.get(SyslogUtils.SYSLOG_FACILITY));
+    Assert.assertEquals("0", headers.get(SyslogUtils.SYSLOG_SEVERITY));
+    Assert.assertEquals(SyslogUtils.SyslogStatus.INVALID.getSyslogStatus(),
+        headers.get(SyslogUtils.EVENT_STATUS));
+    Assert.assertEquals(badData1.trim(), new String(e.getBody()).trim());
+
+  }
+
+  /**
+   * Test bad event format 4: Priority too long
+   */
+
+  @Test
+  public void testExtractBadEvent4() {
+    String badData1 = "<123123123123123123123123123123> bad bad data\n";
+    SyslogUtils util = new SyslogUtils(false);
+    ChannelBuffer buff = ChannelBuffers.buffer(100);
+    buff.writeBytes(badData1.getBytes());
+    Event e = util.extractEvent(buff);
+    if(e == null){
+      throw new NullPointerException("Event is null");
+    }
+    Map<String, String> headers = e.getHeaders();
+    Assert.assertEquals("0", headers.get(SyslogUtils.SYSLOG_FACILITY));
+    Assert.assertEquals("0", headers.get(SyslogUtils.SYSLOG_SEVERITY));
+    Assert.assertEquals(SyslogUtils.SyslogStatus.INVALID.getSyslogStatus(),
+        headers.get(SyslogUtils.EVENT_STATUS));
+    Assert.assertEquals(badData1.trim(), new String(e.getBody()).trim());
+
+  }
+
+  /**
    * Good event
    */
   @Test
