[ 
https://issues.apache.org/jira/browse/FLUME-2318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13949731#comment-13949731
 ] 

Hari Shreedharan commented on FLUME-2318:
-----------------------------------------

OK, let's make this optional and keep the current behavior as the default. We 
should parameterize the tests with two configurations: one that ignores empty 
files and one that sends them as empty events with the corresponding metadata. 
We should verify correct behavior in both cases.
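
To make the two modes concrete, here is a rough, self-contained sketch of what 
"optional" could mean. It does not use Flume classes: FileEvent, readAll and 
the emitEmptyFiles flag are hypothetical names made up for illustration, and 
it assumes one event per file, which is simpler than what the real reader 
does. It only models the decision the patch would have to make for an empty 
file.

```java
import java.util.ArrayList;
import java.util.List;

public class EmptyFileModes {
  /** Hypothetical stand-in for an event: a body plus the source file name as metadata. */
  static final class FileEvent {
    final String fileName;
    final byte[] body;

    FileEvent(String fileName, byte[] body) {
      this.fileName = fileName;
      this.body = body;
    }
  }

  /**
   * Turns each file into at most one event (a simplifying assumption).
   * With emitEmptyFiles == false, empty files are skipped (current behavior).
   * With emitEmptyFiles == true, an empty file yields an event with an empty body.
   */
  static List<FileEvent> readAll(String[] names, byte[][] contents, boolean emitEmptyFiles) {
    List<FileEvent> events = new ArrayList<>();
    for (int i = 0; i < names.length; i++) {
      boolean empty = contents[i].length == 0;
      if (empty && !emitEmptyFiles) {
        continue; // ignore the empty file entirely
      }
      events.add(new FileEvent(names[i], contents[i]));
    }
    return events;
  }

  public static void main(String[] args) {
    String[] names = {"file1", "file2"};
    byte[][] contents = {"some data".getBytes(), new byte[0]};
    // Run the same input through both modes, as a parameterized test would.
    for (boolean emit : new boolean[] {false, true}) {
      List<FileEvent> events = readAll(names, contents, emit);
      System.out.println("emitEmptyFiles=" + emit + ": " + events.size() + " event(s)");
    }
  }
}
```

A parameterized test would run the same file1/file2 scenario once per flag 
value and assert on the number of events and on the body and metadata of each.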

> SpoolingDirectory is unable to handle empty files
> -------------------------------------------------
>
>                 Key: FLUME-2318
>                 URL: https://issues.apache.org/jira/browse/FLUME-2318
>             Project: Flume
>          Issue Type: Bug
>          Components: Sinks+Sources
>    Affects Versions: v1.4.0
>            Reporter: Muhammad Ehsan ul Haque
>            Priority: Minor
>              Labels: easytest, patch
>             Fix For: v1.4.0
>
>         Attachments: FLUME-2318-0.patch, FLUME-2318-1.patch
>
>
> Empty files should be returned as an empty event instead of no event.
> h4. Scenario
> From the start, consume files in this order:
> # f1: File with data or empty file
> # f2: Empty file
> # No file in spooling directory
> h4. Expected Outcome
> # channel.take() should return event with f1 data.
> # channel.take() should return event with f2 data (empty data).
> # channel.take() should return null.
> h4. What happens
> # channel.take() returns event with f1 data.
> # channel.take() returns null.
> # An exception is raised when the SpoolDirectorySource thread tries to read 
> events from the ReliableSpoolingFileEventReader. A snippet of the trace:
> 2014-02-09 15:46:35,832 (pool-1-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:346)] Preparing to move file /tmp/1391957195572-0/file1 to /tmp/1391957195572-0/file1.COMPLETED
> 2014-02-09 15:46:36,334 (pool-1-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:228)] Last read was never committed - resetting mark position.
> 2014-02-09 15:46:36,335 (pool-1-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:346)] Preparing to move file /tmp/1391957195572-0/file2 to /tmp/1391957195572-0/file2.COMPLETED
> 2014-02-09 15:46:36,839 (pool-1-thread-1) [ERROR - org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:252)] FATAL: Spool Directory source null: { spoolDir: /tmp/1391957195572-0 }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
> java.lang.IllegalStateException: File should not roll when commit is outstanding.
>       at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:225)
>       at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:224)
>       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>       at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
>       at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
>       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:722)
> h4. Unit Test
> In TestSpoolDirectorySource
> {code}
>   @Test
>   public void testWithEmptyFile2()
>       throws InterruptedException, IOException {
>     Context context = new Context();
>     // file1 has data; file2 is empty.
>     File f1 = new File(tmpDir.getAbsolutePath() + "/file1");
>     Files.write("some data".getBytes(), f1);
>     File f2 = new File(tmpDir.getAbsolutePath() + "/file2");
>     Files.write(new byte[0], f2);
>     context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY,
>         tmpDir.getAbsolutePath());
>     Configurables.configure(source, context);
>     source.start();
>     // Give the source a moment to pick up the files.
>     Thread.sleep(10);
>     // Take once per file, each in its own transaction.
>     for (int i = 0; i < 2; i++) {
>       Transaction txn = channel.getTransaction();
>       txn.begin();
>       Event e = channel.take();
>       txn.commit();
>       txn.close();
>     }
>     // Both files are consumed, so a third take should return null.
>     Transaction txn = channel.getTransaction();
>     txn.begin();
>     Assert.assertNull(channel.take());
>     txn.commit();
>     txn.close();
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)