Repository: flume
Updated Branches:
  refs/heads/flume-1.7 d6b4053e7 -> a6b55f183

FLUME-2704. Configurable poll delay for spooling directory source (Somin Mithraa via Johny Rufus)

Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/a6b55f18
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/a6b55f18
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/a6b55f18

Branch: refs/heads/flume-1.7
Commit: a6b55f18350cb458f8eb23b95c1cff0812960673
Parents: d6b4053
Author: Johny Rufus <[email protected]>
Authored: Sun Jan 17 10:25:41 2016 -0800
Committer: Johny Rufus <[email protected]>
Committed: Sun Jan 17 10:36:10 2016 -0800

----------------------------------------------------------------------
 .../java/org/apache/flume/source/SpoolDirectorySource.java | 8 ++++----
 .../source/SpoolDirectorySourceConfigurationConstants.java | 4 ++++
 flume-ng-doc/sphinx/FlumeUserGuide.rst                     | 1 +
 3 files changed, 9 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flume/blob/a6b55f18/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
index 0b11fc9..3fe947d 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
@@ -45,9 +45,6 @@ Configurable, EventDrivenSource {
   private static final Logger logger = LoggerFactory
       .getLogger(SpoolDirectorySource.class);
 
-  // Delay used when polling for new files
-  private static final int POLL_DELAY_MS = 500;
-
   /* Config options */
   private String completedSuffix;
   private String spoolDirectory;
@@ -72,6 +69,7 @@ Configurable, EventDrivenSource {
   private boolean hitChannelException = false;
   private int maxBackoff;
   private ConsumeOrder consumeOrder;
+  private int pollDelay;
 
   @Override
   public synchronized void start() {
@@ -105,7 +103,7 @@ Configurable, EventDrivenSource {
 
     Runnable runner = new SpoolDirectoryRunnable(reader, sourceCounter);
     executor.scheduleWithFixedDelay(
-        runner, 0, POLL_DELAY_MS, TimeUnit.MILLISECONDS);
+        runner, 0, pollDelay, TimeUnit.MILLISECONDS);
 
     super.start();
     logger.debug("SpoolDirectorySource source started");
@@ -168,6 +166,8 @@ Configurable, EventDrivenSource {
     consumeOrder = ConsumeOrder.valueOf(context.getString(CONSUME_ORDER,
         DEFAULT_CONSUME_ORDER.toString()).toUpperCase(Locale.ENGLISH));
 
+    pollDelay = context.getInteger(POLL_DELAY, DEFAULT_POLL_DELAY);
+
     // "Hack" to support backwards compatibility with previous generation of
     // spooling directory source, which did not support deserializers
     Integer bufferMaxLineLength = context.getInteger(BUFFER_MAX_LINE_LENGTH);

http://git-wip-us.apache.org/repos/asf/flume/blob/a6b55f18/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
index 895433e..5053697 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
@@ -93,4 +93,8 @@ public class SpoolDirectorySourceConfigurationConstants {
   }
   public static final String CONSUME_ORDER = "consumeOrder";
   public static final ConsumeOrder DEFAULT_CONSUME_ORDER = ConsumeOrder.OLDEST;
+
+  /** Delay(in milliseconds) used when polling for new files. The default is 500ms */
+  public static final String POLL_DELAY = "pollDelay";
+  public static final int DEFAULT_POLL_DELAY = 500;
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/a6b55f18/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 897a2ca..0f8461d 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -999,6 +999,7 @@ consumeOrder          oldest          In which order files in the spooling direc
                                       directory will be scanned to pick the oldest/youngest file, which might be slow if there
                                       are a large number of files, while using ``random`` may cause old files to be consumed
                                       very late if new files keep coming in the spooling directory.
+pollDelay             500             Delay (in milliseconds) used when polling for new files.
 maxBackoff            4000            The maximum time (in millis) to wait between consecutive attempts to write to the channel(s) if the channel is full. The source will start at a low backoff and increase it exponentially each time the channel throws a ChannelException, upto the value specified by this parameter.
 batchSize             100             Granularity at which to batch transfer to the channel
 inputCharset          UTF-8           Character set used by deserializers that treat the input file as text.
