Updated Branches:
  refs/heads/flume-1.4 48504e8f3 -> 5e53a056b

http://git-wip-us.apache.org/repos/asf/flume/blob/5e53a056/flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto
----------------------------------------------------------------------
diff --git 
a/flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto 
b/flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto
index 1e668d2..25520e8 100644
--- a/flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto
+++ b/flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto
@@ -56,6 +56,7 @@ message TransactionEventHeader {
 
 message Put {
   required FlumeEvent event = 1;
+  optional sfixed64 checksum = 2;
 }
 
 message Take {

http://git-wip-us.apache.org/repos/asf/flume/blob/5e53a056/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
 
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
index 0f7d14d..25765b5 100644
--- 
a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
+++ 
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
@@ -21,13 +21,17 @@ package org.apache.flume.channel.file;
 import static org.apache.flume.channel.file.TestUtils.*;
 import static org.fest.reflect.core.Reflection.*;
 
+import java.io.File;
+import java.io.FilenameFilter;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
@@ -487,7 +491,7 @@ public class TestFileChannel extends TestFileChannelBase {
   public void testReferenceCounts() throws Exception {
     Map<String, String> overrides = Maps.newHashMap();
     overrides.put(FileChannelConfiguration.CHECKPOINT_INTERVAL, "10000");
-    overrides.put(FileChannelConfiguration.MAX_FILE_SIZE, "100");
+    overrides.put(FileChannelConfiguration.MAX_FILE_SIZE, "150");
     final FileChannel channel = createFileChannel(overrides);
     channel.start();
     putEvents(channel, "testing-reference-counting", 1, 15);
@@ -568,4 +572,42 @@ public class TestFileChannel extends TestFileChannelBase {
 
   }
 
+  @Test (expected = IllegalStateException.class)
+  public void testChannelDiesOnCorruptEvent() throws Exception {
+    final FileChannel channel = createFileChannel();
+    channel.start();
+    putEvents(channel,"test-corrupt-event",100,100);
+    for(File dataDir : dataDirs) {
+      File[] files = dataDir.listFiles(new FilenameFilter() {
+        @Override
+        public boolean accept(File dir, String name) {
+          if(!name.endsWith("meta") && !name.contains("lock")){
+            return true;
+          }
+          return false;
+        }
+      });
+      if (files != null && files.length > 0) {
+        for (int j = 0; j < files.length; j++) {
+          RandomAccessFile fileToCorrupt = new RandomAccessFile(files[0], 
"rw");
+          fileToCorrupt.seek(50);
+          fileToCorrupt.writeByte(234);
+          fileToCorrupt.close();
+        }
+      }
+    }
+    try {
+      consumeChannel(channel, true);
+    } catch (IllegalStateException ex) {
+      // The rollback call in takeEvents() in TestUtils will cause an
+      // IllegalArgumentException - and this should be tested to verify the
+      // channel is completely stopped.
+      Assert.assertTrue(ex.getMessage().contains("Log is closed"));
+      throw ex;
+    }
+    Assert.fail();
+
+
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/5e53a056/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
 
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
index 54978f8..8a5f8ad 100644
--- 
a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
+++ 
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
@@ -74,7 +74,8 @@ public class TestLog {
    * not transactional so the commit is not required.
    */
   @Test
-  public void testPutGet() throws IOException, InterruptedException {
+  public void testPutGet()
+    throws IOException, InterruptedException, NoopRecordException {
     FlumeEvent eventIn = TestUtils.newPersistableEvent();
     long transactionID = ++this.transactionID;
     FlumeEventPointer eventPointer = log.put(transactionID, eventIn);
@@ -86,7 +87,8 @@ public class TestLog {
     Assert.assertArrayEquals(eventIn.getBody(), eventOut.getBody());
   }
   @Test
-  public void testRoll() throws IOException, InterruptedException {
+  public void testRoll()
+    throws IOException, InterruptedException, NoopRecordException {
     log.shutdownWorker();
     Thread.sleep(1000);
     for (int i = 0; i < 1000; i++) {
@@ -107,15 +109,16 @@ public class TestLog {
         }
       }
     }
-    // 78 (*2 for meta) files with TestLog.MAX_FILE_SIZE=1000
-    Assert.assertEquals(156, logCount);
+    // 93 (*2 for meta) files with TestLog.MAX_FILE_SIZE=1000
+    Assert.assertEquals(186, logCount);
   }
   /**
    * After replay of the log, we should find the event because the put
    * was committed
    */
   @Test
-  public void testPutCommit() throws IOException, InterruptedException {
+  public void testPutCommit()
+    throws IOException, InterruptedException, NoopRecordException {
     FlumeEvent eventIn = TestUtils.newPersistableEvent();
     long transactionID = ++this.transactionID;
     FlumeEventPointer eventPointerIn = log.put(transactionID, eventIn);
@@ -243,16 +246,16 @@ public class TestLog {
    */
   @Test
   public void testPutTakeRollbackLogReplayV1()
-      throws IOException, InterruptedException {
+    throws IOException, InterruptedException, NoopRecordException {
     doPutTakeRollback(true);
   }
   @Test
   public void testPutTakeRollbackLogReplayV2()
-      throws IOException, InterruptedException {
+    throws IOException, InterruptedException, NoopRecordException {
     doPutTakeRollback(false);
   }
   public void doPutTakeRollback(boolean useLogReplayV1)
-      throws IOException, InterruptedException {
+    throws IOException, InterruptedException, NoopRecordException {
     FlumeEvent eventIn = TestUtils.newPersistableEvent();
     long putTransactionID = ++transactionID;
     FlumeEventPointer eventPointerIn = log.put(putTransactionID, eventIn);
@@ -392,7 +395,7 @@ public class TestLog {
   }
   @Test
   public void testReplaySucceedsWithUnusedEmptyLogMetaDataNormalReplay()
-      throws IOException, InterruptedException {
+    throws IOException, InterruptedException, NoopRecordException {
     FlumeEvent eventIn = TestUtils.newPersistableEvent();
     long transactionID = ++this.transactionID;
     FlumeEventPointer eventPointer = log.put(transactionID, eventIn);
@@ -406,7 +409,7 @@ public class TestLog {
   }
   @Test
   public void testReplaySucceedsWithUnusedEmptyLogMetaDataFastReplay()
-      throws IOException, InterruptedException {
+    throws IOException, InterruptedException, NoopRecordException {
     FlumeEvent eventIn = TestUtils.newPersistableEvent();
     long transactionID = ++this.transactionID;
     FlumeEventPointer eventPointer = log.put(transactionID, eventIn);
@@ -422,7 +425,7 @@ public class TestLog {
   }
   public void doTestReplaySucceedsWithUnusedEmptyLogMetaData(FlumeEvent 
eventIn,
       FlumeEventPointer eventPointer) throws IOException,
-      InterruptedException {
+    InterruptedException, NoopRecordException {
     for (int i = 0; i < dataDirs.length; i++) {
       for(File logFile : LogUtils.getLogs(dataDirs[i])) {
         if(logFile.length() == 0L) {
@@ -461,7 +464,8 @@ public class TestLog {
   }
 
   private void takeAndVerify(FlumeEventPointer eventPointerIn,
-      FlumeEvent eventIn) throws IOException, InterruptedException {
+      FlumeEvent eventIn)
+    throws IOException, InterruptedException, NoopRecordException {
     FlumeEventQueue queue = log.getFlumeEventQueue();
     FlumeEventPointer eventPointerOut = queue.removeHead(0);
     Assert.assertNotNull(eventPointerOut);

http://git-wip-us.apache.org/repos/asf/flume/blob/5e53a056/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java
 
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java
index bef22ef..4da6ac1 100644
--- 
a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java
+++ 
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java
@@ -21,12 +21,16 @@ package org.apache.flume.channel.file;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -98,6 +102,8 @@ public class TestLogFile {
     final List<Throwable> errors =
         Collections.synchronizedList(new ArrayList<Throwable>());
     ExecutorService executorService = Executors.newFixedThreadPool(10);
+    CompletionService<Void> completionService = new ExecutorCompletionService
+      <Void>(executorService);
     final LogFile.RandomReader logFileReader =
         LogFileFactory.getRandomReader(dataFile, null);
     for (int i = 0; i < 1000; i++) {
@@ -117,7 +123,7 @@ public class TestLogFile {
       ByteBuffer bytes = TransactionEventRecord.toByteBuffer(put);
       FlumeEventPointer ptr = logFileWriter.put(bytes);
       final int offset = ptr.getOffset();
-      executorService.submit(new Runnable() {
+      completionService.submit(new Runnable() {
         @Override
         public void run() {
           try {
@@ -130,7 +136,11 @@ public class TestLogFile {
             }
           }
         }
-      });
+      }, null);
+    }
+
+    for(int i = 0; i < 1000; i++) {
+      completionService.take();
     }
     // first try and throw failures
     for(Throwable throwable : errors) {
@@ -142,7 +152,8 @@ public class TestLogFile {
     }
   }
   @Test
-  public void testReader() throws InterruptedException, IOException {
+  public void testReader() throws InterruptedException, IOException,
+    CorruptEventException {
     Map<Integer, Put> puts = Maps.newHashMap();
     for (int i = 0; i < 1000; i++) {
       FlumeEvent eventIn = TestUtils.newPersistableEvent();
@@ -169,7 +180,8 @@ public class TestLogFile {
   }
 
   @Test
-  public void testReaderOldMetaFile() throws InterruptedException, IOException 
{
+  public void testReaderOldMetaFile() throws InterruptedException,
+    IOException, CorruptEventException {
     Map<Integer, Put> puts = Maps.newHashMap();
     for (int i = 0; i < 1000; i++) {
       FlumeEvent eventIn = TestUtils.newPersistableEvent();
@@ -204,7 +216,8 @@ public class TestLogFile {
   }
 
     @Test
-  public void testReaderTempMetaFile() throws InterruptedException, 
IOException {
+  public void testReaderTempMetaFile() throws InterruptedException,
+      IOException, CorruptEventException {
     Map<Integer, Put> puts = Maps.newHashMap();
     for (int i = 0; i < 1000; i++) {
       FlumeEvent eventIn = TestUtils.newPersistableEvent();
@@ -260,4 +273,92 @@ public class TestLogFile {
     Assert.assertEquals(3, metaData.getCheckpointPosition());
     Assert.assertEquals(4, metaData.getCheckpointWriteOrderID());
   }
+
+  @Test (expected = CorruptEventException.class)
+  public void testPutGetCorruptEvent() throws Exception {
+    final LogFile.RandomReader logFileReader =
+      LogFileFactory.getRandomReader(dataFile, null);
+    final FlumeEvent eventIn = TestUtils.newPersistableEvent(2500);
+    final Put put = new Put(++transactionID, WriteOrderOracle.next(),
+      eventIn);
+    ByteBuffer bytes = TransactionEventRecord.toByteBuffer(put);
+    FlumeEventPointer ptr = logFileWriter.put(bytes);
+    logFileWriter.commit(TransactionEventRecord.toByteBuffer(new Commit
+      (transactionID, WriteOrderOracle.next())));
+    final int offset = ptr.getOffset();
+    RandomAccessFile writer = new RandomAccessFile(dataFile, "rw");
+    writer.seek(offset + 1500);
+    writer.write((byte) 45);
+    writer.write((byte) 12);
+    writer.getFD().sync();
+    logFileReader.get(offset);
+
+    // Should have thrown an exception by now.
+    Assert.fail();
+
+  }
+
+  @Test (expected = NoopRecordException.class)
+  public void testPutGetNoopEvent() throws Exception {
+    final LogFile.RandomReader logFileReader =
+      LogFileFactory.getRandomReader(dataFile, null);
+    final FlumeEvent eventIn = TestUtils.newPersistableEvent(2500);
+    final Put put = new Put(++transactionID, WriteOrderOracle.next(),
+      eventIn);
+    ByteBuffer bytes = TransactionEventRecord.toByteBuffer(put);
+    FlumeEventPointer ptr = logFileWriter.put(bytes);
+    logFileWriter.commit(TransactionEventRecord.toByteBuffer(new Commit
+      (transactionID, WriteOrderOracle.next())));
+    final int offset = ptr.getOffset();
+    LogFile.OperationRecordUpdater updater = new LogFile
+      .OperationRecordUpdater(dataFile);
+    updater.markRecordAsNoop(offset);
+    logFileReader.get(offset);
+
+    // Should have thrown an exception by now.
+    Assert.fail();
+  }
+
+  @Test
+  public void testOperationRecordUpdater() throws Exception {
+    File tempDir = Files.createTempDir();
+    File temp = new File(tempDir, "temp");
+    final RandomAccessFile tempFile = new RandomAccessFile(temp, "rw");
+    for(int i = 0; i < 5000; i++) {
+      tempFile.write(LogFile.OP_RECORD);
+    }
+    tempFile.seek(0);
+    LogFile.OperationRecordUpdater recordUpdater = new LogFile
+      .OperationRecordUpdater(temp);
+    //Convert every 10th byte into a noop byte
+    for(int i = 0; i < 5000; i+=10) {
+      recordUpdater.markRecordAsNoop(i);
+    }
+    recordUpdater.close();
+
+    tempFile.seek(0);
+    // Verify every 10th byte is actually a NOOP
+    for(int i = 0; i < 5000; i+=10) {
+      tempFile.seek(i);
+      Assert.assertEquals(LogFile.OP_NOOP, tempFile.readByte());
+    }
+
+  }
+
+  @Test
+  public void testOpRecordUpdaterWithFlumeEvents() throws Exception{
+    final FlumeEvent eventIn = TestUtils.newPersistableEvent(2500);
+    final Put put = new Put(++transactionID, WriteOrderOracle.next(),
+      eventIn);
+    ByteBuffer bytes = TransactionEventRecord.toByteBuffer(put);
+    FlumeEventPointer ptr = logFileWriter.put(bytes);
+    logFileWriter.commit(TransactionEventRecord.toByteBuffer(new Commit
+      (transactionID, WriteOrderOracle.next())));
+    final int offset = ptr.getOffset();
+    LogFile.OperationRecordUpdater updater = new LogFile
+      .OperationRecordUpdater(dataFile);
+    updater.markRecordAsNoop(offset);
+    RandomAccessFile fileReader = new RandomAccessFile(dataFile, "rw");
+    Assert.assertEquals(LogFile.OP_NOOP, fileReader.readByte());
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/5e53a056/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV3.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV3.java
 
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV3.java
index f403422..eb0ce04 100644
--- 
a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV3.java
+++ 
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV3.java
@@ -53,7 +53,7 @@ public class TestTransactionEventRecordV3 {
         commit.getRecordType());
   }
   @Test
-  public void testPutSerialization() throws IOException {
+  public void testPutSerialization() throws IOException, CorruptEventException 
{
     Map<String, String> headers = new HashMap<String, String>();
     headers.put("key", "value");
     Put in = new Put(System.currentTimeMillis(),
@@ -70,7 +70,8 @@ public class TestTransactionEventRecordV3 {
     Assert.assertTrue(Arrays.equals(in.getEvent().getBody(), 
out.getEvent().getBody()));
   }
   @Test
-  public void testPutSerializationNullHeader() throws IOException {
+  public void testPutSerializationNullHeader() throws IOException,
+    CorruptEventException {
     Put in = new Put(System.currentTimeMillis(),
         WriteOrderOracle.next(),
         new FlumeEvent(null, new byte[0]));
@@ -84,7 +85,8 @@ public class TestTransactionEventRecordV3 {
     Assert.assertTrue(Arrays.equals(in.getEvent().getBody(), 
out.getEvent().getBody()));
   }
   @Test
-  public void testTakeSerialization() throws IOException {
+  public void testTakeSerialization() throws IOException,
+    CorruptEventException {
     Take in = new Take(System.currentTimeMillis(),
         WriteOrderOracle.next(), 10, 20);
     Take out = (Take)TransactionEventRecord.fromByteArray(toByteArray(in));
@@ -97,7 +99,8 @@ public class TestTransactionEventRecordV3 {
   }
 
   @Test
-  public void testRollbackSerialization() throws IOException {
+  public void testRollbackSerialization() throws IOException,
+    CorruptEventException {
     Rollback in = new Rollback(System.currentTimeMillis(),
         WriteOrderOracle.next());
     Rollback out = 
(Rollback)TransactionEventRecord.fromByteArray(toByteArray(in));
@@ -108,7 +111,8 @@ public class TestTransactionEventRecordV3 {
   }
 
   @Test
-  public void testCommitSerialization() throws IOException {
+  public void testCommitSerialization() throws IOException,
+    CorruptEventException {
     Commit in = new Commit(System.currentTimeMillis(),
         WriteOrderOracle.next());
     Commit out = (Commit)TransactionEventRecord.fromByteArray(toByteArray(in));
@@ -119,7 +123,7 @@ public class TestTransactionEventRecordV3 {
   }
 
   @Test
-  public void testBadType() throws IOException {
+  public void testBadType() throws IOException, CorruptEventException {
     TransactionEventRecord in = mock(TransactionEventRecord.class);
     when(in.getRecordType()).thenReturn(Short.MIN_VALUE);
     try {

http://git-wip-us.apache.org/repos/asf/flume/blob/5e53a056/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
 
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
index 563dbcc..a5ab45a 100644
--- 
a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
+++ 
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
@@ -29,6 +29,7 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.URL;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -64,6 +65,16 @@ public class TestUtils {
     return event;
   }
 
+  public static FlumeEvent newPersistableEvent(int size) {
+    Map<String, String> headers = Maps.newHashMap();
+    String timestamp = String.valueOf(System.currentTimeMillis());
+    headers.put("timestamp", timestamp);
+    byte[] data = new byte[size];
+    Arrays.fill(data, (byte) 54);
+    FlumeEvent event = new FlumeEvent(headers, data);
+    return event;
+  }
+
   public static DataInput toDataInput(Writable writable) throws IOException {
     ByteArrayOutputStream byteOutput = new ByteArrayOutputStream();
     DataOutputStream dataOutput = new DataOutputStream(byteOutput);
@@ -129,25 +140,47 @@ public class TestUtils {
             .invoke(true));
   }
 
+  public static Set<String> takeEvents(Channel channel, int batchSize)
+    throws Exception {
+    return takeEvents(channel, batchSize, false);
+  }
+
+  public static Set<String> takeEvents(Channel channel,
+          int batchSize, boolean checkForCorruption) throws Exception {
+    return takeEvents(channel, batchSize, Integer.MAX_VALUE, 
checkForCorruption);
+  }
+
   public static Set<String> takeEvents(Channel channel,
-          int batchSize) throws Exception {
-    return takeEvents(channel, batchSize, Integer.MAX_VALUE);
+    int batchSize, int numEvents) throws Exception {
+    return takeEvents(channel, batchSize, numEvents, false);
   }
 
   public static Set<String> takeEvents(Channel channel,
-          int batchSize, int numEvents) throws Exception {
+          int batchSize, int numEvents, boolean checkForCorruption) throws
+    Exception {
     Set<String> result = Sets.newHashSet();
     for (int i = 0; i < numEvents; i += batchSize) {
       Transaction transaction = channel.getTransaction();
       try {
         transaction.begin();
         for (int j = 0; j < batchSize; j++) {
-          Event event = null;
+          Event event;
           try {
             event = channel.take();
           } catch (ChannelException ex) {
-            Assert.assertTrue(ex.getMessage().startsWith(
-                "Take list for FileBackedTransaction, capacity"));
+            Throwable th = ex;
+            String msg;
+            if(checkForCorruption) {
+              msg = "Corrupt event found. Please run File Channel";
+              th = ex.getCause();
+            } else {
+              msg = "Take list for FileBackedTransaction, capacity";
+            }
+            Assert.assertTrue(th.getMessage().startsWith(
+                msg));
+            if(checkForCorruption) {
+              throw (Exception) th;
+            }
             transaction.commit();
             return result;
           }
@@ -168,15 +201,19 @@ public class TestUtils {
     }
     return result;
   }
-  public static Set<String> consumeChannel(Channel channel)
-      throws Exception {
+
+  public static Set<String> consumeChannel(Channel channel) throws Exception {
+    return consumeChannel(channel, false);
+  }
+  public static Set<String> consumeChannel(Channel channel,
+    boolean checkForCorruption) throws Exception {
     Set<String> result = Sets.newHashSet();
     int[] batchSizes = new int[] {
         1000, 100, 10, 1
     };
     for (int i = 0; i < batchSizes.length; i++) {
       while(true) {
-        Set<String> batch = takeEvents(channel, batchSizes[i]);
+        Set<String> batch = takeEvents(channel, batchSizes[i], 
checkForCorruption);
         if(batch.isEmpty()) {
           break;
         }

http://git-wip-us.apache.org/repos/asf/flume/blob/5e53a056/flume-ng-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-dist/pom.xml b/flume-ng-dist/pom.xml
index e266272..9a5f64e 100644
--- a/flume-ng-dist/pom.xml
+++ b/flume-ng-dist/pom.xml
@@ -137,6 +137,10 @@
       <artifactId>flume-ng-log4jappender</artifactId>
       <classifier>jar-with-dependencies</classifier>
     </dependency>
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-tools</artifactId>
+    </dependency>
   </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/flume/blob/5e53a056/flume-ng-dist/src/main/assembly/bin.xml
----------------------------------------------------------------------
diff --git a/flume-ng-dist/src/main/assembly/bin.xml 
b/flume-ng-dist/src/main/assembly/bin.xml
index 74f0608..925a54a 100644
--- a/flume-ng-dist/src/main/assembly/bin.xml
+++ b/flume-ng-dist/src/main/assembly/bin.xml
@@ -95,6 +95,7 @@
         <exclude>flume-ng-legacy-sources/**</exclude>
         <exclude>flume-ng-clients/**</exclude>
         <exclude>flume-ng-embedded-agent/**</exclude>
+        <exclude>flume-tools/**</exclude>
         <exclude>**/target/**</exclude>
         <exclude>**/.classpath</exclude>
         <exclude>**/.project</exclude>

http://git-wip-us.apache.org/repos/asf/flume/blob/5e53a056/flume-ng-dist/src/main/assembly/src.xml
----------------------------------------------------------------------
diff --git a/flume-ng-dist/src/main/assembly/src.xml 
b/flume-ng-dist/src/main/assembly/src.xml
index 7fafab8..5b86994 100644
--- a/flume-ng-dist/src/main/assembly/src.xml
+++ b/flume-ng-dist/src/main/assembly/src.xml
@@ -47,6 +47,7 @@
         <include>org.apache.flume:flume-ng-legacy-sources</include>
         <include>org.apache.flume:flume-ng-clients</include>
         <include>org.apache.flume:flume-ng-embedded-agent</include>
+        <include>org.apache.flume:flume-tools</include>
       </includes>
 
       <sources>
@@ -89,6 +90,7 @@
         <exclude>flume-ng-legacy-sources/**</exclude>
         <exclude>flume-ng-clients/**</exclude>
         <exclude>flume-ng-embedded-agent/**</exclude>
+        <exclude>flume-tools/**</exclude>
         <exclude>**/target/**</exclude>
         <exclude>**/.classpath</exclude>
         <exclude>**/.project</exclude>

http://git-wip-us.apache.org/repos/asf/flume/blob/5e53a056/flume-tools/pom.xml
----------------------------------------------------------------------
diff --git a/flume-tools/pom.xml b/flume-tools/pom.xml
new file mode 100644
index 0000000..386c766
--- /dev/null
+++ b/flume-tools/pom.xml
@@ -0,0 +1,143 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>flume-parent</artifactId>
+    <groupId>org.apache.flume</groupId>
+    <version>1.4.0-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.flume</groupId>
+  <artifactId>flume-tools</artifactId>
+
+  <name>Flume NG Tools</name>
+
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flume.flume-ng-channels</groupId>
+      <artifactId>flume-file-channel</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+
+    <dependency>
+      <groupId>commons-cli</groupId>
+      <artifactId>commons-cli</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.easytesting</groupId>
+      <artifactId>fest-reflect</artifactId>
+      <version>1.4</version>
+    </dependency>
+
+  </dependencies>
+  <profiles>
+
+    <profile>
+      <id>hadoop-1.0</id>
+      <activation>
+        <property>
+          <name>!hadoop.profile</name>
+        </property>
+      </activation>
+      <properties>
+        <hadoop.version>1.0.1</hadoop.version>
+        <hadoop.common.artifact.id>hadoop-core</hadoop.common.artifact.id>
+      </properties>
+      <dependencies>
+
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-test</artifactId>
+          <scope>test</scope>
+        </dependency>
+
+        <!-- required because the hadoop-core pom is missing these deps
+            and MiniDFSCluster pulls in the webhdfs classes -->
+        <dependency>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-core</artifactId>
+          <scope>test</scope>
+        </dependency>
+
+      </dependencies>
+    </profile>
+
+    <profile>
+      <id>hadoop-2</id>
+      <activation>
+        <property>
+          <name>hadoop.profile</name>
+          <value>2</value>
+        </property>
+      </activation>
+      <properties>
+        <hadoop.version>2.0.0-alpha</hadoop.version>
+        <hadoop.common.artifact.id>hadoop-common</hadoop.common.artifact.id>
+      </properties>
+      <dependencies>
+
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-hdfs</artifactId>
+          <optional>true</optional>
+        </dependency>
+
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-auth</artifactId>
+          <optional>true</optional>
+        </dependency>
+
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-minicluster</artifactId>
+          <scope>test</scope>
+        </dependency>
+
+      </dependencies>
+    </profile>
+
+  </profiles>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flume/blob/5e53a056/flume-tools/src/main/java/org/apache/flume/tools/FileChannelIntegrityTool.java
----------------------------------------------------------------------
diff --git 
a/flume-tools/src/main/java/org/apache/flume/tools/FileChannelIntegrityTool.java
 
b/flume-tools/src/main/java/org/apache/flume/tools/FileChannelIntegrityTool.java
new file mode 100644
index 0000000..aa24fa5
--- /dev/null
+++ 
b/flume-tools/src/main/java/org/apache/flume/tools/FileChannelIntegrityTool.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.tools;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.channel.file.CorruptEventException;
+import org.apache.flume.channel.file.Log;
+import org.apache.flume.channel.file.LogFile;
+import org.apache.flume.channel.file.LogFileV3;
+import org.apache.flume.channel.file.LogRecord;
+import org.apache.flume.channel.file.Serialization;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class FileChannelIntegrityTool implements FlumeTool {
+  public static final Logger LOG = LoggerFactory.getLogger
+    (FileChannelIntegrityTool.class);
+
+  private final List<File> dataDirs = new ArrayList<File>();
+
+  @Override
+  public void run(String[] args) throws IOException, ParseException {
+    boolean shouldContinue = parseCommandLineOpts(args);
+    if(!shouldContinue) {
+      LOG.error("Could not parse command line options. Exiting ...");
+      System.exit(1);
+    }
+    for(File dataDir : dataDirs) {
+      File[] dataFiles = dataDir.listFiles(new FilenameFilter() {
+        @Override
+        public boolean accept(File dir, String name) {
+          if(!name.endsWith(Serialization.METADATA_FILENAME)
+            && !name.endsWith(Serialization.METADATA_TMP_FILENAME)
+            && !name.endsWith(Serialization.OLD_METADATA_FILENAME)
+            && !name.equals(Log.FILE_LOCK)) {
+            return true;
+          }
+          return false;
+        }
+      });
+      if (dataFiles != null && dataFiles.length > 0) {
+        for (File dataFile : dataFiles) {
+          LOG.info("Checking for corruption in " + dataFile.toString());
+          LogFile.SequentialReader reader =
+            new LogFileV3.SequentialReader(dataFile, null);
+          LogFile.OperationRecordUpdater updater = new LogFile
+            .OperationRecordUpdater(dataFile);
+          boolean fileDone = false;
+          boolean fileBackedup = false;
+          while (!fileDone) {
+            long eventPosition = 0;
+            try {
+              // This depends on the fact that events are of the form:
+              // Type, length, data.
+              eventPosition = reader.getPosition();
+              // Try to get the record, if the checksums don't match,
+              // this will throw a CorruptEventException - so the real logic
+              // is in the catch block below.
+              LogRecord record = reader.next();
+              if (record != null) {
+                record.getEvent();
+              } else {
+                fileDone = true;
+              }
+            } catch (CorruptEventException e) {
+              LOG.warn("Corruption found in " + dataFile.toString() + " at "
+                + eventPosition);
+              if (!fileBackedup) {
+                Serialization.copyFile(dataFile, new File(dataFile.getParent(),
+                  dataFile.getName() + ".bak"));
+                fileBackedup = true;
+              }
+              updater.markRecordAsNoop(eventPosition);
+            }
+          }
+          updater.close();
+          reader.close();
+        }
+      }
+    }
+  }
+
+  private boolean parseCommandLineOpts(String[] args) throws ParseException {
+    Options options = new Options();
+    options
+      .addOption("l", "dataDirs", true, "Comma-separated list of data " +
+        "directories which the tool must verify. This option is mandatory")
+      .addOption("h", "help", false, "Display help");
+
+    CommandLineParser parser = new GnuParser();
+    CommandLine commandLine = parser.parse(options, args);
+    if(commandLine.hasOption("help")) {
+      new HelpFormatter().printHelp("java -jar fcintegritytool ",
+        options, true);
+      return false;
+    }
+    if(!commandLine.hasOption("dataDirs")) {
+      new HelpFormatter().printHelp("java -jar fcintegritytool ", "",
+        options, "dataDirs is required.", true);
+      return false;
+    } else {
+      String dataDirStr[] = commandLine.getOptionValue("dataDirs").split(",");
+      for(String dataDir : dataDirStr) {
+        File f = new File(dataDir);
+        if(!f.exists()) {
+          throw new FlumeException("Data directory, " + dataDir + " does not " 
+
+            "exist.");
+        }
+        dataDirs.add(f);
+      }
+    }
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/5e53a056/flume-tools/src/main/java/org/apache/flume/tools/FlumeTool.java
----------------------------------------------------------------------
diff --git a/flume-tools/src/main/java/org/apache/flume/tools/FlumeTool.java 
b/flume-tools/src/main/java/org/apache/flume/tools/FlumeTool.java
new file mode 100644
index 0000000..28da8fe
--- /dev/null
+++ b/flume-tools/src/main/java/org/apache/flume/tools/FlumeTool.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.tools;
+
+public interface FlumeTool {
+
+  public void run(String[] args) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/5e53a056/flume-tools/src/main/java/org/apache/flume/tools/FlumeToolType.java
----------------------------------------------------------------------
diff --git 
a/flume-tools/src/main/java/org/apache/flume/tools/FlumeToolType.java 
b/flume-tools/src/main/java/org/apache/flume/tools/FlumeToolType.java
new file mode 100644
index 0000000..f886c89
--- /dev/null
+++ b/flume-tools/src/main/java/org/apache/flume/tools/FlumeToolType.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.tools;
+
+public enum FlumeToolType {
+  FCINTEGRITYTOOL(FileChannelIntegrityTool.class);
+
+  private final Class<? extends FlumeTool> klass;
+  private FlumeToolType(Class<? extends FlumeTool> klass) {
+    this.klass = klass;
+  }
+
+  public Class<? extends FlumeTool> getClassInstance() {
+    return this.klass;
+  }
+
+  public static String getNames() {
+    StringBuilder builder = new StringBuilder();
+    for(FlumeToolType type: values()) {
+      builder.append(type.name().toLowerCase() + "\n");
+    }
+    return builder.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/5e53a056/flume-tools/src/main/java/org/apache/flume/tools/FlumeToolsMain.java
----------------------------------------------------------------------
diff --git 
a/flume-tools/src/main/java/org/apache/flume/tools/FlumeToolsMain.java 
b/flume-tools/src/main/java/org/apache/flume/tools/FlumeToolsMain.java
new file mode 100644
index 0000000..799ce85
--- /dev/null
+++ b/flume-tools/src/main/java/org/apache/flume/tools/FlumeToolsMain.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.tools;
+
+import com.google.common.base.Preconditions;
+
+import java.util.Arrays;
+
+public class FlumeToolsMain implements FlumeTool {
+
+  // Does the actual work in the run method so we can test it without actually
+  // having to start another process.
+
+  public static void main(String[] args) throws Exception {
+    new FlumeToolsMain().run(args);
+  }
+
+  private FlumeToolsMain() {
+    //No op.
+  }
+
+  @Override
+  public void run(String[] args) throws Exception{
+    String error = "Expected name of tool and arguments for" +
+      " tool to be passed in on the command line. Please pass one of the " +
+      "following as arguments to this command: \n";
+    StringBuilder builder = new StringBuilder(error);
+    for(FlumeToolType type : FlumeToolType.values()) {
+      builder.append(type.name()).append("\n");
+    }
+    if(args == null || args.length == 0) {
+      System.out.println(builder.toString());
+      System.exit(1);
+    }
+    String toolName = args[0];
+    FlumeTool tool = null;
+    for(FlumeToolType type : FlumeToolType.values()) {
+      if(toolName.equalsIgnoreCase(type.name())) {
+        tool = type.getClassInstance().newInstance();
+        break;
+      }
+    }
+    Preconditions.checkNotNull(tool, "Cannot find tool matching " + toolName
+      + ". Please select one of: \n " + FlumeToolType.getNames());
+    if (args.length == 1) {
+      tool.run(new String[0]);
+    } else {
+      tool.run(Arrays.asList(args).subList(1, args.length).
+        toArray(new String[0]));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/5e53a056/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java
----------------------------------------------------------------------
diff --git 
a/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java
 
b/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java
new file mode 100644
index 0000000..d328671
--- /dev/null
+++ 
b/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.tools;
+
+import com.google.common.io.Files;
+import org.apache.commons.io.FileUtils;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.file.FileChannel;
+import org.apache.flume.channel.file.FileChannelConfiguration;
+import org.apache.flume.channel.file.Log;
+import org.apache.flume.channel.file.LogFile;
+import org.apache.flume.channel.file.LogFileV3;
+import org.apache.flume.channel.file.LogRecord;
+import org.apache.flume.channel.file.Serialization;
+import org.apache.flume.channel.file.WriteOrderOracle;
+import org.apache.flume.event.EventBuilder;
+import org.junit.After;
+import org.junit.AfterClass;
+import static org.fest.reflect.core.Reflection.*;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.RandomAccessFile;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+
+
+public class TestFileChannelIntegrityTool {
+  private static File baseDir;
+  private static File origCheckpointDir;
+  private static File origDataDir;
+  private static Event event;
+  private static Context ctx;
+
+  private File checkpointDir;
+  private File dataDir;
+
+
+  @BeforeClass
+  public static void setUpClass() throws Exception{
+    createDataFiles();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    checkpointDir = new File(baseDir, "checkpoint");
+    dataDir = new File(baseDir, "dataDir");
+    Assert.assertTrue(checkpointDir.mkdirs() || checkpointDir.isDirectory());
+    Assert.assertTrue(dataDir.mkdirs() || dataDir.isDirectory());
+    File[] dataFiles = origDataDir.listFiles(new FilenameFilter() {
+      @Override
+      public boolean accept(File dir, String name) {
+        if(name.contains("lock")) {
+          return false;
+        }
+        return true;
+      }
+    });
+    for(File dataFile : dataFiles) {
+      Serialization.copyFile(dataFile, new File(dataDir, dataFile.getName()));
+    }
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    FileUtils.deleteDirectory(checkpointDir);
+    FileUtils.deleteDirectory(dataDir);
+  }
+
+  @AfterClass
+  public static void tearDownClass() throws Exception {
+    FileUtils.deleteDirectory(origCheckpointDir);
+    FileUtils.deleteDirectory(origDataDir);
+  }
+
+  @Test
+  public void testFixCorruptRecords() throws Exception {
+    doTestFixCorruptEvents(false);
+  }
+  @Test
+  public void testFixCorruptRecordsWithCheckpoint() throws Exception {
+    doTestFixCorruptEvents(true);
+  }
+
+  public void doTestFixCorruptEvents(boolean withCheckpoint) throws Exception {
+    Set<String> corruptFiles = new HashSet<String>();
+    File[] files = dataDir.listFiles(new FilenameFilter() {
+      @Override
+      public boolean accept(File dir, String name) {
+        if(name.contains("lock") || name.contains("meta")) {
+          return false;
+        }
+        return true;
+      }
+    });
+    Random random = new Random();
+    int corrupted = 0;
+    for (File dataFile : files) {
+      LogFile.SequentialReader reader =
+        new LogFileV3.SequentialReader(dataFile, null);
+      RandomAccessFile handle = new RandomAccessFile(dataFile, "rw");
+      long eventPosition1 = reader.getPosition();
+      LogRecord rec = reader.next();
+      //No point corrupting commits, so ignore them
+      if(rec == null ||
+        rec.getEvent().getClass().getName().
+          equals("org.apache.flume.channel.file.Commit")) {
+        handle.close();
+        reader.close();
+        continue;
+      }
+      long eventPosition2 = reader.getPosition();
+      rec = reader.next();
+      handle.seek(eventPosition1 + 100);
+      handle.writeInt(random.nextInt());
+      corrupted++;
+      corruptFiles.add(dataFile.getName());
+      if (rec == null ||
+        rec.getEvent().getClass().getName().
+          equals("org.apache.flume.channel.file.Commit")) {
+        handle.close();
+        reader.close();
+        continue;
+      }
+      handle.seek(eventPosition2 + 100);
+      handle.writeInt(random.nextInt());
+      corrupted++;
+      handle.close();
+      reader.close();
+
+    }
+    FileChannelIntegrityTool tool = new FileChannelIntegrityTool();
+    tool.run(new String[] {"-l", dataDir.toString()});
+    FileChannel channel = new FileChannel();
+    channel.setName("channel");
+    String cp;
+    if(withCheckpoint) {
+      cp = origCheckpointDir.toString();
+    } else {
+      FileUtils.deleteDirectory(checkpointDir);
+      Assert.assertTrue(checkpointDir.mkdirs());
+      cp = checkpointDir.toString();
+    }
+    ctx.put(FileChannelConfiguration.CHECKPOINT_DIR,cp);
+    ctx.put(FileChannelConfiguration.DATA_DIRS, dataDir.toString());
+    channel.configure(ctx);
+    channel.start();
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    int i = 0;
+    while(channel.take() != null) {
+      i++;
+    }
+    tx.commit();
+    tx.close();
+    channel.stop();
+    Assert.assertEquals(25 - corrupted, i);
+    files = dataDir.listFiles(new FilenameFilter() {
+      @Override
+      public boolean accept(File dir, String name) {
+        if(name.contains(".bak")) {
+          return true;
+        }
+        return false;
+      }
+    });
+    Assert.assertEquals(corruptFiles.size(), files.length);
+    for(File file : files) {
+      String name = file.getName();
+      name = name.replaceAll(".bak", "");
+      Assert.assertTrue(corruptFiles.remove(name));
+    }
+    Assert.assertTrue(corruptFiles.isEmpty());
+  }
+
+  private static void createDataFiles() throws Exception {
+    final byte[] eventData = new byte[2000];
+    for(int i = 0; i < 2000; i++) {
+      eventData[i] = 1;
+    }
+    WriteOrderOracle.setSeed(System.currentTimeMillis());
+    event = EventBuilder.withBody(eventData);
+    baseDir = Files.createTempDir();
+    if(baseDir.exists()) {
+      FileUtils.deleteDirectory(baseDir);
+    }
+    baseDir = Files.createTempDir();
+    origCheckpointDir = new File(baseDir, "chkpt");
+    Assert.assertTrue(origCheckpointDir.mkdirs() || 
origCheckpointDir.isDirectory());
+    origDataDir = new File(baseDir, "data");
+    Assert.assertTrue(origDataDir.mkdirs() || origDataDir.isDirectory());
+    FileChannel channel = new FileChannel();
+    channel.setName("channel");
+    ctx = new Context();
+    ctx.put(FileChannelConfiguration.CAPACITY, "1000");
+    ctx.put(FileChannelConfiguration.CHECKPOINT_DIR, 
origCheckpointDir.toString());
+    ctx.put(FileChannelConfiguration.DATA_DIRS, origDataDir.toString());
+    ctx.put(FileChannelConfiguration.MAX_FILE_SIZE, "10000");
+    ctx.put(FileChannelConfiguration.TRANSACTION_CAPACITY, "100");
+    channel.configure(ctx);
+    channel.start();
+    for (int j = 0; j < 5; j++) {
+      Transaction tx = channel.getTransaction();
+      tx.begin();
+      for (int i = 0; i < 5; i++) {
+        channel.put(event);
+      }
+      tx.commit();
+      tx.close();
+    }
+    Log log = field("log")
+      .ofType(Log.class)
+      .in(channel)
+      .get();
+
+    Assert.assertTrue("writeCheckpoint returned false",
+      method("writeCheckpoint")
+        .withReturnType(Boolean.class)
+        .withParameterTypes(Boolean.class)
+        .in(log)
+        .invoke(true));
+    channel.stop();
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/5e53a056/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 9982f5d..8026936 100644
--- a/pom.xml
+++ b/pom.xml
@@ -63,6 +63,7 @@ limitations under the License.
     <module>flume-ng-clients</module>
     <module>flume-ng-sdk</module>
     <module>flume-ng-tests</module>
+    <module>flume-tools</module>
   </modules>
 
   <profiles>
@@ -888,6 +889,12 @@ limitations under the License.
 
       <dependency>
         <groupId>org.apache.flume</groupId>
+        <artifactId>flume-tools</artifactId>
+        <version>1.4.0-SNAPSHOT</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.flume</groupId>
         <artifactId>flume-ng-node</artifactId>
         <version>1.4.0-SNAPSHOT</version>
       </dependency>

Reply via email to