Updated Branches: refs/heads/flume-1.3.0 fff0372f2 -> 016c93a36
FLUME-1560. FileChannel tests which fill up the channel should use larger batch size than 1 (Brock Noland via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/016c93a3 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/016c93a3 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/016c93a3 Branch: refs/heads/flume-1.3.0 Commit: 016c93a36c697eb567578899e1690e71ab2592cf Parents: fff0372 Author: Hari Shreedharan <[email protected]> Authored: Mon Sep 10 19:31:14 2012 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Mon Sep 10 19:32:17 2012 -0700 ---------------------------------------------------------------------- .../apache/flume/channel/file/TestFileChannel.java | 34 +------- .../flume/channel/file/TestFileChannelRestart.java | 14 +--- .../org/apache/flume/channel/file/TestUtils.java | 60 ++++++++++++++- .../file/encryption/TestFileChannelEncryption.java | 44 ++--------- 4 files changed, 69 insertions(+), 83 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/016c93a3/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java index 16157d5..8baf8fe 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java @@ -219,13 +219,7 @@ public class TestFileChannel extends TestFileChannelBase { channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); - try { - putEvents(channel, "fillup", 1, Integer.MAX_VALUE); - Assert.fail(); - } catch (ChannelException e) { - Assert.assertEquals("Cannot acquire capacity. [channel="+channel.getName()+"]", - e.getMessage()); - } + fillChannel(channel, "fillup"); // take an event, roll it back, and // then make sure a put fails Transaction transaction; @@ -236,13 +230,7 @@ public class TestFileChannel extends TestFileChannelBase { transaction.rollback(); transaction.close(); // ensure the take the didn't change the state of the capacity - try { - putEvents(channel, "capacity", 1, 1); - Assert.fail(); - } catch (ChannelException e) { - Assert.assertEquals("Cannot acquire capacity. [channel="+channel.getName()+"]", - e.getMessage()); - } + Assert.assertEquals(0, fillChannel(channel, "capacity").size()); // ensure we the events back Assert.assertEquals(5, takeEvents(channel, 1, 5).size()); } @@ -267,13 +255,7 @@ public class TestFileChannel extends TestFileChannelBase { channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); - try { - putEvents(channel, "fillup", 1, Integer.MAX_VALUE); - Assert.fail(); - } catch (ChannelException e) { - Assert.assertEquals("Cannot acquire capacity. [channel="+channel.getName()+"]", - e.getMessage()); - } + fillChannel(channel, "fillup"); // then do a put which will block but it will be assigned a tx id Future<String> put = Executors.newSingleThreadExecutor() .submit(new Callable<String>() { @@ -395,15 +377,7 @@ public class TestFileChannel extends TestFileChannelBase { channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); - Set<String> in = Sets.newHashSet(); - try { - while (true) { - in.addAll(putEvents(channel, "restart", 1, 1)); - } - } catch (ChannelException e) { - Assert.assertEquals("Cannot acquire capacity. [channel=" - + channel.getName() + "]", e.getMessage()); - } + Set<String> in = fillChannel(channel, "restart"); Set<String> out = Sets.newHashSet(); // now take one item off the channel Transaction tx = channel.getTransaction(); http://git-wip-us.apache.org/repos/asf/flume/blob/016c93a3/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java index 90d5aed..4133573 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java @@ -24,7 +24,6 @@ import java.io.File; import java.util.Map; import java.util.Set; -import org.apache.flume.ChannelException; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -33,7 +32,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.collect.Maps; -import com.google.common.collect.Sets; public class TestFileChannelRestart extends TestFileChannelBase { protected static final Logger LOG = LoggerFactory @@ -89,15 +87,7 @@ public class TestFileChannelRestart extends TestFileChannelBase { channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); - Set<String> in = Sets.newHashSet(); - try { - while(true) { - in.addAll(putEvents(channel, "restart", 1, 1)); - } - } catch (ChannelException e) { - Assert.assertEquals("Cannot acquire capacity. [channel=" - +channel.getName()+"]", e.getMessage()); - } + Set<String> in = fillChannel(channel, "restart"); if (forceCheckpoint) { forceCheckpoint(channel); } @@ -111,7 +101,7 @@ public class TestFileChannelRestart extends TestFileChannelBase { channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); - Set<String> out = takeEvents(channel, 1, Integer.MAX_VALUE); + Set<String> out = consumeChannel(channel); compareInputAndOut(in, out); } @Test http://git-wip-us.apache.org/repos/asf/flume/blob/016c93a3/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java index 8a9f10f..2b88b96 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java @@ -35,6 +35,7 @@ import java.util.UUID; import java.util.zip.GZIPInputStream; import org.apache.flume.Channel; +import org.apache.flume.ChannelException; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.Transaction; @@ -73,6 +74,7 @@ public class TestUtils { public static void compareInputAndOut(Set<String> in, Set<String> out) { Assert.assertNotNull(in); Assert.assertNotNull(out); + Assert.assertEquals(in.size(), out.size()); Assert.assertTrue(in.equals(out)); } @@ -148,23 +150,74 @@ public class TestUtils { } return result; } - + public static Set<String> consumeChannel(Channel channel) + throws Exception { + Set<String> result = Sets.newHashSet(); + int[] batchSizes = new int[] { + 1000, 100, 10, 1 + }; + for (int i = 0; i < batchSizes.length; i++) { + while(true) { + Set<String> batch = takeEvents(channel, batchSizes[i]); + if(batch.isEmpty()) { + break; + } + result.addAll(batch); + } + } + return result; + } + public static Set<String> fillChannel(Channel channel, String prefix) + throws Exception { + Set<String> result = Sets.newHashSet(); + int[] batchSizes = new int[] { + 1000, 100, 10, 1 + }; + for (int i = 0; i < batchSizes.length; i++) { + try { + while(true) { + Set<String> batch = putEvents(channel, prefix, batchSizes[i], + Integer.MAX_VALUE, true); + if(batch.isEmpty()) { + break; + } + result.addAll(batch); + } + } catch (ChannelException e) { + Assert.assertEquals("Cannot acquire capacity. [channel=" + +channel.getName()+"]", e.getMessage()); + } + } + return result; + } public static Set<String> putEvents(Channel channel, String prefix, - int batchSize, int numEvents) throws Exception { + int batchSize, int numEvents) throws Exception { + return putEvents(channel, prefix, batchSize, numEvents, false); + } + public static Set<String> putEvents(Channel channel, String prefix, + int batchSize, int numEvents, boolean untilCapacityIsReached) + throws Exception { Set<String> result = Sets.newHashSet(); for (int i = 0; i < numEvents; i += batchSize) { Transaction transaction = channel.getTransaction(); transaction.begin(); try { + Set<String> batch = Sets.newHashSet(); for (int j = 0; j < batchSize; j++) { String s = prefix + "-" + i + "-" + j + "-" + UUID.randomUUID(); Event event = EventBuilder.withBody(s.getBytes(Charsets.UTF_8)); - result.add(s); channel.put(event); + batch.add(s); } transaction.commit(); + result.addAll(batch); } catch (Exception ex) { transaction.rollback(); + if(untilCapacityIsReached && ex instanceof ChannelException && + ("Cannot acquire capacity. [channel=" +channel.getName() + "]"). + equals(ex.getMessage())) { + break; + } throw ex; } finally { transaction.close(); @@ -185,6 +238,7 @@ public class TestUtils { context.put(FileChannelConfiguration.CHECKPOINT_DIR, checkpointDir); context.put(FileChannelConfiguration.DATA_DIRS, dataDir); + context.put(FileChannelConfiguration.KEEP_ALIVE, String.valueOf(1)); context.put(FileChannelConfiguration.CAPACITY, String.valueOf(10000)); // Set checkpoint for 5 seconds otherwise test will run out of memory context.put(FileChannelConfiguration.CHECKPOINT_INTERVAL, "5000"); http://git-wip-us.apache.org/repos/asf/flume/blob/016c93a3/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestFileChannelEncryption.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestFileChannelEncryption.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestFileChannelEncryption.java index 493dac7..5f3a23d 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestFileChannelEncryption.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestFileChannelEncryption.java @@ -38,7 +38,6 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Charsets; import com.google.common.collect.Maps; -import com.google.common.collect.Sets; import com.google.common.io.Files; public class TestFileChannelEncryption extends TestFileChannelBase { @@ -90,21 +89,13 @@ public class TestFileChannelEncryption extends TestFileChannelBase { channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); - Set<String> in = Sets.newHashSet(); - try { - while(true) { - in.addAll(putEvents(channel, "restart", 1, 1)); - } - } catch (ChannelException e) { - Assert.assertEquals("Cannot acquire capacity. [channel=" - +channel.getName()+"]", e.getMessage()); - } + Set<String> in = fillChannel(channel, "restart"); channel.stop(); channel = TestUtils.createFileChannel(checkpointDir.getAbsolutePath(), dataDir, overrides); channel.start(); Assert.assertTrue(channel.isOpen()); - Set<String> out = takeEvents(channel, 1, Integer.MAX_VALUE); + Set<String> out = consumeChannel(channel); compareInputAndOut(in, out); } @Test @@ -113,15 +104,7 @@ public class TestFileChannelEncryption extends TestFileChannelBase { channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); - Set<String> in = Sets.newHashSet(); - try { - while(true) { - in.addAll(putEvents(channel, "will-not-restart", 1, 1)); - } - } catch (ChannelException e) { - Assert.assertEquals("Cannot acquire capacity. [channel=" - +channel.getName()+"]", e.getMessage()); - } + fillChannel(channel, "will-not-restart"); channel.stop(); Map<String, String> noEncryptionOverrides = getOverrides(); channel = createFileChannel(noEncryptionOverrides); @@ -142,15 +125,7 @@ public class TestFileChannelEncryption extends TestFileChannelBase { channel = createFileChannel(noEncryptionOverrides); channel.start(); Assert.assertTrue(channel.isOpen()); - Set<String> in = Sets.newHashSet(); - try { - while(true) { - in.addAll(putEvents(channel, "unencrypted-and-encrypted", 1, 1)); - } - } catch (ChannelException e) { - Assert.assertEquals("Cannot acquire capacity. [channel=" - +channel.getName()+"]", e.getMessage()); - } + Set<String> in = fillChannel(channel, "unencrypted-and-encrypted"); int numEventsToRemove = in.size() / 2; for (int i = 0; i < numEventsToRemove; i++) { Assert.assertTrue(in.removeAll(takeEvents(channel, 1, 1))); @@ -161,15 +136,8 @@ public class TestFileChannelEncryption extends TestFileChannelBase { channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); - try { - while(true) { - in.addAll(putEvents(channel, "unencrypted-and-encrypted", 1, 1)); - } - } catch (ChannelException e) { - Assert.assertEquals("Cannot acquire capacity. [channel=" - +channel.getName()+"]", e.getMessage()); - } - Set<String> out = takeEvents(channel, 1, Integer.MAX_VALUE); + in.addAll(fillChannel(channel, "unencrypted-and-encrypted")); + Set<String> out = consumeChannel(channel); compareInputAndOut(in, out); } @Test
