Repository: flume
Updated Branches:
  refs/heads/flume-1.7 a56282086 -> 7dbe6fef6


FLUME-2798. Malformed Syslog messages can lead to OutOfMemoryException

(Phil D'Amore via Roshan Naik)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/7dbe6fef
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/7dbe6fef
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/7dbe6fef

Branch: refs/heads/flume-1.7
Commit: 7dbe6fef6226d3fc2d8ada711c583b29ede3cfbd
Parents: a562820
Author: Roshan Naik <[email protected]>
Authored: Fri Oct 2 16:07:12 2015 -0700
Committer: Roshan Naik <[email protected]>
Committed: Fri Oct 2 16:07:12 2015 -0700

----------------------------------------------------------------------
 .../apache/flume/source/SyslogTcpSource.java    |   4 +
 .../apache/flume/source/SyslogUDPSource.java    |   4 +
 .../org/apache/flume/source/SyslogUtils.java    | 100 ++++++++++---------
 .../apache/flume/source/TestSyslogUtils.java    |  46 +++++++++
 4 files changed, 109 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/7dbe6fef/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
index c117813..bd87151 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
@@ -93,6 +93,10 @@ implements EventDrivenSource, Configurable {
         } catch (ChannelException ex) {
           counterGroup.incrementAndGet("events.dropped");
           logger.error("Error writting to channel, event dropped", ex);
+        } catch (RuntimeException ex) {
+          counterGroup.incrementAndGet("events.dropped");
+          logger.error("Error parsing event from syslog stream, event dropped", ex);
+          return;
         }
       }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/7dbe6fef/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
index 378d484..47993dd 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
@@ -85,6 +85,10 @@ public class SyslogUDPSource extends AbstractSource
         counterGroup.incrementAndGet("events.dropped");
         logger.error("Error writting to channel", ex);
         return;
+      } catch (RuntimeException ex) {
+        counterGroup.incrementAndGet("events.dropped");
+        logger.error("Error parsing event from syslog stream, event dropped", ex);
+        return;
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/7dbe6fef/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java
index 42e3f71..5a9f4c8 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java
@@ -285,55 +285,58 @@ public class SyslogUtils {
 
   // create the event from syslog data
   Event buildEvent() {
-    byte[] body;
-    int pri = 0;
-    int sev = 0;
-    int facility = 0;
-
-    if(!isBadEvent){
-      pri = Integer.parseInt(prio.toString());
-      sev = pri % 8;
-      facility = pri / 8;
-      formatHeaders();
-    }
+    try {
+      byte[] body;
+      int pri = 0;
+      int sev = 0;
+      int facility = 0;
+
+      if(!isBadEvent){
+        pri = Integer.parseInt(prio.toString());
+        sev = pri % 8;
+        facility = pri / 8;
+        formatHeaders();
+      }
 
-    Map <String, String> headers = new HashMap<String, String>();
-    headers.put(SYSLOG_FACILITY, String.valueOf(facility));
-    headers.put(SYSLOG_SEVERITY, String.valueOf(sev));
-    if ((priority != null) && (priority.length() > 0)) {
-      headers.put("priority", priority);
-    }
-    if ((version != null) && (version.length() > 0)) {
-      headers.put("version", version);
-    }
-    if ((timeStamp != null) && timeStamp.length() > 0) {
-      headers.put("timestamp", timeStamp);
-    }
-    if ((hostName != null) && (hostName.length() > 0)) {
-      headers.put("host", hostName);
-    }
-    if(isBadEvent){
-      logger.warn("Event created from Invalid Syslog data.");
-      headers.put(EVENT_STATUS, SyslogStatus.INVALID.getSyslogStatus());
-    } else if(isIncompleteEvent){
-      logger.warn("Event size larger than specified event size: {}. You should " +
-          "consider increasing your event size.", maxSize);
-      headers.put(EVENT_STATUS, SyslogStatus.INCOMPLETE.getSyslogStatus());
-    }
+      Map <String, String> headers = new HashMap<String, String>();
+      headers.put(SYSLOG_FACILITY, String.valueOf(facility));
+      headers.put(SYSLOG_SEVERITY, String.valueOf(sev));
+      if ((priority != null) && (priority.length() > 0)) {
+        headers.put("priority", priority);
+      }
+      if ((version != null) && (version.length() > 0)) {
+        headers.put("version", version);
+      }
+      if ((timeStamp != null) && timeStamp.length() > 0) {
+        headers.put("timestamp", timeStamp);
+      }
+      if ((hostName != null) && (hostName.length() > 0)) {
+        headers.put("host", hostName);
+      }
+      if(isBadEvent){
+        logger.warn("Event created from Invalid Syslog data.");
+        headers.put(EVENT_STATUS, SyslogStatus.INVALID.getSyslogStatus());
+      } else if(isIncompleteEvent){
+        logger.warn("Event size larger than specified event size: {}. You should " +
+            "consider increasing your event size.", maxSize);
+        headers.put(EVENT_STATUS, SyslogStatus.INCOMPLETE.getSyslogStatus());
+      }
 
-    if (!keepAllFields(keepFields)) {
-      if ((msgBody != null) && (msgBody.length() > 0)) {
-        body = msgBody.getBytes();
+      if (!keepAllFields(keepFields)) {
+        if ((msgBody != null) && (msgBody.length() > 0)) {
+          body = msgBody.getBytes();
+        } else {
+          // Parse failed.
+          body = baos.toByteArray();
+        }
       } else {
-        // Parse failed.
         body = baos.toByteArray();
       }
-    } else {
-      body = baos.toByteArray();
+      // format the message
+      return EventBuilder.withBody(body, headers);
+    } finally {
+      reset();
     }
-    reset();
-    // format the message
-    return EventBuilder.withBody(body, headers);
   }
 
   // Apply each known pattern to message
@@ -441,11 +444,18 @@ public class SyslogUtils {
         case PRIO:
           baos.write(b);
           if (b == '>') {
+            if (prio.length() == 0) {
+              isBadEvent = true;
+            }
             m = Mode.DATA;
           } else {
             char ch = (char) b;
             prio.append(ch);
-            if (!Character.isDigit(ch)) {
+            // Priority is max 3 digits per both RFC 3164 and 5424
+            // With this check there is basically no danger of
+            // boas.size() exceeding this.maxSize before getting to the
+            // DATA state where this is actually checked
+            if (!Character.isDigit(ch) || prio.length() > 3) {
               isBadEvent = true;
               //If we hit a bad priority, just write as if everything is data.
               m = Mode.DATA;
@@ -460,7 +470,7 @@ public class SyslogUtils {
           } else {
             baos.write(b);
           }
-          if(baos.size() == this.maxSize && !doneReading){
+          if(baos.size() == this.maxSize && !doneReading) {
             isIncompleteEvent = true;
             e = buildEvent();
             doneReading = true;

http://git-wip-us.apache.org/repos/asf/flume/blob/7dbe6fef/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java
index 6da1733..be4598e 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java
@@ -264,6 +264,52 @@ public class TestSyslogUtils {
   }
 
   /**
+   * Test bad event format 3: Empty priority - <>
+   */
+
+  @Test
+  public void testExtractBadEvent3() {
+    String badData1 = "<> bad bad data\n";
+    SyslogUtils util = new SyslogUtils(false);
+    ChannelBuffer buff = ChannelBuffers.buffer(100);
+    buff.writeBytes(badData1.getBytes());
+    Event e = util.extractEvent(buff);
+    if(e == null){
+      throw new NullPointerException("Event is null");
+    }
+    Map<String, String> headers = e.getHeaders();
+    Assert.assertEquals("0", headers.get(SyslogUtils.SYSLOG_FACILITY));
+    Assert.assertEquals("0", headers.get(SyslogUtils.SYSLOG_SEVERITY));
+    Assert.assertEquals(SyslogUtils.SyslogStatus.INVALID.getSyslogStatus(),
+        headers.get(SyslogUtils.EVENT_STATUS));
+    Assert.assertEquals(badData1.trim(), new String(e.getBody()).trim());
+
+  }
+
+  /**
+   * Test bad event format 4: Priority too long
+   */
+
+  @Test
+  public void testExtractBadEvent4() {
+    String badData1 = "<123123123123123123123123123123> bad bad data\n";
+    SyslogUtils util = new SyslogUtils(false);
+    ChannelBuffer buff = ChannelBuffers.buffer(100);
+    buff.writeBytes(badData1.getBytes());
+    Event e = util.extractEvent(buff);
+    if(e == null){
+      throw new NullPointerException("Event is null");
+    }
+    Map<String, String> headers = e.getHeaders();
+    Assert.assertEquals("0", headers.get(SyslogUtils.SYSLOG_FACILITY));
+    Assert.assertEquals("0", headers.get(SyslogUtils.SYSLOG_SEVERITY));
+    Assert.assertEquals(SyslogUtils.SyslogStatus.INVALID.getSyslogStatus(),
+        headers.get(SyslogUtils.EVENT_STATUS));
+    Assert.assertEquals(badData1.trim(), new String(e.getBody()).trim());
+
+  }
+
+  /**
    * Good event
    */
   @Test

Reply via email to