Updated Branches: refs/heads/flume-1.5 db1abb0a8 -> 5de9a8af4
FLUME-2130. Handle larger payloads via SyslogUDPSource (Ashish Paliwal via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/5de9a8af Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/5de9a8af Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/5de9a8af Branch: refs/heads/flume-1.5 Commit: 5de9a8af4d0f859bd88e65996e7543302fd9eb1f Parents: db1abb0 Author: Hari Shreedharan <hshreedha...@apache.org> Authored: Fri Feb 7 12:32:56 2014 -0800 Committer: Hari Shreedharan <hshreedha...@apache.org> Committed: Fri Feb 7 12:33:53 2014 -0800 ---------------------------------------------------------------------- .../apache/flume/source/SyslogUDPSource.java | 19 ++++--- .../flume/source/TestSyslogUdpSource.java | 52 ++++++++++++++++++++ 2 files changed, 63 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/5de9a8af/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java index 8fb251b..01b8905 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java @@ -35,13 +35,7 @@ import org.apache.flume.conf.Configurables; import org.apache.flume.source.SyslogUtils; import org.jboss.netty.bootstrap.ConnectionlessBootstrap; import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.SimpleChannelHandler; +import org.jboss.netty.channel.*; import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory; import org.slf4j.Logger; @@ -61,8 +55,14 @@ public class SyslogUDPSource extends AbstractSource .getLogger(SyslogUDPSource.class); private CounterGroup counterGroup = new CounterGroup(); + + // Default Min size + public static final int DEFAULT_MIN_SIZE = 2048; + public static final int DEFAULT_INITIAL_SIZE = DEFAULT_MIN_SIZE; + public class syslogHandler extends SimpleChannelHandler { - private SyslogUtils syslogUtils = new SyslogUtils(true); + private SyslogUtils syslogUtils = new SyslogUtils(DEFAULT_INITIAL_SIZE, + SyslogSourceConfigurationConstants.DEFAULT_KEEP_FIELDS, true); public void setFormater(Map<String, String> prop) { syslogUtils.addFormats(prop); @@ -98,6 +98,9 @@ public class SyslogUDPSource extends AbstractSource final syslogHandler handler = new syslogHandler(); handler.setFormater(formaterProp); handler.setKeepFields(keepFields); + serverBootstrap.setOption("receiveBufferSizePredictorFactory", + new AdaptiveReceiveBufferSizePredictorFactory(DEFAULT_MIN_SIZE, + DEFAULT_INITIAL_SIZE, maxsize)); serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() { http://git-wip-us.apache.org/repos/asf/flume/blob/5de9a8af/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java index 36f6479..95ee48c 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java @@ -25,6 +25,7 @@ import java.net.DatagramPacket; import java.net.InetAddress; import java.net.DatagramSocket; import com.google.common.base.Charsets; +import com.google.common.base.Strings; import org.apache.flume.Channel; import org.apache.flume.ChannelSelector; import org.apache.flume.Context; @@ -124,6 +125,49 @@ public class TestSyslogUdpSource { } @Test + public void testLargePayload() throws Exception { + init(true); + source.start(); + // Write some message to the syslog port + + byte[] largePayload = getPayload(1000).getBytes(); + + DatagramSocket syslogSocket; + DatagramPacket datagramPacket; + datagramPacket = new DatagramPacket(largePayload, + 1000, + InetAddress.getLocalHost(), source.getSourcePort()); + for (int i = 0; i < 10 ; i++) { + syslogSocket = new DatagramSocket(); + syslogSocket.send(datagramPacket); + syslogSocket.close(); + } + + List<Event> channelEvents = new ArrayList<Event>(); + Transaction txn = channel.getTransaction(); + txn.begin(); + for (int i = 0; i < 10; i++) { + Event e = channel.take(); + Assert.assertNotNull(e); + channelEvents.add(e); + } + + try { + txn.commit(); + } catch (Throwable t) { + txn.rollback(); + } finally { + txn.close(); + } + + source.stop(); + for (Event e : channelEvents) { + Assert.assertNotNull(e); + Assert.assertArrayEquals(largePayload, e.getBody()); + } + } + + @Test public void testKeepFields() throws IOException { runKeepFieldsTest(true); } @@ -132,5 +176,13 @@ public class TestSyslogUdpSource { public void testRemoveFields() throws IOException { runKeepFieldsTest(false); } + + private String getPayload(int length) { + StringBuilder payload = new StringBuilder(length); + for (int n = 0; n < length; ++n) { + payload.append("x"); + } + return payload.toString(); + } }