Updated Branches: refs/heads/flume-1.5 e278a3f1f -> 8742d5917
FLUME-1666. Syslog source strips timestamp and hostname from log message body (Jeff Lord via Mike Percy) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/8742d591 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/8742d591 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/8742d591 Branch: refs/heads/flume-1.5 Commit: 8742d591770e2bf1505c9228e36a26479350b8f8 Parents: e278a3f Author: Mike Percy <[email protected]> Authored: Wed Oct 9 18:49:31 2013 -0700 Committer: Mike Percy <[email protected]> Committed: Wed Oct 9 18:50:33 2013 -0700 ---------------------------------------------------------------------- .../SyslogSourceConfigurationConstants.java | 3 +++ .../apache/flume/source/SyslogTcpSource.java | 20 ++++++++++++++++++++ .../org/apache/flume/source/SyslogUtils.java | 19 +++++++++++++++---- .../flume/source/TestSyslogUdpSource.java | 4 ++++ .../apache/flume/source/TestSyslogUtils.java | 5 +++-- flume-ng-doc/sphinx/FlumeDeveloperGuide.rst | 2 +- flume-ng-doc/sphinx/FlumeUserGuide.rst | 2 ++ 7 files changed, 48 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/8742d591/flume-ng-core/src/main/java/org/apache/flume/source/SyslogSourceConfigurationConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogSourceConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogSourceConfigurationConstants.java index 5a73c88..985949c 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogSourceConfigurationConstants.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogSourceConfigurationConstants.java @@ -66,6 +66,9 @@ public final class SyslogSourceConfigurationConstants { public static final String CONFIG_READBUF_SIZE = "readBufferBytes"; 
public static final int DEFAULT_READBUF_SIZE = 1024; + public static final String CONFIG_KEEP_FIELDS = "keepFields"; + public static final boolean DEFAULT_KEEP_FIELDS = false; + private SyslogSourceConfigurationConstants() { // Disable explicit creation of objects. } http://git-wip-us.apache.org/repos/asf/flume/blob/8742d591/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java index db9e0fd..7a12d27 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java @@ -19,10 +19,12 @@ package org.apache.flume.source; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import com.google.common.annotations.VisibleForTesting; import org.apache.flume.ChannelException; import org.apache.flume.Context; import org.apache.flume.CounterGroup; @@ -56,6 +58,7 @@ implements EventDrivenSource, Configurable { private Integer eventSize; private Map<String, String> formaterProp; private CounterGroup counterGroup = new CounterGroup(); + private Boolean keepFields; public class syslogTcpHandler extends SimpleChannelHandler { @@ -65,6 +68,10 @@ implements EventDrivenSource, Configurable { syslogUtils.setEventSize(eventSize); } + public void setKeepFields(boolean removeFields){ + syslogUtils.setKeepFields(removeFields); + } + public void setFormater(Map<String, String> prop) { syslogUtils.addFormats(prop); } @@ -103,6 +110,7 @@ implements EventDrivenSource, Configurable { syslogTcpHandler handler = new syslogTcpHandler(); handler.setEventSize(eventSize); handler.setFormater(formaterProp); + handler.setKeepFields(keepFields); 
return Channels.pipeline(handler); } }); @@ -146,6 +154,18 @@ implements EventDrivenSource, Configurable { eventSize = context.getInteger("eventSize", SyslogUtils.DEFAULT_SIZE); formaterProp = context.getSubProperties( SyslogSourceConfigurationConstants.CONFIG_FORMAT_PREFIX); + keepFields = context.getBoolean + (SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS, false); + } + + @VisibleForTesting + public int getSourcePort() { + SocketAddress localAddress = nettyChannel.getLocalAddress(); + if (localAddress instanceof InetSocketAddress) { + InetSocketAddress addr = (InetSocketAddress) localAddress; + return addr.getPort(); + } + return 0; } } http://git-wip-us.apache.org/repos/asf/flume/blob/8742d591/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java index c2a29a1..f2ea932 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java @@ -20,6 +20,8 @@ package org.apache.flume.source; import org.apache.flume.Event; +import org.apache.flume.annotations.InterfaceAudience; +import org.apache.flume.annotations.InterfaceStability; import org.apache.flume.event.EventBuilder; import org.jboss.netty.buffer.ChannelBuffer; import org.slf4j.Logger; @@ -37,6 +39,8 @@ import java.util.regex.MatchResult; import java.util.regex.Matcher; import java.util.regex.Pattern; +@InterfaceAudience.Private +@InterfaceStability.Evolving public class SyslogUtils { final public static String SYSLOG_TIMESTAMP_FORMAT_RFC5424_2 = "yyyy-MM-dd'T'HH:mm:ss.SZ"; final public static String SYSLOG_TIMESTAMP_FORMAT_RFC5424_1 = "yyyy-MM-dd'T'HH:mm:ss.S"; @@ -79,6 +83,7 @@ public class SyslogUtils { private boolean isBadEvent; private boolean isIncompleteEvent; private Integer maxSize; + 
private boolean keepFields; private class SyslogFormatter { public Pattern regexPattern; @@ -98,15 +103,16 @@ public class SyslogUtils { } public SyslogUtils(boolean isUdp) { - this(DEFAULT_SIZE, isUdp); + this(DEFAULT_SIZE, SyslogSourceConfigurationConstants.DEFAULT_KEEP_FIELDS, isUdp); } - public SyslogUtils(Integer eventSize, boolean isUdp){ + public SyslogUtils(Integer eventSize, boolean keepFields, boolean isUdp) { this.isUdp = isUdp; isBadEvent = false; isIncompleteEvent = false; maxSize = (eventSize < MIN_SIZE) ? MIN_SIZE : eventSize; baos = new ByteArrayOutputStream(eventSize); + this.keepFields = keepFields; initHeaderFormats(); } @@ -219,7 +225,7 @@ public class SyslogUtils { headers.put(EVENT_STATUS, SyslogStatus.INCOMPLETE.getSyslogStatus()); } - if ((msgBody != null) && (msgBody.length() > 0)) { + if ((msgBody != null) && (msgBody.length() > 0) && !keepFields) { body = msgBody.getBytes(); } else { body = baos.toByteArray(); @@ -380,4 +386,9 @@ public class SyslogUtils { this.maxSize = eventSize; } -} + public void setKeepFields(Boolean keepFields) { + this.keepFields= keepFields; + } + } + + http://git-wip-us.apache.org/repos/asf/flume/blob/8742d591/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java index 2d7a429..eae26ed 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java @@ -20,6 +20,8 @@ package org.apache.flume.source; import java.util.ArrayList; import java.util.List; + +import com.google.common.base.Charsets; import org.apache.log4j.Logger; import org.apache.log4j.net.SyslogAppender; @@ -91,6 +93,8 @@ public class TestSyslogUdpSource { source.stop(); 
logger.removeAppender(appender); + String str = new String(e.getBody(), Charsets.UTF_8); + logger.info(str); Assert.assertNotNull(e); Assert.assertEquals(String.valueOf(SyslogAppender.LOG_FTP / 8), e.getHeaders().get(SyslogUtils.SYSLOG_FACILITY)); http://git-wip-us.apache.org/repos/asf/flume/blob/8742d591/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java index 7208464..898096b 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java @@ -162,7 +162,8 @@ public class TestSyslogUtils { format1, host1, data1); } - public void checkHeader(String msg1, String stamp1, String format1, String host1, String data1) throws ParseException { + public void checkHeader(String msg1, String stamp1, String format1, + String host1, String data1) throws ParseException { SyslogUtils util = new SyslogUtils(false); ChannelBuffer buff = ChannelBuffers.buffer(200); @@ -397,7 +398,7 @@ public class TestSyslogUtils { @Test public void testExtractBadEventLarge() { String badData1 = "<10> bad bad data bad bad\n"; - SyslogUtils util = new SyslogUtils(5, false); + SyslogUtils util = new SyslogUtils(5, true, false); ChannelBuffer buff = ChannelBuffers.buffer(100); buff.writeBytes(badData1.getBytes()); Event e = util.extractEvent(buff); http://git-wip-us.apache.org/repos/asf/flume/blob/8742d591/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst b/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst index 2be9c68..ee7b89b 100644 --- a/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst +++ b/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst @@ 
-166,7 +166,7 @@ RPC clients - Avro and Thrift As of Flume 1.4.0, Avro is the default RPC protocol. The ``NettyAvroRpcClient`` and ``ThriftRpcClient`` implement the ``RpcClient`` interface. The client needs to create this object with the host and port of -the target Flume agent, and canthen use the ``RpcClient`` to send data into +the target Flume agent, and can then use the ``RpcClient`` to send data into the agent. The following example shows how to use the Flume Client SDK API within a user's data-generating application: http://git-wip-us.apache.org/repos/asf/flume/blob/8742d591/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 4892dfc..98859ce 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -1170,6 +1170,8 @@ Property Name Default Description **host** -- Host name or IP address to bind to **port** -- Port # to bind to eventSize 2500 Maximum size of a single event line, in bytes +keepFields false Setting this to true will preserve the + Timestamp and Hostname in the body of the event. selector.type replicating or multiplexing selector.* replicating Depends on the selector.type value interceptors -- Space-separated list of interceptors
