Repository: flume
Updated Branches:
  refs/heads/trunk e6416a070 -> af63d38fa


FLUME-2704. Configurable poll delay for spooling directory source

(Somin Mithraa via Johny Rufus)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/af63d38f
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/af63d38f
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/af63d38f

Branch: refs/heads/trunk
Commit: af63d38fada97a06c542ad875ef31ea3e74d53cc
Parents: e6416a0
Author: Johny Rufus <[email protected]>
Authored: Sun Jan 17 10:25:41 2016 -0800
Committer: Johny Rufus <[email protected]>
Committed: Sun Jan 17 10:25:41 2016 -0800

----------------------------------------------------------------------
 .../java/org/apache/flume/source/SpoolDirectorySource.java   | 8 ++++----
 .../source/SpoolDirectorySourceConfigurationConstants.java   | 4 ++++
 flume-ng-doc/sphinx/FlumeUserGuide.rst                       | 1 +
 3 files changed, 9 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/af63d38f/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
index 0b11fc9..3fe947d 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
@@ -45,9 +45,6 @@ Configurable, EventDrivenSource {
   private static final Logger logger = LoggerFactory
       .getLogger(SpoolDirectorySource.class);
 
-  // Delay used when polling for new files
-  private static final int POLL_DELAY_MS = 500;
-
   /* Config options */
   private String completedSuffix;
   private String spoolDirectory;
@@ -72,6 +69,7 @@ Configurable, EventDrivenSource {
   private boolean hitChannelException = false;
   private int maxBackoff;
   private ConsumeOrder consumeOrder;
+  private int pollDelay;
 
   @Override
   public synchronized void start() {
@@ -105,7 +103,7 @@ Configurable, EventDrivenSource {
 
     Runnable runner = new SpoolDirectoryRunnable(reader, sourceCounter);
     executor.scheduleWithFixedDelay(
-        runner, 0, POLL_DELAY_MS, TimeUnit.MILLISECONDS);
+        runner, 0, pollDelay, TimeUnit.MILLISECONDS);
 
     super.start();
     logger.debug("SpoolDirectorySource source started");
@@ -168,6 +166,8 @@ Configurable, EventDrivenSource {
     consumeOrder = ConsumeOrder.valueOf(context.getString(CONSUME_ORDER, 
         DEFAULT_CONSUME_ORDER.toString()).toUpperCase(Locale.ENGLISH));
 
+    pollDelay = context.getInteger(POLL_DELAY, DEFAULT_POLL_DELAY);
+
     // "Hack" to support backwards compatibility with previous generation of
     // spooling directory source, which did not support deserializers
     Integer bufferMaxLineLength = context.getInteger(BUFFER_MAX_LINE_LENGTH);

http://git-wip-us.apache.org/repos/asf/flume/blob/af63d38f/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
index 895433e..5053697 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
@@ -93,4 +93,8 @@ public class SpoolDirectorySourceConfigurationConstants {
   }
   public static final String CONSUME_ORDER = "consumeOrder";
   public static final ConsumeOrder DEFAULT_CONSUME_ORDER = ConsumeOrder.OLDEST;    
+  
+  /** Delay(in milliseconds) used when polling for new files. The default is 500ms */
+  public static final String POLL_DELAY = "pollDelay";
+  public static final int DEFAULT_POLL_DELAY = 500;
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/af63d38f/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 897a2ca..0f8461d 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -999,6 +999,7 @@ consumeOrder          oldest          In which order files in the spooling direc
                                       directory will be scanned to pick the oldest/youngest file, which might be slow if there
                                       are a large number of files, while using ``random`` may cause old files to be consumed
                                       very late if new files keep coming in the spooling directory.
+pollDelay             500             Delay (in milliseconds) used when polling for new files.
 maxBackoff            4000            The maximum time (in millis) to wait between consecutive attempts to write to the channel(s) if the channel is full. The source will start at a low backoff and increase it exponentially each time the channel throws a ChannelException, upto the value specified by this parameter.
 batchSize             100             Granularity at which to batch transfer to the channel
 inputCharset          UTF-8           Character set used by deserializers that treat the input file as text.

Reply via email to