Repository: flume Updated Branches: refs/heads/flume-1.6 3eae844f3 -> 6e9d1082e
FLUME-2568. Additional fix for TestReliableSpoolingFileEventReader (Johny Rufus via Hari) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/6e9d1082 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/6e9d1082 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/6e9d1082 Branch: refs/heads/flume-1.6 Commit: 6e9d1082e44093265af9d8e16f00b725f66ec1dc Parents: 3eae844 Author: Hari Shreedharan <[email protected]> Authored: Fri Jan 16 16:56:48 2015 -0800 Committer: Hari Shreedharan <[email protected]> Committed: Fri Jan 16 16:57:38 2015 -0800 ---------------------------------------------------------------------- .../TestReliableSpoolingFileEventReader.java | 31 ++++++++++++-------- 1 file changed, 18 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/6e9d1082/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java index 476bbff..4e90054 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java +++ b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java @@ -238,30 +238,28 @@ public class TestReliableSpoolingFileEventReader { FileUtils.write(fileName, "New file created in the end. Shoud be read randomly.\n"); Set<String> expected = Sets.newHashSet(); - File tempDir = Files.createTempDir(); - File tempFile = new File(tempDir, "t"); - File finalFile = new File(WORK_DIR, "t-file"); int totalFiles = WORK_DIR.listFiles().length; - FileUtils.write(tempFile, "Last file"); final Set<String> actual = Sets.newHashSet(); ExecutorService executor = Executors.newSingleThreadExecutor(); - final Semaphore semaphore = new Semaphore(0); + final Semaphore semaphore1 = new Semaphore(0); + final Semaphore semaphore2 = new Semaphore(0); Future<Void> wait = executor.submit( new Callable<Void>() { @Override public Void call() throws Exception { - readEventsForFilesInDir(WORK_DIR, reader, actual, semaphore); + readEventsForFilesInDir(WORK_DIR, reader, actual, semaphore1, semaphore2); return null; } } ); - semaphore.acquire(); - tempFile.renameTo(finalFile); + semaphore1.acquire(); + File finalFile = new File(WORK_DIR, "t-file"); + FileUtils.write(finalFile, "Last file"); + semaphore2.release(); wait.get(); int listFilesCount = ((ReliableSpoolingFileEventReader)reader) .getListFilesCount(); finalFile.delete(); - FileUtils.deleteQuietly(tempDir); createExpectedFromFilesInSetup(expected); expected.add(""); expected.add( @@ -496,13 +494,14 @@ public class TestReliableSpoolingFileEventReader { private void readEventsForFilesInDir(File dir, ReliableEventReader reader, Collection<String> actual) throws IOException { - readEventsForFilesInDir(dir, reader, actual, null); + readEventsForFilesInDir(dir, reader, actual, null, null); } /* Read events, one for each file in the given directory. */ private void readEventsForFilesInDir(File dir, ReliableEventReader reader, - Collection<String> actual, Semaphore semaphore) throws IOException { + Collection<String> actual, Semaphore semaphore1, Semaphore semaphore2) throws IOException { List<Event> events; + boolean executed = false; for (int i=0; i < listFiles(dir).size(); i++) { events = reader.readEvents(10); for (Event e : events) { @@ -510,8 +509,14 @@ public class TestReliableSpoolingFileEventReader { } reader.commit(); try { - if (semaphore != null) { - semaphore.release(); + if(!executed) { + executed = true; + if (semaphore1 != null) { + semaphore1.release(); + } + if (semaphore2 != null) { + semaphore2.acquire(); + } } } catch (Exception ex) { throw new IOException(ex);
