[ https://issues.apache.org/jira/browse/FLUME-2318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13949214#comment-13949214 ]

Muhammad Ehsan ul Haque commented on FLUME-2318:
------------------------------------------------

I re-evaluated the issue and narrowed down the exact cause of the bug.
To be exact, the spooling directory source is unable to handle an empty file 
if that empty file happens to be the last consumable file in the directory. 
This is because the _*committed*_ flag in 
_*org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java*_ is not 
set properly for empty files.
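The failure mode can be reproduced with a simplified, hypothetical model of the reader's commit bookkeeping. `BuggySpoolModel`, `drive`, and the list-of-lines representation of files are illustrative stand-ins reconstructed from the log messages in this issue, not the actual Flume source. The caller commits only after a non-empty batch; rolling onto a trailing empty file returns an empty list but still clears `committed`, so the reader later throws.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified model of the commit bookkeeping described above;
// the names are illustrative and this is not the actual Flume source.
final class BuggySpoolModel {
  private final Deque<List<String>> pending; // files not opened yet, oldest first
  private List<String> current;              // open file, or null when none
  private int mark;                          // last committed position in current
  private int pos;                           // read position in current
  private boolean committed = true;

  BuggySpoolModel(List<List<String>> files) {
    pending = new ArrayDeque<>(files);
  }

  List<String> readEvents(int n) {
    if (!committed) {
      if (current == null) {
        throw new IllegalStateException("File should not roll when commit is outstanding.");
      }
      pos = mark; // "Last read was never committed - resetting mark position."
    } else if (current == null) {
      openNext();
      if (current == null) {
        return Collections.emptyList();
      }
    }
    List<String> events = read(n);
    if (events.isEmpty()) { // end of file: roll to the next one
      openNext();
      if (current == null) {
        return Collections.emptyList();
      }
      events = read(n);
    }
    committed = false; // cleared even when the next file was empty
    return events;
  }

  void commit() {
    if (!committed && current != null) {
      mark = pos;
      committed = true;
    }
  }

  // Caller that commits only after a non-empty batch.
  static int drive(BuggySpoolModel reader, int polls) {
    int seen = 0;
    for (int i = 0; i < polls; i++) {
      List<String> events = reader.readEvents(10);
      if (!events.isEmpty()) {
        seen += events.size();
        reader.commit();
      }
    }
    return seen;
  }

  private void openNext() {
    current = pending.poll(); // null when the directory is drained
    mark = 0;
    pos = 0;
  }

  private List<String> read(int n) {
    int end = Math.min(pos + n, current.size());
    List<String> out = new ArrayList<>(current.subList(pos, end));
    pos = end;
    return out;
  }
}
```

In this model an empty file in the middle is harmless: the next file's first non-empty batch gets committed, which resets the flag. Only a trailing empty file leaves the commit outstanding with no file open.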

To reproduce this, here is a test case that fails when added to 
"org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java". It is 
a slight modification of the existing test method 
_*testRepeatedCallsWithCommitOnSuccess*_:

{code}
  @Test
  public void testRepeatedCallsWithCommitOnSuccessWithEmptyFileInTheLast()
      throws IOException {
    // Add an empty file that is meant to be consumed last.
    Files.touch(new File(WORK_DIR, "newEmptyFile"));

    String trackerDirPath =
        SpoolDirectorySourceConfigurationConstants.DEFAULT_TRACKER_DIR;
    File trackerDir = new File(WORK_DIR, trackerDirPath);

    ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder()
        .spoolDirectory(WORK_DIR).trackerDirPath(trackerDirPath).build();

    final int expectedLines = 0 + 1 + 2 + 3 + 1;
    int seenLines = 0;
    for (int i = 0; i < 10; i++) {
      List<Event> events = reader.readEvents(10);
      int numEvents = events.size();
      if (numEvents > 0) {
        seenLines += numEvents;
        reader.commit();

        // ensure that there are files in the trackerDir
        File[] files = trackerDir.listFiles();
        Assert.assertNotNull(files);
        Assert.assertTrue("Expected tracker files in tracker dir " + trackerDir
            .getAbsolutePath(), files.length > 0);
      }
    }

    Assert.assertEquals(expectedLines, seenLines);
  }
{code}

This is the exception that is thrown:
{code}
java.lang.IllegalStateException: File should not roll when commit is outstanding.
        at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:224)
        at org.apache.flume.client.avro.TestReliableSpoolingFileEventReader.testRepeatedCallsWithCommitOnSuccess(TestReliableSpoolingFileEventReader.java:152)
{code}

h3. Possible Fixes
* Change the test _*testRepeatedCallsWithCommitOnSuccess*_ and the new test 
_*testRepeatedCallsWithCommitOnSuccessWithEmptyFileInTheLast*_ to always call 
commit, even when _*reader.readEvents()*_ returns an empty list, and change 
callers such as _*org/apache/flume/source/SpoolDirectorySource.java*_ to do 
the same.
* Explicitly set the committed flag to true once the file has been rolled 
successfully in 
_*org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java*_
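The second option can be sketched on a simplified, hypothetical model of the reader's commit bookkeeping (`FixedSpoolModel` and `drive` are illustrative names, not the actual Flume source). Rolling to a new file marks the reader as committed, and only a non-empty batch clears the flag, so a caller that still commits only after non-empty batches no longer hits the `IllegalStateException` when the last file is empty.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

// Hypothetical model of the second proposed fix; not the actual Flume source.
final class FixedSpoolModel {
  private final Deque<List<String>> pending; // files not opened yet, oldest first
  private List<String> current;              // open file, or null when none
  private int mark;                          // last committed position in current
  private int pos;                           // read position in current
  private boolean committed = true;

  FixedSpoolModel(List<List<String>> files) {
    pending = new ArrayDeque<>(files);
  }

  List<String> readEvents(int n) {
    if (!committed) {
      if (current == null) {
        throw new IllegalStateException("File should not roll when commit is outstanding.");
      }
      pos = mark; // replay the uncommitted batch
    } else if (current == null) {
      openNext();
      if (current == null) {
        return Collections.emptyList();
      }
    }
    List<String> events = read(n);
    if (events.isEmpty()) { // end of file: roll to the next one
      openNext();
      committed = true; // fix: a freshly rolled file has nothing outstanding
      if (current == null) {
        return Collections.emptyList();
      }
      events = read(n);
    }
    if (!events.isEmpty()) {
      committed = false; // only a non-empty batch needs a commit
    }
    return events;
  }

  void commit() {
    if (!committed && current != null) {
      mark = pos;
      committed = true;
    }
  }

  // Unchanged caller: commits only after a non-empty batch.
  static int drive(FixedSpoolModel reader, int polls) {
    int seen = 0;
    for (int i = 0; i < polls; i++) {
      List<String> events = reader.readEvents(10);
      if (!events.isEmpty()) {
        seen += events.size();
        reader.commit();
      }
    }
    return seen;
  }

  private void openNext() {
    current = pending.poll(); // null when the directory is drained
    mark = 0;
    pos = 0;
  }

  private List<String> read(int n) {
    int end = Math.min(pos + n, current.size());
    List<String> out = new ArrayList<>(current.subList(pos, end));
    pos = end;
    return out;
  }
}
```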

I would also like an optional flag in the spooling directory source that 
allows empty files to be read as events, because in some use cases the file 
name and other metadata about the file are important to the receiving end. The 
current implementation simply ignores empty files.
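A minimal sketch of what such an option could do, assuming a one-event-per-line reader: each line becomes an event carrying the file's path in a header, and when the hypothetical `emitEmptyFiles` flag is on, an empty file yields a single event with an empty body instead of nothing. `EmptyFileAwareReader`, `LineEvent`, and `emitEmptyFiles` are illustrative names, not Flume APIs, and the header key `file` is an assumption.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical event type: headers plus a raw body (not Flume's Event).
final class LineEvent {
  final Map<String, String> headers;
  final byte[] body;

  LineEvent(Map<String, String> headers, byte[] body) {
    this.headers = headers;
    this.body = body;
  }
}

// Hypothetical sketch, not a Flume API.
final class EmptyFileAwareReader {
  // Assumed header key for the file's absolute path.
  static final String FILE_HEADER = "file";

  // One event per line; emitEmptyFiles stands in for the proposed optional flag.
  static List<LineEvent> eventsForFile(Path file, boolean emitEmptyFiles) throws IOException {
    Map<String, String> headers =
        Collections.singletonMap(FILE_HEADER, file.toAbsolutePath().toString());
    List<LineEvent> events = new ArrayList<>();
    for (String line : Files.readAllLines(file, StandardCharsets.UTF_8)) {
      events.add(new LineEvent(headers, line.getBytes(StandardCharsets.UTF_8)));
    }
    if (events.isEmpty() && emitEmptyFiles) {
      // Empty file: emit one empty-body event so the receiver still sees the file name.
      events.add(new LineEvent(headers, new byte[0]));
    }
    return events;
  }
}
```

Leaving the flag off keeps the current behaviour of ignoring empty files, so the option would be backward compatible.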

> SpoolingDirectory is unable to handle empty files
> -------------------------------------------------
>
>                 Key: FLUME-2318
>                 URL: https://issues.apache.org/jira/browse/FLUME-2318
>             Project: Flume
>          Issue Type: Bug
>          Components: Sinks+Sources
>    Affects Versions: v1.4.0
>            Reporter: Muhammad Ehsan ul Haque
>            Priority: Minor
>              Labels: easytest, patch
>             Fix For: v1.4.0
>
>         Attachments: FLUME-2318-0.patch, FLUME-2318-1.patch
>
>
> Empty files should be returned as an empty event instead of no event.
> h4. Scenario
> From the start, consume files in this order:
> # f1: a file with data, or an empty file
> # f2: an empty file
> # no more files in the spooling directory
> h4. Expected Outcome
> # channel.take() should return an event with f1's data.
> # channel.take() should return an event with f2's data (empty data).
> # channel.take() should return null.
> h4. What happens
> # channel.take() returns an event with f1's data.
> # channel.take() returns null.
> # An exception is raised when the SpoolDirectorySource thread tries to read 
> events from the ReliableSpoolingFileEventReader. Snippet of the trace:
> 2014-02-09 15:46:35,832 (pool-1-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:346)] Preparing to move file /tmp/1391957195572-0/file1 to /tmp/1391957195572-0/file1.COMPLETED
> 2014-02-09 15:46:36,334 (pool-1-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:228)] Last read was never committed - resetting mark position.
> 2014-02-09 15:46:36,335 (pool-1-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:346)] Preparing to move file /tmp/1391957195572-0/file2 to /tmp/1391957195572-0/file2.COMPLETED
> 2014-02-09 15:46:36,839 (pool-1-thread-1) [ERROR - org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:252)] FATAL: Spool Directory source null: { spoolDir: /tmp/1391957195572-0 }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
> java.lang.IllegalStateException: File should not roll when commit is outstanding.
>       at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:225)
>       at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:224)
>       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>       at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
>       at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
>       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:722)
> h4. Unit Test
> In TestSpoolDirectorySource
> {code}
>   @Test
>   public void testWithEmptyFile2()
>       throws InterruptedException, IOException {
>     Context context = new Context();
>     File f1 = new File(tmpDir.getAbsolutePath() + "/file1");
>     Files.write("some data".getBytes(), f1);
>     File f2 = new File(tmpDir.getAbsolutePath() + "/file2");
>     Files.write(new byte[0], f2);
>     context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY,
>         tmpDir.getAbsolutePath());
>     Configurables.configure(source, context);
>     source.start();
>     Thread.sleep(10);
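>     // Expect one event for file1 and one (empty-body) event for file2.
>     for (int i = 0; i < 2; i++) {
>       Transaction txn = channel.getTransaction();
>       txn.begin();
>       Event e = channel.take();
>       Assert.assertNotNull("Expected an event for file" + (i + 1), e);
>       txn.commit();
>       txn.close();
>     }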
>     Transaction txn = channel.getTransaction();
>     txn.begin();
>     Assert.assertNull(channel.take());
>     txn.commit();
>     txn.close();
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)
