FLUME-1417: File Channel checkpoint can be bad, leading to the channel being 
unable to start

(Hari Shreedharan via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/444b75af
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/444b75af
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/444b75af

Branch: refs/heads/cdh-1.2.0+24_intuit
Commit: 444b75af93d37e4b4a6a082e4e6e6009d4f57b48
Parents: 62ee05b
Author: Brock Noland <[email protected]>
Authored: Sun Aug 5 19:11:59 2012 -0500
Committer: Hari Shreedharan <[email protected]>
Committed: Fri Sep 7 13:09:49 2012 -0700

----------------------------------------------------------------------
 .../apache/flume/channel/file/FlumeEventQueue.java |   13 +++--
 .../java/org/apache/flume/channel/file/Log.java    |   37 ++++++++++-----
 2 files changed, 33 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/444b75af/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
 
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
index 64d3dec..e692934 100644
--- 
a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
+++ 
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
@@ -26,11 +26,9 @@ import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel.MapMode;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.slf4j.Logger;
@@ -38,6 +36,8 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
+import java.util.SortedSet;
+import java.util.TreeSet;
 
 /**
  * Queue of events in the channel. This queue stores only
@@ -296,12 +296,15 @@ class FlumeEventQueue {
     return false;
   }
   /**
-   * @return the set of fileIDs which are currently on the queue
+   * @return a copy of the set of fileIDs which are currently on the queue
 * will normally be used when deciding which data files can
    * be deleted
    */
-  synchronized Set<Integer> getFileIDs() {
-    return new HashSet<Integer>(fileIDCounts.keySet());
+  synchronized SortedSet<Integer> getFileIDs() {
+    //Java implements clone pretty well. The main place this is used
+    //is in checkpointing and deleting old files, so it is best
+    //to use a sorted set implementation.
+    return new TreeSet<Integer>(fileIDCounts.keySet());
   }
 
   protected void incrementFileID(int fileID) {

http://git-wip-us.apache.org/repos/asf/flume/blob/444b75af/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
 
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
index 64a70c8..778db64 100644
--- 
a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
+++ 
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
@@ -31,7 +31,6 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -48,6 +47,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import java.util.SortedSet;
 
 /**
  * Stores FlumeEvents on disk and pointers to the events in a in memory queue.
@@ -706,6 +706,7 @@ class Log {
   private boolean writeCheckpoint(boolean force)
       throws IOException {
     boolean lockAcquired = false;
+    boolean checkpointCompleted = false;
     try {
       lockAcquired = checkpointWriterLock.tryLock(this.checkpointWriteTimeout,
           TimeUnit.SECONDS);
@@ -716,11 +717,19 @@ class Log {
     if(!lockAcquired) {
       return false;
     }
+    SortedSet<Integer> idSet = null;
     try {
       if (queue.checkpoint(force) || force) {
         long ts = queue.getTimestamp();
 
-        Set<Integer> idSet = queue.getFileIDs();
+        //Since the active files might also be in the queue's fileIDs,
+        //we need to either move each one to a new set or remove each one
+        //as we do here. Otherwise we cannot make sure every element in
+        //the fileID set from the queue has been updated.
+        //Since clone is smarter than insert, it is better to make
+        //a copy of the set first so that we can use it later.
+        idSet = queue.getFileIDs();
+        SortedSet<Integer> idSetToCompare = new TreeSet(idSet);
 
         int numFiles = logFiles.length();
         for (int i = 0; i < numFiles; i++) {
@@ -749,25 +758,32 @@ class Log {
           idIterator.remove();
         }
         Preconditions.checkState(idSet.size() == 0,
-            "Could not update all data file timestamps: " + idSet);
+                "Could not update all data file timestamps: " + idSet);
+        //Add files from all log directories
+        for (int index = 0; index < logDirs.length; index++) {
+          idSetToCompare.add(logFiles.get(index).getFileID());
+        }
+        idSet = idSetToCompare;
+        checkpointCompleted = true;
       }
     } finally {
       checkpointWriterLock.unlock();
     }
+    //Do the deletes outside the checkpointWriterLock
+    //Delete logic is expensive.
+    if (open && checkpointCompleted) {
+      removeOldLogs(idSet);
+    }
     //Since the exception is not caught, this will not be returned if
     //an exception is thrown from the try.
     return true;
   }
 
-  private void removeOldLogs() {
+  private void removeOldLogs(SortedSet<Integer> fileIDs) {
     Preconditions.checkState(open, "Log is closed");
     // we will find the smallest fileID currently in use and
     // won't delete any files with an id larger than the min
-    Set<Integer> fileIDs = new TreeSet<Integer>(queue.getFileIDs());
-    for (int index = 0; index < logDirs.length; index++) {
-      fileIDs.add(logFiles.get(index).getFileID());
-    }
-    int minFileID = Collections.min(fileIDs);
+    int minFileID = fileIDs.first();
     LOGGER.debug("Files currently in use: " + fileIDs);
     for(File logDir : logDirs) {
       List<File> logs = LogUtils.getLogs(logDir);
@@ -895,9 +911,6 @@ class Log {
               }
             }
           }
-          if(log.open) {
-            log.removeOldLogs();
-          }
         } catch (IOException e) {
           LOG.error("Error doing checkpoint", e);
         } catch (Exception e) {

Reply via email to