Updated Branches: refs/heads/flume-1.4 d7b474c37 -> 05e8ebc35
FLUME-1800: Docs for spooling source durability changes (Mike Percy via Brock Noland) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/05e8ebc3 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/05e8ebc3 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/05e8ebc3 Branch: refs/heads/flume-1.4 Commit: 05e8ebc352b0f8f96747bd1046ac22caf09e1890 Parents: d7b474c Author: Brock Noland <[email protected]> Authored: Fri Jan 25 15:09:12 2013 -0600 Committer: Brock Noland <[email protected]> Committed: Fri Jan 25 15:09:24 2013 -0600 ---------------------------------------------------------------------- .../avro/ReliableSpoolingFileEventReader.java | 20 +++- .../apache/flume/source/SpoolDirectorySource.java | 5 +- ...SpoolDirectorySourceConfigurationConstants.java | 13 ++- .../avro/TestReliableSpoolingFileEventReader.java | 2 +- flume-ng-doc/sphinx/FlumeUserGuide.rst | 98 ++++++++++----- 5 files changed, 98 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/05e8ebc3/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java index b19d0ea..28df24c 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java +++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java @@ -23,6 +23,7 @@ import java.io.File; import java.io.FileFilter; import java.io.FileNotFoundException; import java.io.IOException; +import java.nio.charset.Charset; import java.util.*; import java.util.regex.Pattern; @@ -84,6 +85,7 @@ public class 
ReliableSpoolingFileEventReader implements ReliableEventReader { private final boolean annotateFileName; private final String fileNameHeader; private final String deletePolicy; + private final Charset inputCharset; private Optional<FileInfo> currentFile = Optional.absent(); /** Always contains the last file from which lines have been read. **/ @@ -97,7 +99,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { String completedSuffix, String ignorePattern, String trackerDirPath, boolean annotateFileName, String fileNameHeader, String deserializerType, Context deserializerContext, - String deletePolicy) throws IOException { + String deletePolicy, String inputCharset) throws IOException { // Sanity checks Preconditions.checkNotNull(spoolDirectory); @@ -107,6 +109,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { Preconditions.checkNotNull(deserializerType); Preconditions.checkNotNull(deserializerContext); Preconditions.checkNotNull(deletePolicy); + Preconditions.checkNotNull(inputCharset); // validate delete policy if (!deletePolicy.equalsIgnoreCase(DeletePolicy.NEVER.name()) && @@ -149,6 +152,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { this.fileNameHeader = fileNameHeader; this.ignorePattern = Pattern.compile(ignorePattern); this.deletePolicy = deletePolicy; + this.inputCharset = Charset.forName(inputCharset); File trackerDirectory = new File(trackerDirPath); @@ -422,7 +426,8 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { tracker.getTarget(), nextPath); ResettableInputStream in = - new ResettableFileInputStream(nextFile, tracker); + new ResettableFileInputStream(nextFile, tracker, + ResettableFileInputStream.DEFAULT_BUF_SIZE, inputCharset); EventDeserializer deserializer = EventDeserializerFactory.getInstance (deserializerType, deserializerContext, in); @@ -482,7 +487,7 @@ public class ReliableSpoolingFileEventReader implements 
ReliableEventReader { private String ignorePattern = SpoolDirectorySourceConfigurationConstants.DEFAULT_IGNORE_PAT; private String trackerDirPath = - SpoolDirectorySourceConfigurationConstants.DEFAULT_META_DIR; + SpoolDirectorySourceConfigurationConstants.DEFAULT_TRACKER_DIR; private Boolean annotateFileName = SpoolDirectorySourceConfigurationConstants.DEFAULT_FILE_HEADER; private String fileNameHeader = @@ -492,6 +497,8 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { private Context deserializerContext = new Context(); private String deletePolicy = SpoolDirectorySourceConfigurationConstants.DEFAULT_DELETE_POLICY; + private String inputCharset = + SpoolDirectorySourceConfigurationConstants.DEFAULT_INPUT_CHARSET; public Builder spoolDirectory(File directory) { this.spoolDirectory = directory; @@ -538,10 +545,15 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { return this; } + public Builder inputCharset(String inputCharset) { + this.inputCharset = inputCharset; + return this; + } + public ReliableSpoolingFileEventReader build() throws IOException { return new ReliableSpoolingFileEventReader(spoolDirectory, completedSuffix, ignorePattern, trackerDirPath, annotateFileName, fileNameHeader, - deserializerType, deserializerContext, deletePolicy); + deserializerType, deserializerContext, deletePolicy, inputCharset); } } http://git-wip-us.apache.org/repos/asf/flume/blob/05e8ebc3/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java index 552bd48..698b906 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java @@ -56,6 +56,7 @@ 
Configurable, EventDrivenSource { private String deserializerType; private Context deserializerContext; private String deletePolicy; + private String inputCharset; private CounterGroup counterGroup; ReliableSpoolingFileEventReader reader; @@ -81,6 +82,7 @@ Configurable, EventDrivenSource { .deserializerType(deserializerType) .deserializerContext(deserializerContext) .deletePolicy(deletePolicy) + .inputCharset(inputCharset) .build(); } catch (IOException ioe) { throw new FlumeException("Error instantiating spooling event parser", @@ -115,9 +117,10 @@ Configurable, EventDrivenSource { DEFAULT_FILENAME_HEADER_KEY); batchSize = context.getInteger(BATCH_SIZE, DEFAULT_BATCH_SIZE); + inputCharset = context.getString(INPUT_CHARSET, DEFAULT_INPUT_CHARSET); ignorePattern = context.getString(IGNORE_PAT, DEFAULT_IGNORE_PAT); - trackerDirPath = context.getString(META_DIR, DEFAULT_META_DIR); + trackerDirPath = context.getString(TRACKER_DIR, DEFAULT_TRACKER_DIR); deserializerType = context.getString(DESERIALIZER, DEFAULT_DESERIALIZER); deserializerContext = new Context(context.getSubProperties(DESERIALIZER + http://git-wip-us.apache.org/repos/asf/flume/blob/05e8ebc3/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java index afc7288..f3cc703 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java @@ -35,14 +35,18 @@ public class SpoolDirectorySourceConfigurationConstants { /** What size to batch with before sending to ChannelProcessor. 
*/ public static final String BATCH_SIZE = "batchSize"; - public static final int DEFAULT_BATCH_SIZE = 10; + public static final int DEFAULT_BATCH_SIZE = 100; /** Maximum number of lines to buffer between commits. */ + @Deprecated public static final String BUFFER_MAX_LINES = "bufferMaxLines"; + @Deprecated public static final int DEFAULT_BUFFER_MAX_LINES = 100; /** Maximum length of line (in characters) in buffer between commits. */ + @Deprecated public static final String BUFFER_MAX_LINE_LENGTH = "bufferMaxLineLength"; + @Deprecated public static final int DEFAULT_BUFFER_MAX_LINE_LENGTH = 5000; /** Pattern of files to ignore */ @@ -50,8 +54,8 @@ public class SpoolDirectorySourceConfigurationConstants { public static final String DEFAULT_IGNORE_PAT = "^$"; // no effect /** Directory to store metadata about files being processed */ - public static final String META_DIR = "metaDir"; - public static final String DEFAULT_META_DIR = ".flumespool"; + public static final String TRACKER_DIR = "trackerDir"; + public static final String DEFAULT_TRACKER_DIR = ".flumespool"; /** Deserializer to use to parse the file data into Flume Events */ public static final String DESERIALIZER = "deserializer"; @@ -59,4 +63,7 @@ public class SpoolDirectorySourceConfigurationConstants { public static final String DELETE_POLICY = "deletePolicy"; public static final String DEFAULT_DELETE_POLICY = "never"; + + public static final String INPUT_CHARSET = "inputCharset"; + public static final String DEFAULT_INPUT_CHARSET = "UTF-8"; } http://git-wip-us.apache.org/repos/asf/flume/blob/05e8ebc3/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java index a29606e..31ecf8e 100644 
--- a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java +++ b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java @@ -119,7 +119,7 @@ public class TestReliableSpoolingFileEventReader { @Test public void testRepeatedCallsWithCommitOnSuccess() throws IOException { String trackerDirPath = - SpoolDirectorySourceConfigurationConstants.DEFAULT_META_DIR; + SpoolDirectorySourceConfigurationConstants.DEFAULT_TRACKER_DIR; File trackerDir = new File(WORK_DIR, trackerDirPath); ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder() http://git-wip-us.apache.org/repos/asf/flume/blob/05e8ebc3/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index b2c58de..452c634 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -773,54 +773,90 @@ Example for agent named a1: Spooling Directory Source ~~~~~~~~~~~~~~~~~~~~~~~~~ -This source lets you ingest data by dropping files in a spooling directory on -disk. **Unlike other asynchronous sources, this source -avoids data loss even if Flume is restarted or fails.** -Flume will watch the directory for new files and read then ingest them -as they appear. After a given file has been fully read into the channel, -it is renamed to indicate completion. This allows a cleaner process to remove -completed files periodically. Note, however, -that events may be duplicated if failures occur, consistent with the semantics -offered by other Flume components. The channel optionally inserts the full path of -the origin file into a header field of each event. This source buffers file data -in memory during reads; be sure to set the `bufferMaxLineLength` option to a number -greater than the longest line you expect to see in your input data. - -.. 
warning:: This channel expects that only immutable, uniquely named files - are dropped in the spooling directory. If duplicate names are - used, or files are modified while being read, the source will - fail with an error message. For some use cases this may require - adding unique identifiers (such as a timestamp) to log file names - when they are copied into the spooling directory. +This source lets you ingest data by placing files to be ingested into a +"spooling" directory on disk. +This source will watch the specified directory for new files, and will parse +events out of new files as they appear. +The event parsing logic is pluggable. +After a given file has been fully read +into the channel, it is renamed to indicate completion (or optionally deleted). + +Unlike the Exec source, this source is reliable and will not miss data, even if +Flume is restarted or killed. In exchange for this reliability, only immutable, +uniquely-named files may be dropped into the spooling directory. Flume tries +to detect these problem conditions and will fail loudly if they are violated: + +#. If a file is written to after being placed into the spooling directory, + Flume will print an error to its log file and stop processing. +#. If a file name is reused at a later time, Flume will print an error to its + log file and stop processing. + +To avoid the above issues, it may be useful to add a unique identifier +(such as a timestamp) to log file names when they are moved into the spooling +directory. + +Despite the reliability guarantees of this source, there are still +cases in which events may be duplicated if certain downstream failures occur. +This is consistent with the guarantees offered by other Flume components.
==================== ============== ========================================================== Property Name Default Description ==================== ============== ========================================================== **channels** -- -**type** -- The component type name, needs to be ``spooldir`` -**spoolDir** -- The directory where log files will be spooled +**type** -- The component type name, needs to be ``spooldir``. +**spoolDir** -- The directory from which to read files. fileSuffix .COMPLETED Suffix to append to completely ingested files +deletePolicy never When to delete completed files: ``never`` or ``immediate`` fileHeader false Whether to add a header storing the filename fileHeaderKey file Header key to use when appending filename to header -batchSize 10 Granularity at which to batch transfer to the channel -bufferMaxLines 100 Maximum number of lines the commit buffer can hold -bufferMaxLineLength 5000 Maximum length of a line in the commit buffer +ignorePattern ^$ Regular expression specifying which files to ignore (skip) +trackerDir .flumespool Directory to store metadata related to processing of files. + If this path is not an absolute path, then it is interpreted as relative to the spoolDir. +batchSize 100 Granularity at which to batch transfer to the channel +inputCharset UTF-8 Character set used by deserializers that treat the input file as text. +deserializer ``LINE`` Specify the deserializer used to parse the file into events. + Defaults to parsing each line as an event. The class specified must implement + ``EventDeserializer.Builder``. +deserializer.* Varies per event deserializer. +bufferMaxLines -- (Obsolete) This option is now ignored. +bufferMaxLineLength 5000 (Deprecated) Maximum length of a line in the commit buffer. Use deserializer.maxLineLength instead.
selector.type replicating replicating or multiplexing selector.* Depends on the selector.type value interceptors -- Space separated list of interceptors interceptors.* ==================== ============== ========================================================== -Example for agent named a1: +Example for an agent named agent-1: .. code-block:: properties - a1.sources = r1 - a1.channels = c1 - a1.sources.r1.type = spooldir - a1.sources.r1.spoolDir = /var/log/apache/flumeSpool - a1.sources.r1.fileHeader = true - a1.sources.r1.channels = c1 + agent-1.channels = ch-1 + agent-1.sources = src-1 + + agent-1.sources.src-1.type = spooldir + agent-1.sources.src-1.channels = ch-1 + agent-1.sources.src-1.spoolDir = /var/log/apache/flumeSpool + agent-1.sources.src-1.fileHeader = true + +Event Deserializers +''''''''''''''''''' + +The following event deserializers ship with Flume. + +LINE +^^^^ + +This deserializer generates one event per line of text input. + +============================== ============== ========================================================== +Property Name Default Description +============================== ============== ========================================================== +deserializer.maxLineLength 2048 Maximum number of characters to include in a single event. + If a line exceeds this length, it is truncated, and the + remaining characters on the line will appear in a + subsequent event. +deserializer.outputCharset UTF-8 Charset to use for encoding events put into the channel. +============================== ============== ========================================================== NetCat Source ~~~~~~~~~~~~~
