> On Oct. 7, 2012, 4:30 a.m., Mike Percy wrote:
> > Sorry for taking so long for me to get back to this. I read through the
> > code again, and I figured out what was throwing me off the first time. The
> > way this is structured, there is a polling check-and-sleep mechanism built
> > into the SpoolingFileLineReader, causing it to block, as well as a
> > start/stop mechanism. This makes the SpoolingFileLineReader pretty
> > complicated and, in my opinion, a bit tricky to understand. I think it
> > would be great to refactor this code so that the looping mechanism was done
> > via ScheduledExecutorService.scheduleWithFixedDelay at the top level of the
> > Source and/or Client (actually, an Executor is already there in the
> > Source), allowing the SpoolingFileLineReader to focus more on reading files
> > and less on maintaining state. This should also remove the need for the
> > ReaderStoppedException.
> >
> > Another thing to consider is error conditions. What happens if
> > processEvents() throws a ChannelException? This can happen if the channel
> > is full. In that case, we should back off a bit, and then retry pushing the
> > same events on the next run. Along the same vein, we should not roll a file
> > until we are certain all events have successfully been committed to a
> > channel.
> >
> > To clarify what I'm suggesting, this is an example of how it could be
> > refactored:
> >
> > SpoolDirectorySource.start:
> > scheduledExecutorService.scheduleWithFixedDelay(new SpoolRunnable(new
> > SpoolingFileLineReader(...)), 0L, POLL_SLEEP_MS, TimeUnit.MILLISECONDS);
> > SpoolRunnable.run: while (true) { lines = reader.readLines(batchSize); if
> > (lines == null) return; for (line in lines) {
> > events.add(createEvent(line)); } try {
> > channelProcessor.processEvents(events); reader.commit(); } catch
> > (ChannelException ex) { /* ... */ } }
> > SpoolingFileLineReader.readLines: If no commit since last readLines() call,
> > then return the previous result from a saved buffer. Otherwise, read, save
> > to a buffer, and then return lines from an available file if possible. If
> > no files are available, return null.
> > SpoolingFileLineReader.commit: Clear the saved buffer. If the current file
> > is at EOF, then retire the file.
> >
> > Thanks for all the hard work. Let's take this to the finish line.
In general - I like this pattern much better than what is there now.
One question though - this assumes that a given SpoolingFileLineReader will
only be accessed from a single thread. If several threads are calling
readLines() and commit() it will mess with the semantics of commit(). In the
current model you can have multiple threads reading lines interchangeably (I
think - thought I haven't fully fleshed this out in my mind).
I think not being thread-safe is fine though, given the way that this hooks
into a source - where it will only be accessed by one thread.
- Patrick
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/6377/#review12210
-----------------------------------------------------------
On Aug. 14, 2012, 10:02 p.m., Patrick Wendell wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/6377/
> -----------------------------------------------------------
>
> (Updated Aug. 14, 2012, 10:02 p.m.)
>
>
> Review request for Flume.
>
>
> Description
> -------
>
> This patch adds a spooling directory based source. The idea is that a user
> can have a spool directory where files are deposited for ingestion into
> flume. Once ingested, the files are clearly renamed and the implementation
> guarantees at-least-once delivery semantics similar to those achieved within
> flume itself, even across failures and restarts of the JVM running the code.
>
> This helps fill the gap for people who want a way to get reliable delivery of
> events into flume, but don't want to directly write their application against
> the flume API. They can simply drop log files off in a spooldir and let flume
> ingest asynchronously (using some shell scripts or other automated process).
>
> Unlike the prior iteration, this patch implements a first-class source. It
> also extends the avro client to support spooling in a similar manner.
>
>
> This addresses bug FlUME-1425.
> https://issues.apache.org/jira/browse/FlUME-1425
>
>
> Diffs
> -----
>
>
> flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java
> da804d7
>
> flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java
> abbbf1c
> flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java
> 4a5ecae
>
> flume-ng-core/src/main/java/org/apache/flume/client/avro/BufferedLineReader.java
> PRE-CREATION
> flume-ng-core/src/main/java/org/apache/flume/client/avro/LineReader.java
> PRE-CREATION
>
> flume-ng-core/src/main/java/org/apache/flume/client/avro/SpoolingFileLineReader.java
> PRE-CREATION
>
> flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
> PRE-CREATION
>
> flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
> PRE-CREATION
>
> flume-ng-core/src/test/java/org/apache/flume/client/avro/TestBufferedLineReader.java
> PRE-CREATION
>
> flume-ng-core/src/test/java/org/apache/flume/client/avro/TestSpoolingFileLineReader.java
> PRE-CREATION
>
> flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
> PRE-CREATION
> flume-ng-doc/sphinx/FlumeUserGuide.rst 45dd7cc
>
> Diff: https://reviews.apache.org/r/6377/diff/
>
>
> Testing
> -------
>
> Extensive unit tests and I also built and played with this using a stub flume
> agent. If you look at the JIRA I have a configuration file for an agent that
> will print out Avro events to the command line - that's helpful when testing
> this.
>
>
> Thanks,
>
> Patrick Wendell
>
>