Repository: flume Updated Branches: refs/heads/trunk 9940dcbfe -> f15f20785
FLUME-2404. Make ScribeSource read buffer and max frame size configurable Scribe default Thrift service maxReadBufferBytes and frame size vary across Thrift versions. In some cases, these values are set to INT_MAX; in other cases, they are set to 16MB. To avoid OOM in certain cases and incompatibilities in other cases, set the default to 16MB and also make the parameters configurable. (chenshangan and Marimuthu Ponnambalam via Mike Percy) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/f15f2078 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/f15f2078 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/f15f2078 Branch: refs/heads/trunk Commit: f15f20785262ac3cb3e35c2a12e669b7a836d35f Parents: 9940dcb Author: Mike Percy <[email protected]> Authored: Fri Jul 4 15:41:43 2014 -0700 Committer: Mike Percy <[email protected]> Committed: Fri Jul 4 15:41:43 2014 -0700 ---------------------------------------------------------------------- flume-ng-doc/sphinx/FlumeUserGuide.rst | 15 ++++++++------- .../org/apache/flume/source/scribe/ScribeSource.java | 14 +++++++++++--- 2 files changed, 19 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/f15f2078/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index f0dd8e8..1e98725 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -1484,15 +1484,16 @@ Flume should use ScribeSource based on Thrift with compatible transfering protoc For deployment of Scribe please follow the guide from Facebook. Required properties are in **bold**. 
-============== =========== ============================================== -Property Name Default Description -============== =========== ============================================== -**type** -- The component type name, needs to be ``org.apache.flume.source.scribe.ScribeSource`` -port 1499 Port that Scribe should be connected -workerThreads 5 Handing threads number in Thrift +==================== =========== ============================================== +Property Name Default Description +==================== =========== ============================================== +**type** -- The component type name, needs to be ``org.apache.flume.source.scribe.ScribeSource`` +port 1499 Port that Scribe should be connected +maxReadBufferBytes 16384000 Thrift Default FrameBuffer Size +workerThreads 5 Handing threads number in Thrift selector.type selector.* -============== =========== ============================================== +==================== =========== ============================================== Example for agent named a1: http://git-wip-us.apache.org/repos/asf/flume/blob/f15f2078/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/ScribeSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/ScribeSource.java b/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/ScribeSource.java index f9a14c1..1d7da09 100644 --- a/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/ScribeSource.java +++ b/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/ScribeSource.java @@ -59,17 +59,24 @@ public class ScribeSource extends AbstractSource implements public static final String SCRIBE_CATEGORY = "category"; + private static final int DEFAULT_PORT = 1499; private static final int DEFAULT_WORKERS = 5; + private static final int 
DEFAULT_MAX_READ_BUFFER_BYTES = 16384000; private TServer server; - private int port = 1499; + private int port; private int workers; + private int maxReadBufferBytes; private SourceCounter sourceCounter; @Override public void configure(Context context) { - port = context.getInteger("port", port); + port = context.getInteger("port", DEFAULT_PORT); + maxReadBufferBytes = context.getInteger("maxReadBufferBytes", DEFAULT_MAX_READ_BUFFER_BYTES); + if(maxReadBufferBytes <= 0){ + maxReadBufferBytes = DEFAULT_MAX_READ_BUFFER_BYTES; + } workers = context.getInteger("workerThreads", DEFAULT_WORKERS); if (workers <= 0) { @@ -91,8 +98,9 @@ public class ScribeSource extends AbstractSource implements args.workerThreads(workers); args.processor(processor); - args.transportFactory(new TFramedTransport.Factory()); + args.transportFactory(new TFramedTransport.Factory(maxReadBufferBytes)); args.protocolFactory(new TBinaryProtocol.Factory(false, false)); + args.maxReadBufferBytes = maxReadBufferBytes; server = new THsHaServer(args);
