Updated Branches:
  refs/heads/trunk f90f1fbaa -> 5289ccc56

FLUME-1428: File Channel should not consider a file as inactive until all takes are committed

(Hari Shreedharan via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/5289ccc5
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/5289ccc5
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/5289ccc5

Branch: refs/heads/trunk
Commit: 5289ccc566a02b7ca59e0f5ae39dd0a4369f48cb
Parents: f90f1fb
Author: Brock Noland <[email protected]>
Authored: Fri Aug 24 15:57:36 2012 -0500
Committer: Brock Noland <[email protected]>
Committed: Fri Aug 24 15:57:36 2012 -0500

----------------------------------------------------------------------
 .../apache/flume/channel/file/FlumeEventQueue.java |   13 +++++-
 .../apache/flume/channel/file/TestFileChannel.java |   33 +++++++++++++++
 2 files changed, 45 insertions(+), 1 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/5289ccc5/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
index 766c59a..8085d22 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
@@ -357,7 +357,10 @@ final class FlumeEventQueue {
     //Java implements clone pretty well. The main place this is used
     //in checkpointing and deleting old files, so best
     //to use a sorted set implementation.
-    return new TreeSet<Integer>(fileIDCounts.keySet());
+    SortedSet<Integer> fileIDs = new TreeSet(fileIDCounts.keySet());
+    fileIDs.addAll(inflightPuts.getFileIDs());
+    fileIDs.addAll(inflightTakes.getFileIDs());
+    return fileIDs;
   }
 
   protected void incrementFileID(int fileID) {
@@ -559,6 +562,7 @@ final class FlumeEventQueue {
     private volatile Future<?> future;
     private final File inflightEventsFile;
     private volatile boolean syncRequired = false;
+    private SetMultimap<Long, Integer> inflightFileIDs = HashMultimap.create();
 
     public InflightEventWrapper(File inflightEventsFile) throws Exception{
       if(!inflightEventsFile.exists()){
@@ -581,6 +585,7 @@ final class FlumeEventQueue {
         return false;
       }
       inflightEvents.removeAll(transactionID);
+      inflightFileIDs.removeAll(transactionID);
       syncRequired = true;
       return true;
     }
@@ -592,6 +597,8 @@ final class FlumeEventQueue {
      */
     public void addEvent(Long transactionID, Long pointer){
       inflightEvents.put(transactionID, pointer);
+      inflightFileIDs.put(transactionID,
+              FlumeEventPointer.fromLong(pointer).getFileID());
       syncRequired = true;
     }
 
@@ -728,6 +735,10 @@ final class FlumeEventQueue {
     public boolean syncRequired(){
       return syncRequired;
     }
+
+    public Collection<Integer> getFileIDs(){
+      return inflightFileIDs.values();
+    }
   }
 
   public static void main(String[] args) throws Exception {

http://git-wip-us.apache.org/repos/asf/flume/blob/5289ccc5/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
index 2093839..3e01395 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
@@ -859,6 +859,39 @@ public class TestFileChannel {
     channel.stop();
   }
 
+  @Test
+  public void testReferenceCounts() throws Exception {
+    Set<String> set = Sets.newHashSet();
+    Map<String, String> overrides = Maps.newHashMap();
+    overrides.put(FileChannelConfiguration.CHECKPOINT_INTERVAL, "10000");
+    overrides.put(FileChannelConfiguration.MAX_FILE_SIZE, "20");
+    final FileChannel channel = createFileChannel(overrides);
+    channel.start();
+    List<String> in = putEvents(channel, "testing-reference-counting", 1, 15);
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    for (int i = 0; i < 10; i++) {
+      channel.take();
+    }
+
+    forceCheckpoint(channel);
+    tx.rollback();
+    //Since we did not commit the original transaction. now we should get 15
+    //events back.
+    final List<String> takenEvents = Lists.newArrayList();
+    Executors.newSingleThreadExecutor().submit(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          takenEvents.addAll(takeEvents(channel, 15));
+        } catch (Exception ex) {
+          Throwables.propagate(ex);
+        }
+      }
+    }).get();
+    Assert.assertEquals(15, takenEvents.size());
+  }
+
   private static void forceCheckpoint(FileChannel channel) {
     Log log = field("log")
         .ofType(Log.class)

Reply via email to