Updated Branches: refs/heads/trunk f90f1fbaa -> 5289ccc56
FLUME-1428: File Channel should not consider a file as inactive until all takes are committed (Hari Shreedharan via Brock Noland) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/5289ccc5 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/5289ccc5 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/5289ccc5 Branch: refs/heads/trunk Commit: 5289ccc566a02b7ca59e0f5ae39dd0a4369f48cb Parents: f90f1fb Author: Brock Noland <[email protected]> Authored: Fri Aug 24 15:57:36 2012 -0500 Committer: Brock Noland <[email protected]> Committed: Fri Aug 24 15:57:36 2012 -0500 ---------------------------------------------------------------------- .../apache/flume/channel/file/FlumeEventQueue.java | 13 +++++- .../apache/flume/channel/file/TestFileChannel.java | 33 +++++++++++++++ 2 files changed, 45 insertions(+), 1 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/5289ccc5/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java index 766c59a..8085d22 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java @@ -357,7 +357,10 @@ final class FlumeEventQueue { //Java implements clone pretty well. The main place this is used //in checkpointing and deleting old files, so best //to use a sorted set implementation. - return new TreeSet<Integer>(fileIDCounts.keySet()); + SortedSet<Integer> fileIDs = new TreeSet(fileIDCounts.keySet()); + fileIDs.addAll(inflightPuts.getFileIDs()); + fileIDs.addAll(inflightTakes.getFileIDs()); + return fileIDs; } protected void incrementFileID(int fileID) { @@ -559,6 +562,7 @@ final class FlumeEventQueue { private volatile Future<?> future; private final File inflightEventsFile; private volatile boolean syncRequired = false; + private SetMultimap<Long, Integer> inflightFileIDs = HashMultimap.create(); public InflightEventWrapper(File inflightEventsFile) throws Exception{ if(!inflightEventsFile.exists()){ @@ -581,6 +585,7 @@ final class FlumeEventQueue { return false; } inflightEvents.removeAll(transactionID); + inflightFileIDs.removeAll(transactionID); syncRequired = true; return true; } @@ -592,6 +597,8 @@ final class FlumeEventQueue { */ public void addEvent(Long transactionID, Long pointer){ inflightEvents.put(transactionID, pointer); + inflightFileIDs.put(transactionID, + FlumeEventPointer.fromLong(pointer).getFileID()); syncRequired = true; } @@ -728,6 +735,10 @@ final class FlumeEventQueue { public boolean syncRequired(){ return syncRequired; } + + public Collection<Integer> getFileIDs(){ + return inflightFileIDs.values(); + } } public static void main(String[] args) throws Exception { http://git-wip-us.apache.org/repos/asf/flume/blob/5289ccc5/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java index 2093839..3e01395 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java @@ -859,6 +859,39 @@ public class TestFileChannel { channel.stop(); } + @Test + public void testReferenceCounts() throws Exception { + Set<String> set = Sets.newHashSet(); + Map<String, String> overrides = Maps.newHashMap(); + overrides.put(FileChannelConfiguration.CHECKPOINT_INTERVAL, "10000"); + overrides.put(FileChannelConfiguration.MAX_FILE_SIZE, "20"); + final FileChannel channel = createFileChannel(overrides); + channel.start(); + List<String> in = putEvents(channel, "testing-reference-counting", 1, 15); + Transaction tx = channel.getTransaction(); + tx.begin(); + for (int i = 0; i < 10; i++) { + channel.take(); + } + + forceCheckpoint(channel); + tx.rollback(); + //Since we did not commit the original transaction. now we should get 15 + //events back. + final List<String> takenEvents = Lists.newArrayList(); + Executors.newSingleThreadExecutor().submit(new Runnable() { + @Override + public void run() { + try { + takenEvents.addAll(takeEvents(channel, 15)); + } catch (Exception ex) { + Throwables.propagate(ex); + } + } + }).get(); + Assert.assertEquals(15, takenEvents.size()); + } + private static void forceCheckpoint(FileChannel channel) { Log log = field("log") .ofType(Log.class)
