Updated Branches:
  refs/heads/trunk df7a197a5 -> 6ca616800

http://git-wip-us.apache.org/repos/asf/flume/blob/6ca61680/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
 
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
index 170dc72..fb0e208 100644
--- 
a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
+++ 
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
@@ -18,12 +18,13 @@
  */
 package org.apache.flume.channel.file;
 
-import static org.apache.flume.channel.file.TestUtils.*;
-
-import java.io.File;
-import java.util.Map;
-import java.util.Set;
-
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.io.Files;
+import org.apache.commons.io.FileUtils;
+import org.apache.flume.channel.file.proto.ProtosFactory;
+import org.fest.reflect.exception.ReflectionError;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -31,12 +32,23 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Maps;
+import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.util.Map;
 import java.util.Random;
-import org.apache.flume.channel.file.proto.ProtosFactory;
+import java.util.Set;
+
+import static org.apache.flume.channel.file.TestUtils.compareInputAndOut;
+import static org.apache.flume.channel.file.TestUtils.consumeChannel;
+import static org.apache.flume.channel.file.TestUtils.fillChannel;
+import static org.apache.flume.channel.file.TestUtils.forceCheckpoint;
+import static org.apache.flume.channel.file.TestUtils.putEvents;
+import static org.apache.flume.channel.file.TestUtils.takeEvents;
+import static org.fest.reflect.core.Reflection.*;
 
 public class TestFileChannelRestart extends TestFileChannelBase {
   protected static final Logger LOG = LoggerFactory
@@ -119,16 +131,32 @@ public class TestFileChannelRestart extends 
TestFileChannelBase {
     Set<String> out = consumeChannel(channel);
     compareInputAndOut(in, out);
   }
+
+  @Test
+  public void testRestartWhenMetaDataExistsButCheckpointDoesNot() throws
+      Exception {
+    doTestRestartWhenMetaDataExistsButCheckpointDoesNot(false);
+  }
+
   @Test
-  public void testRestartWhenMetaDataExistsButCheckpointDoesNot()
+  public void testRestartWhenMetaDataExistsButCheckpointDoesNotWithBackup()
       throws Exception {
+    doTestRestartWhenMetaDataExistsButCheckpointDoesNot(true);
+  }
+
+  private void doTestRestartWhenMetaDataExistsButCheckpointDoesNot(
+      boolean backup) throws Exception {
     Map<String, String> overrides = Maps.newHashMap();
+    overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, 
String.valueOf(backup));
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
     Set<String> in = putEvents(channel, "restart", 10, 100);
     Assert.assertEquals(100, in.size());
     forceCheckpoint(channel);
+    if(backup) {
+      Thread.sleep(2000);
+    }
     channel.stop();
     File checkpoint = new File(checkpointDir, "checkpoint");
     Assert.assertTrue(checkpoint.delete());
@@ -139,19 +167,36 @@ public class TestFileChannelRestart extends 
TestFileChannelBase {
     Assert.assertTrue(channel.isOpen());
     Assert.assertTrue(checkpoint.exists());
     Assert.assertTrue(checkpointMetaData.exists());
+    Assert.assertTrue(!backup || channel.checkpointBackupRestored());
     Set<String> out = consumeChannel(channel);
     compareInputAndOut(in, out);
   }
+
+  @Test
+  public void testRestartWhenCheckpointExistsButMetaDoesNot() throws Exception{
+    doTestRestartWhenCheckpointExistsButMetaDoesNot(false);
+  }
+
   @Test
-  public void testRestartWhenCheckpointExistsButMetaDoesNot()
+  public void testRestartWhenCheckpointExistsButMetaDoesNotWithBackup() throws
+      Exception{
+    doTestRestartWhenCheckpointExistsButMetaDoesNot(true);
+  }
+
+
+  private void doTestRestartWhenCheckpointExistsButMetaDoesNot(boolean backup)
       throws Exception {
     Map<String, String> overrides = Maps.newHashMap();
+    overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, 
String.valueOf(backup));
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
     Set<String> in = putEvents(channel, "restart", 10, 100);
     Assert.assertEquals(100, in.size());
     forceCheckpoint(channel);
+    if(backup) {
+      Thread.sleep(2000);
+    }
     channel.stop();
     File checkpoint = new File(checkpointDir, "checkpoint");
     File checkpointMetaData = Serialization.getMetaDataFile(checkpoint);
@@ -162,19 +207,34 @@ public class TestFileChannelRestart extends 
TestFileChannelBase {
     Assert.assertTrue(channel.isOpen());
     Assert.assertTrue(checkpoint.exists());
     Assert.assertTrue(checkpointMetaData.exists());
+    Assert.assertTrue(!backup || channel.checkpointBackupRestored());
     Set<String> out = consumeChannel(channel);
     compareInputAndOut(in, out);
   }
 
   @Test
   public void testRestartWhenNoCheckpointExists() throws Exception {
+    doTestRestartWhenNoCheckpointExists(false);
+  }
+
+  @Test
+  public void testRestartWhenNoCheckpointExistsWithBackup() throws Exception {
+    doTestRestartWhenNoCheckpointExists(true);
+  }
+
+  private void doTestRestartWhenNoCheckpointExists(boolean backup) throws
+      Exception {
     Map<String, String> overrides = Maps.newHashMap();
+    overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, 
String.valueOf(backup));
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
     Set<String> in = putEvents(channel, "restart", 10, 100);
     Assert.assertEquals(100, in.size());
     forceCheckpoint(channel);
+    if(backup) {
+      Thread.sleep(2000);
+    }
     channel.stop();
     File checkpoint = new File(checkpointDir, "checkpoint");
     File checkpointMetaData = Serialization.getMetaDataFile(checkpoint);
@@ -185,19 +245,33 @@ public class TestFileChannelRestart extends 
TestFileChannelBase {
     Assert.assertTrue(channel.isOpen());
     Assert.assertTrue(checkpoint.exists());
     Assert.assertTrue(checkpointMetaData.exists());
+    Assert.assertTrue(!backup || channel.checkpointBackupRestored());
     Set<String> out = consumeChannel(channel);
     compareInputAndOut(in, out);
   }
 
   @Test
-  public void testBadCheckpointVersion() throws Exception{
+  public void testBadCheckpointVersion() throws Exception {
+    doTestBadCheckpointVersion(false);
+  }
+
+  @Test
+  public void testBadCheckpointVersionWithBackup() throws Exception {
+    doTestBadCheckpointVersion(true);
+  }
+
+  private void doTestBadCheckpointVersion(boolean backup) throws Exception{
     Map<String, String> overrides = Maps.newHashMap();
+    overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, 
String.valueOf(backup));
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
     Set<String> in = putEvents(channel, "restart", 10, 100);
     Assert.assertEquals(100, in.size());
     forceCheckpoint(channel);
+    if(backup) {
+      Thread.sleep(2000);
+    }
     channel.stop();
     File checkpoint = new File(checkpointDir, "checkpoint");
     RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw");
@@ -209,19 +283,34 @@ public class TestFileChannelRestart extends 
TestFileChannelBase {
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
+    Assert.assertTrue(!backup || channel.checkpointBackupRestored());
     Set<String> out = consumeChannel(channel);
     compareInputAndOut(in, out);
   }
 
   @Test
   public void testBadCheckpointMetaVersion() throws Exception {
+    doTestBadCheckpointMetaVersion(false);
+  }
+
+  @Test
+  public void testBadCheckpointMetaVersionWithBackup() throws Exception {
+    doTestBadCheckpointMetaVersion(true);
+  }
+
+  private void doTestBadCheckpointMetaVersion(boolean backup) throws
+      Exception {
     Map<String, String> overrides = Maps.newHashMap();
+    overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, 
String.valueOf(backup));
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
     Set<String> in = putEvents(channel, "restart", 10, 100);
     Assert.assertEquals(100, in.size());
     forceCheckpoint(channel);
+    if(backup) {
+      Thread.sleep(2000);
+    }
     channel.stop();
     File checkpoint = new File(checkpointDir, "checkpoint");
     FileInputStream is = new 
FileInputStream(Serialization.getMetaDataFile(checkpoint));
@@ -235,19 +324,35 @@ public class TestFileChannelRestart extends 
TestFileChannelBase {
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
+    Assert.assertTrue(!backup || channel.checkpointBackupRestored());
     Set<String> out = consumeChannel(channel);
     compareInputAndOut(in, out);
   }
 
   @Test
   public void testDifferingOrderIDCheckpointAndMetaVersion() throws Exception {
+    doTestDifferingOrderIDCheckpointAndMetaVersion(false);
+  }
+
+  @Test
+  public void testDifferingOrderIDCheckpointAndMetaVersionWithBackup() throws
+      Exception {
+    doTestDifferingOrderIDCheckpointAndMetaVersion(true);
+  }
+
+  private void doTestDifferingOrderIDCheckpointAndMetaVersion(boolean backup)
+      throws Exception {
     Map<String, String> overrides = Maps.newHashMap();
+    overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, 
String.valueOf(backup));
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
     Set<String> in = putEvents(channel, "restart", 10, 100);
     Assert.assertEquals(100, in.size());
     forceCheckpoint(channel);
+    if(backup) {
+      Thread.sleep(2000);
+    }
     channel.stop();
     File checkpoint = new File(checkpointDir, "checkpoint");
     FileInputStream is = new 
FileInputStream(Serialization.getMetaDataFile(checkpoint));
@@ -261,19 +366,33 @@ public class TestFileChannelRestart extends 
TestFileChannelBase {
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
+    Assert.assertTrue(!backup || channel.checkpointBackupRestored());
     Set<String> out = consumeChannel(channel);
     compareInputAndOut(in, out);
   }
 
   @Test
-  public void testIncompleteCheckpoint() throws Exception {
+  public void testIncompleteCheckpoint() throws Exception{
+    doTestIncompleteCheckpoint(false);
+  }
+
+  @Test
+  public void testIncompleteCheckpointWithCheckpoint() throws Exception{
+    doTestIncompleteCheckpoint(true);
+  }
+
+  private void doTestIncompleteCheckpoint(boolean backup) throws Exception {
     Map<String, String> overrides = Maps.newHashMap();
+    overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, 
String.valueOf(backup));
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
     Set<String> in = putEvents(channel, "restart", 10, 100);
     Assert.assertEquals(100, in.size());
     forceCheckpoint(channel);
+    if(backup) {
+      Thread.sleep(2000);
+    }
     channel.stop();
     File checkpoint = new File(checkpointDir, "checkpoint");
     RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw");
@@ -285,18 +404,29 @@ public class TestFileChannelRestart extends 
TestFileChannelBase {
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
+    Assert.assertTrue(!backup || channel.checkpointBackupRestored());
     Set<String> out = consumeChannel(channel);
     compareInputAndOut(in, out);
   }
 
   @Test
   public void testCorruptInflightPuts() throws Exception {
-    testCorruptInflights("inflightPuts");
+    doTestCorruptInflights("inflightPuts", false);
+  }
+
+  @Test
+  public void testCorruptInflightPutsWithBackup() throws Exception {
+    doTestCorruptInflights("inflightPuts", true);
   }
 
   @Test
   public void testCorruptInflightTakes() throws Exception {
-    testCorruptInflights("inflightTakes");
+    doTestCorruptInflights("inflightTakes", false);
+  }
+
+  @Test
+  public void testCorruptInflightTakesWithBackup() throws Exception {
+    doTestCorruptInflights("inflightTakes", true);
   }
 
   @Test
@@ -352,14 +482,19 @@ public class TestFileChannelRestart extends 
TestFileChannelBase {
     compareInputAndOut(in, out);
   }
 
-  private void testCorruptInflights(String name) throws Exception {
+  private void doTestCorruptInflights(String name,
+    boolean backup) throws Exception {
     Map<String, String> overrides = Maps.newHashMap();
+    overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, 
String.valueOf(backup));
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
     Set<String> in = putEvents(channel, "restart", 10, 100);
     Assert.assertEquals(100, in.size());
     forceCheckpoint(channel);
+    if(backup) {
+      Thread.sleep(2000);
+    }
     channel.stop();
     File inflight = new File(checkpointDir, name);
     RandomAccessFile writer = new RandomAccessFile(inflight, "rw");
@@ -368,19 +503,33 @@ public class TestFileChannelRestart extends 
TestFileChannelBase {
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
+    Assert.assertTrue(!backup || channel.checkpointBackupRestored());
     Set<String> out = consumeChannel(channel);
     compareInputAndOut(in, out);
   }
 
   @Test
   public void testTruncatedCheckpointMeta() throws Exception {
+    doTestTruncatedCheckpointMeta(false);
+  }
+
+  @Test
+  public void testTruncatedCheckpointMetaWithBackup() throws Exception {
+    doTestTruncatedCheckpointMeta(true);
+  }
+
+  private void doTestTruncatedCheckpointMeta(boolean backup) throws Exception {
     Map<String, String> overrides = Maps.newHashMap();
+    overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, 
String.valueOf(backup));
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
     Set<String> in = putEvents(channel, "restart", 10, 100);
     Assert.assertEquals(100, in.size());
     forceCheckpoint(channel);
+    if(backup) {
+      Thread.sleep(2000);
+    }
     channel.stop();
     File checkpoint = new File(checkpointDir, "checkpoint");
     RandomAccessFile writer = new RandomAccessFile(
@@ -391,19 +540,33 @@ public class TestFileChannelRestart extends 
TestFileChannelBase {
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
+    Assert.assertTrue(!backup || channel.checkpointBackupRestored());
     Set<String> out = consumeChannel(channel);
     compareInputAndOut(in, out);
   }
 
   @Test
   public void testCorruptCheckpointMeta() throws Exception {
+    doTestCorruptCheckpointMeta(false);
+  }
+
+  @Test
+  public void testCorruptCheckpointMetaWithBackup() throws Exception {
+    doTestCorruptCheckpointMeta(true);
+  }
+
+  private void doTestCorruptCheckpointMeta(boolean backup) throws Exception {
     Map<String, String> overrides = Maps.newHashMap();
+    overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, 
String.valueOf(backup));
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
     Set<String> in = putEvents(channel, "restart", 10, 100);
     Assert.assertEquals(100, in.size());
     forceCheckpoint(channel);
+    if(backup) {
+      Thread.sleep(2000);
+    }
     channel.stop();
     File checkpoint = new File(checkpointDir, "checkpoint");
     RandomAccessFile writer = new RandomAccessFile(
@@ -415,10 +578,19 @@ public class TestFileChannelRestart extends 
TestFileChannelBase {
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
+    Assert.assertTrue(!backup || channel.checkpointBackupRestored());
     Set<String> out = consumeChannel(channel);
     compareInputAndOut(in, out);
   }
 
+  private void checkIfBackupUsed(boolean backup) {
+    boolean backupRestored = channel.checkpointBackupRestored();
+    if (backup) {
+      Assert.assertTrue(backupRestored);
+    } else {
+      Assert.assertFalse(backupRestored);
+    }
+  }
 
   @Test
   public void testWithExtraLogs()
@@ -445,4 +617,158 @@ public class TestFileChannelRestart extends 
TestFileChannelBase {
     Set<String> out = consumeChannel(channel);
     compareInputAndOut(in, out);
   }
+
+  // Make sure the entire channel was not replayed, only the events from the
+  // backup.
+  @Test
+  public void testBackupUsedEnsureNoFullReplay() throws Exception {
+    File dataDir = Files.createTempDir();
+    File tempBackup = Files.createTempDir();
+    Map<String, String> overrides = Maps.newHashMap();
+    overrides.put(FileChannelConfiguration.DATA_DIRS,
+      dataDir.getAbsolutePath());
+    overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, "true");
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Set<String> in = putEvents(channel, "restart", 10, 100);
+    Assert.assertEquals(100, in.size());
+    forceCheckpoint(channel);
+    Thread.sleep(2000);
+    in = putEvents(channel, "restart", 10, 100);
+    takeEvents(channel, 10, 100);
+    Assert.assertEquals(100, in.size());
+    for(File file : backupDir.listFiles()) {
+      if(file.getName().equals(Log.FILE_LOCK)) {
+        continue;
+      }
+      Files.copy(file, new File(tempBackup, file.getName()));
+    }
+    forceCheckpoint(channel);
+    channel.stop();
+
+    Serialization.deleteAllFiles(checkpointDir, Log.EXCLUDES);
+    // The last checkpoint may have been already backed up (it did while I
+    // was running this test, since the checkpoint itself is tiny in unit
+    // tests), so throw away the backup and force the use of an older backup by
+    // bringing in the copy of the last backup before the checkpoint.
+    Serialization.deleteAllFiles(backupDir, Log.EXCLUDES);
+    for(File file : tempBackup.listFiles()) {
+      if(file.getName().equals(Log.FILE_LOCK)) {
+        continue;
+      }
+      Files.copy(file, new File(backupDir, file.getName()));
+    }
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    checkIfBackupUsed(true);
+    Assert.assertEquals(100, channel.getLog().getPutCount());
+    Assert.assertEquals(20, channel.getLog().getCommittedCount());
+    Assert.assertEquals(100, channel.getLog().getTakeCount());
+    Assert.assertEquals(0, channel.getLog().getRollbackCount());
+    //Read Count = 100 puts + 10 commits + 100 takes + 10 commits
+    Assert.assertEquals(220, channel.getLog().getReadCount());
+    consumeChannel(channel);
+    FileUtils.deleteQuietly(dataDir);
+    FileUtils.deleteQuietly(tempBackup);
+  }
+
+  //Make sure data files required by the backup checkpoint are not deleted.
+  @Test
+  public void testDataFilesRequiredByBackupNotDeleted() throws Exception {
+    Map<String, String> overrides = Maps.newHashMap();
+    overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, "true");
+    overrides.put(FileChannelConfiguration.MAX_FILE_SIZE, "1000");
+    channel = createFileChannel(overrides);
+    channel.start();
+    String prefix = "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz";
+    Assert.assertTrue(channel.isOpen());
+    putEvents(channel, prefix, 10, 100);
+    Set<String> origFiles = Sets.newHashSet();
+    for(File dir : dataDirs) {
+      origFiles.addAll(Lists.newArrayList(dir.list()));
+    }
+    forceCheckpoint(channel);
+    takeEvents(channel, 10, 50);
+    long beforeSecondCheckpoint = System.currentTimeMillis();
+    forceCheckpoint(channel);
+    Set<String> newFiles = Sets.newHashSet();
+    int olderThanCheckpoint = 0;
+    int totalMetaFiles = 0;
+    for(File dir : dataDirs) {
+      File[] metadataFiles = dir.listFiles(new FilenameFilter() {
+        @Override
+        public boolean accept(File dir, String name) {
+          if (name.endsWith(".meta")) {
+            return true;
+          }
+          return false;
+        }
+      });
+      totalMetaFiles = metadataFiles.length;
+      for(File metadataFile : metadataFiles) {
+        if(metadataFile.lastModified() < beforeSecondCheckpoint) {
+          olderThanCheckpoint++;
+        }
+      }
+      newFiles.addAll(Lists.newArrayList(dir.list()));
+    }
+    /*
+     * Files which are not required by the new checkpoint should not have been
+     * modified by the checkpoint.
+     */
+    Assert.assertTrue(olderThanCheckpoint > 0);
+    Assert.assertTrue(totalMetaFiles != olderThanCheckpoint);
+
+    /*
+     * All files needed by original checkpoint should still be there.
+     */
+    Assert.assertTrue(newFiles.containsAll(origFiles));
+    takeEvents(channel, 10, 50);
+    forceCheckpoint(channel);
+    newFiles = Sets.newHashSet();
+    for(File dir : dataDirs) {
+      newFiles.addAll(Lists.newArrayList(dir.list()));
+    }
+    Assert.assertTrue(!newFiles.containsAll(origFiles));
+  }
+
+  @Test (expected = IOException.class)
+  public void testSlowBackup() throws Throwable {
+    Map<String, String> overrides = Maps.newHashMap();
+    overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, "true");
+    overrides.put(FileChannelConfiguration.MAX_FILE_SIZE, "1000");
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Set<String> in = putEvents(channel, "restart", 10, 100);
+    Assert.assertEquals(100, in.size());
+    slowdownBackup(channel);
+    forceCheckpoint(channel);
+    in = putEvents(channel, "restart", 10, 100);
+    takeEvents(channel, 10, 100);
+    Assert.assertEquals(100, in.size());
+    try {
+      forceCheckpoint(channel);
+    } catch (ReflectionError ex) {
+      throw ex.getCause();
+    } finally {
+      channel.stop();
+    }
+  }
+
+  private static void slowdownBackup(FileChannel channel) {
+    Log log = field("log").ofType(Log.class).in(channel).get();
+
+    FlumeEventQueue queue = field("queue")
+      .ofType(FlumeEventQueue.class)
+      .in(log).get();
+
+    EventQueueBackingStore backingStore = field("backingStore")
+      .ofType(EventQueueBackingStore.class)
+      .in(queue).get();
+
+    field("slowdownBackup").ofType(Boolean.class).in(backingStore).set(true);
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/6ca61680/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
 
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
index ba653e6..7c490b5 100644
--- 
a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
+++ 
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
@@ -237,7 +237,7 @@ public class TestUtils {
       } catch (Exception ex) {
         transaction.rollback();
         if(untilCapacityIsReached && ex instanceof ChannelException &&
-            ("The channel has reached it's capacity. " 
+            ("The channel has reached it's capacity. "
                 + "This might be the result of a sink on the channel having 
too "
                 + "low of batch size, a downstream system running slower than "
                 + "normal, or that the channel capacity is just too low. "
@@ -260,10 +260,13 @@ public class TestUtils {
   }
 
   public static Context createFileChannelContext(String checkpointDir,
-      String dataDir, Map<String, String> overrides) {
+      String dataDir, String backupDir, Map<String, String> overrides) {
     Context context = new Context();
     context.put(FileChannelConfiguration.CHECKPOINT_DIR,
             checkpointDir);
+    if(backupDir != null) {
+      context.put(FileChannelConfiguration.BACKUP_CHECKPOINT_DIR, backupDir);
+    }
     context.put(FileChannelConfiguration.DATA_DIRS, dataDir);
     context.put(FileChannelConfiguration.KEEP_ALIVE, String.valueOf(1));
     context.put(FileChannelConfiguration.CAPACITY, String.valueOf(10000));
@@ -273,10 +276,16 @@ public class TestUtils {
     return context;
   }
   public static FileChannel createFileChannel(String checkpointDir,
-      String dataDir, Map<String, String> overrides) {
+    String dataDir, Map<String, String> overrides) {
+    return createFileChannel(checkpointDir, dataDir, null, overrides);
+  }
+
+  public static FileChannel createFileChannel(String checkpointDir,
+      String dataDir, String backupDir, Map<String, String> overrides) {
     FileChannel channel = new FileChannel();
     channel.setName("FileChannel-" + UUID.randomUUID());
-    Context context = createFileChannelContext(checkpointDir, dataDir, 
overrides);
+    Context context = createFileChannelContext(checkpointDir, dataDir,
+      backupDir, overrides);
     Configurables.configure(channel, context);
     return channel;
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/6ca61680/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst 
b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 16fba45..693c0d7 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1930,6 +1930,8 @@ Property Name         Default                           
Description
 ================================================  
================================  
========================================================
 **type**                                          --                           
     The component type name, needs to be ``file``.
 checkpointDir                                     
~/.flume/file-channel/checkpoint  The directory where checkpoint file will be 
stored
+useDualCheckpoints                                false                        
     Backup the checkpoint. If this is set to ``true``, ``backupCheckpointDir`` 
**must** be set
+backupCheckpointDir                               --                           
     The directory where the checkpoint is backed up to. This directory **must 
not** be the same as the data directories or the checkpoint directory
 dataDirs                                          ~/.flume/file-channel/data   
     The directory where log files will be stored
 transactionCapacity                               1000                         
     The maximum size of transaction supported by the channel
 checkpointInterval                                30000                        
     Amount of time (in millis) between checkpoints

Reply via email to