Repository: flume Updated Branches: refs/heads/trunk a582c100f -> f979b2683
FLUME-2502. Improve Spool Directory Source's performance by not listing files each time. (Prateek Rungta via Hari) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/f979b268 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/f979b268 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/f979b268 Branch: refs/heads/trunk Commit: f979b2683fc48d85806ae7593ee0e393bd812260 Parents: a582c10 Author: Hari Shreedharan <[email protected]> Authored: Wed Oct 15 21:29:46 2014 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Wed Oct 15 21:29:46 2014 -0700 ---------------------------------------------------------------------- .../avro/ReliableSpoolingFileEventReader.java | 48 +++++++++----- .../TestReliableSpoolingFileEventReader.java | 66 +++++++++++++++++++- 2 files changed, 97 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/f979b268/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java index f858b56..1833076 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java +++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java @@ -19,6 +19,7 @@ package org.apache.flume.client.avro; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Charsets; import com.google.common.base.Optional; import com.google.common.base.Preconditions; @@ -45,6 +46,7 @@ import java.io.IOException; import java.nio.charset.Charset; import java.util.*; import java.util.regex.Pattern; 
+import java.util.ArrayList; /** * <p/>A {@link ReliableEventReader} which reads log data from files stored @@ -99,6 +101,10 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { private Optional<FileInfo> lastFileRead = Optional.absent(); private boolean committed = true; + /** Instance var to cache directory listing **/ + private Iterator<File> candidateFileIter = null; + private int listFilesCount = 0; + /** * Create a ReliableSpoolingFileEventReader to watch the given directory. */ @@ -195,6 +201,11 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { this.metaFile = new File(trackerDirectory, metaFileName); } + @VisibleForTesting + int getListFilesCount() { + return listFilesCount; + } + /** Return the filename which generated the data from the last successful * {@link #readEvents(int)} call. Returns null if called before any file * contents are read. */ @@ -409,29 +420,38 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { * If two or more files are equally old/young, then the file name with * lower lexicographical value is returned. * If the {@link #consumeOrder} variable is {@link ConsumeOrder#RANDOM} - * then returns any arbitrary file in the directory. + * then cache the directory listing to amortize retrieval cost, and return + * any arbitrary file from the directory. 
*/ private Optional<FileInfo> getNextFile() { - /* Filter to exclude finished or hidden files */ - FileFilter filter = new FileFilter() { - public boolean accept(File candidate) { - String fileName = candidate.getName(); - if ((candidate.isDirectory()) || + List<File> candidateFiles = Collections.emptyList(); + + if (consumeOrder != ConsumeOrder.RANDOM || + candidateFileIter == null || + !candidateFileIter.hasNext()) { + /* Filter to exclude finished or hidden files */ + FileFilter filter = new FileFilter() { + public boolean accept(File candidate) { + String fileName = candidate.getName(); + if ((candidate.isDirectory()) || (fileName.endsWith(completedSuffix)) || (fileName.startsWith(".")) || ignorePattern.matcher(fileName).matches()) { - return false; + return false; + } + return true; } - return true; - } - }; - List<File> candidateFiles = Arrays.asList( - spoolDirectory.listFiles(filter)); - if (candidateFiles.isEmpty()) { // No matching file in spooling directory. + }; + candidateFiles = Arrays.asList(spoolDirectory.listFiles(filter)); + listFilesCount++; + candidateFileIter = candidateFiles.iterator(); + } + + if (!candidateFileIter.hasNext()) { // No matching file in spooling directory. return Optional.absent(); } - File selectedFile = candidateFiles.get(0); // Select the first random file. + File selectedFile = candidateFileIter.next(); if (consumeOrder == ConsumeOrder.RANDOM) { // Selected file is random. 
return openFile(selectedFile); } else if (consumeOrder == ConsumeOrder.YOUNGEST) { http://git-wip-us.apache.org/repos/asf/flume/blob/f979b268/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java index 6a02612..a6b2473 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java +++ b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java @@ -23,6 +23,7 @@ import com.google.common.collect.Sets; import com.google.common.io.Files; import junit.framework.Assert; import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.SystemUtils; import org.apache.flume.Event; import org.apache.flume.client.avro.ReliableSpoolingFileEventReader.DeletePolicy; import org.apache.flume.source.SpoolDirectorySourceConfigurationConstants; @@ -38,6 +39,7 @@ import java.io.FileFilter; import java.io.IOException; import java.util.*; import java.util.Map.Entry; +import java.util.concurrent.*; public class TestReliableSpoolingFileEventReader { @@ -212,7 +214,7 @@ public class TestReliableSpoolingFileEventReader { FileUtils.write(fileName, "New file created in the end. Shoud be read randomly.\n"); Set<String> actual = Sets.newHashSet(); - readEventsForFilesInDir(WORK_DIR, reader, actual); + readEventsForFilesInDir(WORK_DIR, reader, actual); Set<String> expected = Sets.newHashSet(); createExpectedFromFilesInSetup(expected); expected.add(""); @@ -221,6 +223,52 @@ public class TestReliableSpoolingFileEventReader { Assert.assertEquals(expected, actual); } + @Test + public void testConsumeFileRandomlyNewFile() throws Exception { + // Atomic moves are not supported in Windows. 
+ if (SystemUtils.IS_OS_WINDOWS) { + return; + } + final ReliableEventReader reader + = new ReliableSpoolingFileEventReader.Builder() + .spoolDirectory(WORK_DIR) + .consumeOrder(ConsumeOrder.RANDOM) + .build(); + File fileName = new File(WORK_DIR, "new-file"); + FileUtils.write(fileName, + "New file created in the end. Shoud be read randomly.\n"); + Set<String> expected = Sets.newHashSet(); + File tempDir = Files.createTempDir(); + File tempFile = new File(tempDir, "t"); + File finalFile = new File(WORK_DIR, "t-file"); + FileUtils.write(tempFile, "Last file"); + final Set<String> actual = Sets.newHashSet(); + ExecutorService executor = Executors.newSingleThreadExecutor(); + final Semaphore semaphore = new Semaphore(0); + Future<Void> wait = executor.submit( + new Callable<Void>() { + @Override + public Void call() throws Exception { + readEventsForFilesInDir(WORK_DIR, reader, actual, semaphore); + return null; + } + } + ); + semaphore.acquire(); + tempFile.renameTo(finalFile); + wait.get(); + finalFile.delete(); + FileUtils.deleteQuietly(tempDir); + createExpectedFromFilesInSetup(expected); + expected.add(""); + expected.add( + "New file created in the end. Shoud be read randomly."); + expected.add("Last file"); + Assert.assertEquals(2, ((ReliableSpoolingFileEventReader)reader) + .getListFilesCount()); + Assert.assertEquals(expected, actual); + } + @Test public void testConsumeFileOldest() throws IOException, InterruptedException { @@ -414,17 +462,29 @@ public class TestReliableSpoolingFileEventReader { deleteDir(dir); } } + + private void readEventsForFilesInDir(File dir, ReliableEventReader reader, + Collection<String> actual) throws IOException { + readEventsForFilesInDir(dir, reader, actual, null); + } /* Read events, one for each file in the given directory. 
*/ private void readEventsForFilesInDir(File dir, ReliableEventReader reader, - Collection<String> actual) throws IOException { + Collection<String> actual, Semaphore semaphore) throws IOException { List<Event> events; for (int i=0; i < listFiles(dir).size(); i++) { events = reader.readEvents(10); - for (Event e: events) { + for (Event e : events) { actual.add(new String(e.getBody())); } reader.commit(); + try { + if (semaphore != null) { + semaphore.release(); + } + } catch (Exception ex) { + throw new IOException(ex); + } } } /* Create expected results out of the files created in the setup method. */
