Updated Branches: refs/heads/trunk c72a3b1a5 -> 4aaf4a3f8
FLUME-1986: doTestInflightCorrupts should not commit transactions (Hari Shreedharan via Juhani Connolly) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/4aaf4a3f Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/4aaf4a3f Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/4aaf4a3f Branch: refs/heads/trunk Commit: 4aaf4a3f86d8ede60dc85a69e8131d2bd8581fca Parents: c72a3b1 Author: Juhani Connolly <[email protected]> Authored: Mon Apr 15 11:30:31 2013 +0900 Committer: Juhani Connolly <[email protected]> Committed: Mon Apr 15 11:30:31 2013 +0900 ---------------------------------------------------------------------- .../flume/channel/file/TestFileChannelRestart.java | 35 +++++++++++--- .../org/apache/flume/channel/file/TestUtils.java | 2 - 2 files changed, 27 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/4aaf4a3f/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java index fb0e208..dc6fc45 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java @@ -23,6 +23,7 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.io.Files; import org.apache.commons.io.FileUtils; +import org.apache.flume.Transaction; import org.apache.flume.channel.file.proto.ProtosFactory; import org.fest.reflect.exception.ReflectionError; import org.junit.After; @@ -41,13 +42,16 @@ import java.io.RandomAccessFile; import java.util.Map; import java.util.Random; import java.util.Set; +import java.util.concurrent.Executors; import static org.apache.flume.channel.file.TestUtils.compareInputAndOut; import static org.apache.flume.channel.file.TestUtils.consumeChannel; import static org.apache.flume.channel.file.TestUtils.fillChannel; import static org.apache.flume.channel.file.TestUtils.forceCheckpoint; import static org.apache.flume.channel.file.TestUtils.putEvents; +import static org.apache.flume.channel.file.TestUtils.putWithoutCommit; import static org.apache.flume.channel.file.TestUtils.takeEvents; +import static org.apache.flume.channel.file.TestUtils.takeWithoutCommit; import static org.fest.reflect.core.Reflection.*; public class TestFileChannelRestart extends TestFileChannelBase { @@ -411,22 +415,22 @@ public class TestFileChannelRestart extends TestFileChannelBase { @Test public void testCorruptInflightPuts() throws Exception { - doTestCorruptInflights("inflightPuts", false); + doTestCorruptInflights("inflightputs", false); } @Test public void testCorruptInflightPutsWithBackup() throws Exception { - doTestCorruptInflights("inflightPuts", true); + doTestCorruptInflights("inflightputs", true); } @Test public void testCorruptInflightTakes() throws Exception { - doTestCorruptInflights("inflightTakes", false); + doTestCorruptInflights("inflighttakes", false); } @Test public void testCorruptInflightTakesWithBackup() throws Exception { - doTestCorruptInflights("inflightTakes", true); + doTestCorruptInflights("inflighttakes", true); } @Test @@ -489,12 +493,25 @@ public class TestFileChannelRestart extends TestFileChannelBase { channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); - Set<String> in = putEvents(channel, "restart", 10, 100); - Assert.assertEquals(100, in.size()); + final Set<String> in1 = putEvents(channel, "restart-",10, 100); + Assert.assertEquals(100, in1.size()); + Executors.newSingleThreadScheduledExecutor().submit(new Runnable() { + @Override + public void run() { + Transaction tx = channel.getTransaction(); + Set<String> out1 = takeWithoutCommit(channel, tx, 100); + Assert.assertEquals(100, out1.size()); + } + }); + Transaction tx = channel.getTransaction(); + Set<String> in2 = putWithoutCommit(channel, tx, "restart", 100); + Assert.assertEquals(100, in2.size()); forceCheckpoint(channel); if(backup) { Thread.sleep(2000); } + tx.commit(); + tx.close(); channel.stop(); File inflight = new File(checkpointDir, name); RandomAccessFile writer = new RandomAccessFile(inflight, "rw"); @@ -505,7 +522,8 @@ public class TestFileChannelRestart extends TestFileChannelBase { Assert.assertTrue(channel.isOpen()); Assert.assertTrue(!backup || channel.checkpointBackupRestored()); Set<String> out = consumeChannel(channel); - compareInputAndOut(in, out); + in1.addAll(in2); + compareInputAndOut(in1, out); } @Test @@ -633,8 +651,9 @@ public class TestFileChannelRestart extends TestFileChannelBase { Assert.assertTrue(channel.isOpen()); Set<String> in = putEvents(channel, "restart", 10, 100); Assert.assertEquals(100, in.size()); + Thread.sleep(5000); forceCheckpoint(channel); - Thread.sleep(2000); + Thread.sleep(5000); in = putEvents(channel, "restart", 10, 100); takeEvents(channel, 10, 100); Assert.assertEquals(100, in.size()); http://git-wip-us.apache.org/repos/asf/flume/blob/4aaf4a3f/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java index 7c490b5..563dbcc 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java @@ -270,8 +270,6 @@ public class TestUtils { context.put(FileChannelConfiguration.DATA_DIRS, dataDir); context.put(FileChannelConfiguration.KEEP_ALIVE, String.valueOf(1)); context.put(FileChannelConfiguration.CAPACITY, String.valueOf(10000)); - // Set checkpoint for 5 seconds otherwise test will run out of memory - context.put(FileChannelConfiguration.CHECKPOINT_INTERVAL, "5000"); context.putAll(overrides); return context; }
