Updated Branches:
  refs/heads/flume-1.4 5b9d31f1a -> e607a3023

FLUME-1986: doTestInflightCorrupts should not commit transactions

(Hari Shreedharan via Juhani Connolly)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/e607a302
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/e607a302
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/e607a302

Branch: refs/heads/flume-1.4
Commit: e607a302375c7fdeb39be8e9f1e645c699e42498
Parents: 5b9d31f
Author: Juhani Connolly <[email protected]>
Authored: Mon Apr 15 11:29:09 2013 +0900
Committer: Juhani Connolly <[email protected]>
Committed: Mon Apr 15 11:29:09 2013 +0900

----------------------------------------------------------------------
 .../flume/channel/file/TestFileChannelRestart.java |   35 +++++++++++---
 .../org/apache/flume/channel/file/TestUtils.java   |    2 -
 2 files changed, 27 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/e607a302/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
 
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
index fb0e208..dc6fc45 100644
--- 
a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
+++ 
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.io.Files;
 import org.apache.commons.io.FileUtils;
+import org.apache.flume.Transaction;
 import org.apache.flume.channel.file.proto.ProtosFactory;
 import org.fest.reflect.exception.ReflectionError;
 import org.junit.After;
@@ -41,13 +42,16 @@ import java.io.RandomAccessFile;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.Executors;
 
 import static org.apache.flume.channel.file.TestUtils.compareInputAndOut;
 import static org.apache.flume.channel.file.TestUtils.consumeChannel;
 import static org.apache.flume.channel.file.TestUtils.fillChannel;
 import static org.apache.flume.channel.file.TestUtils.forceCheckpoint;
 import static org.apache.flume.channel.file.TestUtils.putEvents;
+import static org.apache.flume.channel.file.TestUtils.putWithoutCommit;
 import static org.apache.flume.channel.file.TestUtils.takeEvents;
+import static org.apache.flume.channel.file.TestUtils.takeWithoutCommit;
 import static org.fest.reflect.core.Reflection.*;
 
 public class TestFileChannelRestart extends TestFileChannelBase {
@@ -411,22 +415,22 @@ public class TestFileChannelRestart extends 
TestFileChannelBase {
 
   @Test
   public void testCorruptInflightPuts() throws Exception {
-    doTestCorruptInflights("inflightPuts", false);
+    doTestCorruptInflights("inflightputs", false);
   }
 
   @Test
   public void testCorruptInflightPutsWithBackup() throws Exception {
-    doTestCorruptInflights("inflightPuts", true);
+    doTestCorruptInflights("inflightputs", true);
   }
 
   @Test
   public void testCorruptInflightTakes() throws Exception {
-    doTestCorruptInflights("inflightTakes", false);
+    doTestCorruptInflights("inflighttakes", false);
   }
 
   @Test
   public void testCorruptInflightTakesWithBackup() throws Exception {
-    doTestCorruptInflights("inflightTakes", true);
+    doTestCorruptInflights("inflighttakes", true);
   }
 
   @Test
@@ -489,12 +493,25 @@ public class TestFileChannelRestart extends 
TestFileChannelBase {
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
-    Set<String> in = putEvents(channel, "restart", 10, 100);
-    Assert.assertEquals(100, in.size());
+    final Set<String> in1 = putEvents(channel, "restart-",10, 100);
+    Assert.assertEquals(100, in1.size());
+    Executors.newSingleThreadScheduledExecutor().submit(new Runnable() {
+      @Override
+      public void run() {
+        Transaction tx = channel.getTransaction();
+        Set<String> out1 = takeWithoutCommit(channel, tx, 100);
+        Assert.assertEquals(100, out1.size());
+      }
+    });
+    Transaction tx = channel.getTransaction();
+    Set<String> in2 = putWithoutCommit(channel, tx, "restart", 100);
+    Assert.assertEquals(100, in2.size());
     forceCheckpoint(channel);
     if(backup) {
       Thread.sleep(2000);
     }
+    tx.commit();
+    tx.close();
     channel.stop();
     File inflight = new File(checkpointDir, name);
     RandomAccessFile writer = new RandomAccessFile(inflight, "rw");
@@ -505,7 +522,8 @@ public class TestFileChannelRestart extends 
TestFileChannelBase {
     Assert.assertTrue(channel.isOpen());
     Assert.assertTrue(!backup || channel.checkpointBackupRestored());
     Set<String> out = consumeChannel(channel);
-    compareInputAndOut(in, out);
+    in1.addAll(in2);
+    compareInputAndOut(in1, out);
   }
 
   @Test
@@ -633,8 +651,9 @@ public class TestFileChannelRestart extends 
TestFileChannelBase {
     Assert.assertTrue(channel.isOpen());
     Set<String> in = putEvents(channel, "restart", 10, 100);
     Assert.assertEquals(100, in.size());
+    Thread.sleep(5000);
     forceCheckpoint(channel);
-    Thread.sleep(2000);
+    Thread.sleep(5000);
     in = putEvents(channel, "restart", 10, 100);
     takeEvents(channel, 10, 100);
     Assert.assertEquals(100, in.size());

http://git-wip-us.apache.org/repos/asf/flume/blob/e607a302/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
 
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
index 7c490b5..563dbcc 100644
--- 
a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
+++ 
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
@@ -270,8 +270,6 @@ public class TestUtils {
     context.put(FileChannelConfiguration.DATA_DIRS, dataDir);
     context.put(FileChannelConfiguration.KEEP_ALIVE, String.valueOf(1));
     context.put(FileChannelConfiguration.CAPACITY, String.valueOf(10000));
-    // Set checkpoint for 5 seconds otherwise test will run out of memory
-    context.put(FileChannelConfiguration.CHECKPOINT_INTERVAL, "5000");
     context.putAll(overrides);
     return context;
   }

Reply via email to