FLUME-1496: TestFileChannel is bloated (Hari Shreedharan via Brock Noland)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/cf44ac05 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/cf44ac05 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/cf44ac05 Branch: refs/heads/cdh-1.2.0+24_intuit Commit: cf44ac05aa197304e0077328b1c35d9b6a4a0e20 Parents: e2c6b56 Author: Brock Noland <[email protected]> Authored: Thu Aug 30 11:48:53 2012 -0500 Committer: Mike Percy <[email protected]> Committed: Fri Sep 7 14:03:06 2012 -0700 ---------------------------------------------------------------------- .../apache/flume/channel/file/TestFileChannel.java | 365 +++------------ .../org/apache/flume/channel/file/TestUtils.java | 116 +++++- 2 files changed, 188 insertions(+), 293 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/cf44ac05/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java index 3e01395..0fd3176 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java @@ -17,7 +17,6 @@ * under the License. */ package org.apache.flume.channel.file; -import static org.fest.reflect.core.Reflection.*; import java.io.File; import java.io.FileOutputStream; @@ -40,7 +39,6 @@ import java.util.concurrent.TimeUnit; import java.util.zip.GZIPInputStream; import org.apache.commons.io.FileUtils; -import org.apache.flume.Channel; import org.apache.flume.ChannelException; import org.apache.flume.Context; import org.apache.flume.Event; @@ -57,12 +55,15 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Charsets; import com.google.common.base.Throwables; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.io.ByteStreams; import com.google.common.io.Files; import com.google.common.io.Resources; +import java.util.HashSet; +import java.util.Iterator; + +import static org.apache.flume.channel.file.TestUtils.*; public class TestFileChannel { @@ -124,22 +125,17 @@ public class TestFileChannel { public void testFailAfterTakeBeforeCommit() throws Throwable { final FileChannel channel = createFileChannel(); channel.start(); - final Set<String> eventSet = Sets.newHashSet(); - eventSet.addAll(putEvents(channel, "testTakeFailBeforeCommit", 5, 5)); + final Set<String> eventSet = + putEvents(channel, "testTakeFailBeforeCommit", 5, 5); Transaction tx = channel.getTransaction(); - tx.begin(); - channel.take(); - channel.take(); + takeWithoutCommit(channel, tx, 2); //Simulate multiple sources, so separate thread - txns are thread local, //so a new txn wont be created here unless it is in a different thread. Executors.newSingleThreadExecutor().submit(new Runnable() { @Override public void run() { Transaction tx = channel.getTransaction(); - tx.begin(); - channel.take(); - channel.take(); - channel.take(); + takeWithoutCommit(channel, tx, 3); } }).get(); forceCheckpoint(channel); @@ -151,61 +147,29 @@ public class TestFileChannel { public void run() { FileChannel channel = createFileChannel(); channel.start(); - Transaction tx = channel.getTransaction(); - tx.begin(); - Event e; - /* - * Explicitly not put in a loop, so it is easy to find out which - * event became null easily. - */ - e = channel.take(); - Assert.assertNotNull(e); - Assert.assertTrue(eventSet.remove(new String(e.getBody()))); - e = channel.take(); - Assert.assertNotNull(e); - Assert.assertTrue(eventSet.remove(new String(e.getBody()))); - e = channel.take(); - Assert.assertNotNull(e); - Assert.assertTrue(eventSet.remove(new String(e.getBody()))); - e = channel.take(); - Assert.assertNotNull(e); - Assert.assertTrue(eventSet.remove(new String(e.getBody()))); - e = channel.take(); - Assert.assertNotNull(e); - Assert.assertTrue(eventSet.remove(new String(e.getBody()))); - tx.commit(); - tx.close(); + Set<String> output = null; + try { + output = takeEvents(channel, 5); + } catch (Exception e) { + Throwables.propagate(e); + } + compareInputAndOut(eventSet, output); channel.stop(); } }).get(); } catch (ExecutionException e) { throw e.getCause(); } - } @Test public void testFailAfterPutCheckpointCommit() throws Throwable { - final Set<String> set = Sets.newHashSet(); final Map<String, String> overrides = Maps.newHashMap(); overrides.put(FileChannelConfiguration.CHECKPOINT_INTERVAL, "10000"); final FileChannel channel = createFileChannel(overrides); channel.start(); Transaction tx = channel.getTransaction(); - //Initially commit a put to make sure checkpoint is required. - tx.begin(); - channel.put(EventBuilder.withBody(new byte[]{'1', '2'})); - set.add(new String(new byte[]{'1', '2'})); - tx.commit(); - tx.close(); - tx = channel.getTransaction(); - tx.begin(); - channel.put(EventBuilder.withBody(new byte[]{'a', 'b'})); - set.add(new String(new byte[]{'a', 'b'})); - channel.put(EventBuilder.withBody(new byte[]{'c', 'd'})); - set.add(new String(new byte[]{'c', 'd'})); - channel.put(EventBuilder.withBody(new byte[]{'e', 'f'})); - set.add(new String(new byte[]{'e', 'f'})); + final Set<String> input = putWithoutCommit(channel, tx, "failAfterPut", 3); //Simulate multiple sources, so separate thread - txns are thread local, //so a new txn wont be created here unless it is in a different thread. final CountDownLatch latch = new CountDownLatch(1); @@ -214,14 +178,7 @@ public class TestFileChannel { @Override public void run() { Transaction tx = channel.getTransaction(); - tx.begin(); - channel.put(EventBuilder.withBody(new byte[]{'3', '4'})); - channel.put(EventBuilder.withBody(new byte[]{'5', '6'})); - channel.put(EventBuilder.withBody(new byte[]{'7', '8'})); - set.add(new String(new byte[]{'3', '4'})); - set.add(new String(new byte[]{'5', '6'})); - set.add(new String(new byte[]{'7', '8'})); - + input.addAll(putWithoutCommit(channel, tx, "failAfterPut", 3)); try { latch.await(); tx.commit(); @@ -240,39 +197,20 @@ public class TestFileChannel { Thread.sleep(2000); channel.stop(); + final Set<String> out = Sets.newHashSet(); //Simulate a sink, so separate thread. try { Executors.newSingleThreadExecutor().submit(new Runnable() { @Override public void run() { - FileChannel channel = createFileChannel(); - channel.start(); - Transaction tx = channel.getTransaction(); - tx.begin(); - Event e = channel.take(); - Assert.assertNotNull(e); - Assert.assertTrue(set.remove(new String(e.getBody()))); - e = channel.take(); - Assert.assertNotNull(e); - Assert.assertTrue(set.remove(new String(e.getBody()))); - e = channel.take(); - Assert.assertNotNull(e); - Assert.assertTrue(set.remove(new String(e.getBody()))); - e = channel.take(); - Assert.assertNotNull(e); - Assert.assertTrue(set.remove(new String(e.getBody()))); - e = channel.take(); - Assert.assertNotNull(e); - Assert.assertTrue(set.remove(new String(e.getBody()))); - e = channel.take(); - Assert.assertNotNull(e); - Assert.assertTrue(set.remove(new String(e.getBody()))); - e = channel.take(); - Assert.assertNotNull(e); - Assert.assertTrue(set.remove(new String(e.getBody()))); - tx.commit(); - tx.close(); - channel.stop(); + try { + FileChannel channel = createFileChannel(); + channel.start(); + out.addAll(takeEvents(channel, 6)); + channel.stop(); + } catch (Exception ex) { + Throwables.propagate(ex); + } } }).get(); } catch (ExecutionException e) { @@ -296,7 +234,7 @@ public class TestFileChannel { channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); - List<String> in = Lists.newArrayList(); + Set<String> in = Sets.newHashSet(); try { while(true) { in.addAll(putEvents(channel, "restart", 1, 1)); @@ -309,30 +247,14 @@ public class TestFileChannel { channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); - List<String> out = takeEvents(channel, 1, Integer.MAX_VALUE); - Collections.sort(in); - Collections.sort(out); - if(!out.equals(in)) { - List<String> difference = new ArrayList<String>(); - if(in.size() > out.size()) { - LOG.info("The channel shorted us"); - difference.addAll(in); - difference.removeAll(out); - } else { - LOG.info("We got more events than expected, perhaps dups"); - difference.addAll(out); - difference.removeAll(in); - } - LOG.error("difference = " + difference + - ", in.size = " + in.size() + ", out.size = " + out.size()); - Assert.fail(); - } + Set<String> out = takeEvents(channel, 1, Integer.MAX_VALUE); + compareInputAndOut(in, out); } @Test public void testReconfigure() throws Exception { channel.start(); Assert.assertTrue(channel.isOpen()); - List<String> in = Lists.newArrayList(); + Set<String> in = Sets.newHashSet(); try { while(true) { in.addAll(putEvents(channel, "reconfig", 1, 1)); @@ -342,24 +264,8 @@ public class TestFileChannel { +channel.getName()+"]", e.getMessage()); } Configurables.configure(channel, createContext()); - List<String> out = takeEvents(channel, 1, Integer.MAX_VALUE); - Collections.sort(in); - Collections.sort(out); - if(!out.equals(in)) { - List<String> difference = new ArrayList<String>(); - if(in.size() > out.size()) { - LOG.info("The channel shorted us"); - difference.addAll(in); - difference.removeAll(out); - } else { - LOG.info("We got more events than expected, perhaps dups"); - difference.addAll(out); - difference.removeAll(in); - } - LOG.error("difference = " + difference + - ", in.size = " + in.size() + ", out.size = " + out.size()); - Assert.fail(); - } + Set<String> out = takeEvents(channel, 1, Integer.MAX_VALUE); + compareInputAndOut(in, out); } @Test public void testPut() throws Exception { @@ -368,13 +274,11 @@ public class TestFileChannel { // should find no items int found = takeEvents(channel, 1, 5).size(); Assert.assertEquals(0, found); - List<String> expected = Lists.newArrayList(); + Set<String> expected = Sets.newHashSet(); expected.addAll(putEvents(channel, "unbatched", 1, 5)); expected.addAll(putEvents(channel, "batched", 5, 5)); - List<String> actual = takeEvents(channel, 1); - Collections.sort(actual); - Collections.sort(expected); - Assert.assertEquals(expected, actual); + Set<String> actual = takeEvents(channel, 1); + compareInputAndOut(expected, actual); } @Test public void testRollbackAfterNoPutTake() throws Exception { @@ -485,18 +389,19 @@ public class TestFileChannel { .submit(new Callable<String>() { @Override public String call() throws Exception { - List<String> result = putEvents(channel, "blocked-put", 1, 1); + Set<String> result = putEvents(channel, "blocked-put", 1, 1); Assert.assertTrue(result.toString(), result.size() == 1); - return result.remove(0); + Iterator<String> iter = result.iterator(); + return iter.next(); } }); Thread.sleep(1000L); // ensure the put has started and is blocked // after which we do a take, will have a tx id after the put - List<String> result = takeEvents(channel, 1, 1); + Set<String> result = takeEvents(channel, 1, 1); Assert.assertTrue(result.toString(), result.size() == 1); String putmsg = put.get(); Assert.assertNotNull(putmsg); - String takemsg = result.remove(0); + String takemsg = result.iterator().next(); Assert.assertNotNull(takemsg); LOG.info("Got: put " + putmsg + ", take " + takemsg); channel.stop(); @@ -511,7 +416,7 @@ public class TestFileChannel { channel.start(); Assert.assertTrue(channel.isOpen()); int numEvents = 50; - List<String> in = putEvents(channel, "rollback", 1, numEvents); + Set<String> in = putEvents(channel, "rollback", 1, numEvents); Transaction transaction; // put an item we will rollback @@ -528,10 +433,8 @@ public class TestFileChannel { Assert.assertTrue(channel.isOpen()); // we should not get the rolled back item - List<String> out = takeEvents(channel, 1, numEvents); - Collections.sort(in); - Collections.sort(out); - Assert.assertEquals(in, out); + Set<String> out = takeEvents(channel, 1, numEvents); + compareInputAndOut(in, out); } @Test public void testRollbackSimulatedCrashWithSink() throws Exception { @@ -569,9 +472,9 @@ public class TestFileChannel { channel = createFileChannel(); channel.start(); Assert.assertTrue(channel.isOpen()); - List<String> out = takeEvents(channel, 1, 1); + Set<String> out = takeEvents(channel, 1, 1); Assert.assertEquals(1, out.size()); - String s = out.get(0); + String s = out.iterator().next(); Assert.assertTrue(s, s.startsWith("rollback-90-9")); } @Test @@ -583,10 +486,10 @@ public class TestFileChannel { final CountDownLatch consumerStopLatch = new CountDownLatch(numThreads); final List<Exception> errors = Collections .synchronizedList(new ArrayList<Exception>()); - final List<String> expected = Collections - .synchronizedList(new ArrayList<String>()); - final List<String> actual = Collections - .synchronizedList(new ArrayList<String>()); + final Set<String> expected = Collections.synchronizedSet( + new HashSet<String>()); + final Set<String> actual = Collections.synchronizedSet( + new HashSet<String>()); for (int i = 0; i < numThreads; i++) { final int id = i; Thread t = new Thread() { @@ -645,9 +548,7 @@ public class TestFileChannel { Assert.assertTrue("Timed out waiting for consumer", consumerStopLatch.await(30, TimeUnit.SECONDS)); Assert.assertEquals(Collections.EMPTY_LIST, errors); - Collections.sort(expected); - Collections.sort(actual); - Assert.assertEquals(expected, actual); + compareInputAndOut(expected, actual); } @Test public void testLocking() throws IOException { @@ -676,12 +577,14 @@ public class TestFileChannel { channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); - List<String> events = takeEvents(channel, 1); - List<String> expected = Arrays.asList(new String[] { + Set<String> events = takeEvents(channel, 1); + Set<String> expected = new HashSet<String>(); + expected.addAll(Arrays.asList( + (new String[]{ "2684", "2685", "2686", "2687", "2688", "2689", "2690", "2691" - } - ); - Assert.assertEquals(expected, events); + }))); + compareInputAndOut(expected, events); + } /** * This is a regression test with files generated by a file channel @@ -713,7 +616,7 @@ public class TestFileChannel { channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); - List<String> events = takeEvents(channel, 1); + Set<String> events = takeEvents(channel, 1); Assert.assertEquals(50, events.size()); } @@ -728,7 +631,7 @@ public class TestFileChannel { channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); - List<String> in = Lists.newArrayList(); + Set<String> in = Sets.newHashSet(); try { while (true) { in.addAll(putEvents(channel, "restart", 1, 1)); @@ -737,15 +640,10 @@ public class TestFileChannel { Assert.assertEquals("Cannot acquire capacity. [channel=" + channel.getName() + "]", e.getMessage()); } - List<String> out = Lists.newArrayList(); + Set<String> out = Sets.newHashSet(); // now take one item off the channel Transaction tx = channel.getTransaction(); - tx.begin(); - Event e = channel.take(); - Assert.assertNotNull(e); - String s = new String(e.getBody(), Charsets.UTF_8); - out.add(s); - LOG.info("Slow take got " + s); + out.addAll(takeWithoutCommit(channel, tx, 1)); // sleep so a checkpoint occurs. take is before // and commit is after the checkpoint forceCheckpoint(channel); @@ -756,25 +654,10 @@ public class TestFileChannel { channel.start(); Assert.assertTrue(channel.isOpen()); // we should not geet the item we took of the queue above - out.addAll(takeEvents(channel, 1, Integer.MAX_VALUE)); + Set<String> out2 = takeEvents(channel, 1, Integer.MAX_VALUE); channel.stop(); - Collections.sort(in); - Collections.sort(out); - if (!out.equals(in)) { - List<String> difference = new ArrayList<String>(); - if (in.size() > out.size()) { - LOG.info("The channel shorted us"); - difference.addAll(in); - difference.removeAll(out); - } else { - LOG.info("We got more events than expected, perhaps dups"); - difference.addAll(out); - difference.removeAll(in); - } - LOG.error("difference = " + difference - + ", in.size = " + in.size() + ", out.size = " + out.size()); - Assert.fail(); - } + in.removeAll(out); + compareInputAndOut(in, out2); } @Test @@ -787,15 +670,7 @@ public class TestFileChannel { channel.start(); //Force a checkpoint by committing a transaction Transaction tx = channel.getTransaction(); - tx.begin(); - channel.put(EventBuilder.withBody(new byte[]{'a','b'})); - set.add(new String(new byte[]{'a','b'})); - tx.commit(); - tx.close(); - tx = channel.getTransaction(); - tx.begin(); - channel.put(EventBuilder.withBody(new byte[]{'c','d'})); - set.add(new String(new byte[]{'c', 'd'})); + Set<String> in = putWithoutCommit(channel, tx, "putWithoutCommit", 1); forceCheckpoint(channel); tx.commit(); tx.close(); @@ -804,23 +679,14 @@ public class TestFileChannel { channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); - tx = channel.getTransaction(); - tx.begin(); - Event e = channel.take(); - Assert.assertNotNull(e); - Assert.assertTrue(set.contains(new String(e.getBody()))); - e = channel.take(); - Assert.assertNotNull(e); - Assert.assertTrue(set.contains(new String(e.getBody()))); - tx.commit(); - tx.close(); + Set<String> out = takeEvents(channel, 1); + compareInputAndOut(in, out); channel.stop(); } @Test public void testPutCheckpointCommitCheckpointReplay() throws Exception { - Set<String> set = Sets.newHashSet(); Map<String, String> overrides = Maps.newHashMap(); overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(2)); overrides.put(FileChannelConfiguration.CHECKPOINT_INTERVAL, "10000"); @@ -828,15 +694,7 @@ public class TestFileChannel { channel.start(); //Force a checkpoint by committing a transaction Transaction tx = channel.getTransaction(); - tx.begin(); - channel.put(EventBuilder.withBody(new byte[]{'a','b'})); - set.add(new String(new byte[]{'a','b'})); - tx.commit(); - tx.close(); - tx = channel.getTransaction(); - tx.begin(); - channel.put(EventBuilder.withBody(new byte[]{'c', 'd'})); - set.add(new String(new byte[]{'c','d'})); + Set<String> in = putWithoutCommit(channel, tx, "doubleCheckpoint", 1); forceCheckpoint(channel); tx.commit(); tx.close(); @@ -846,39 +704,26 @@ public class TestFileChannel { channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); - tx = channel.getTransaction(); - tx.begin(); - Event e = channel.take(); - Assert.assertNotNull(e); - Assert.assertTrue(set.contains(new String(e.getBody()))); - e = channel.take(); - Assert.assertNotNull(e); - Assert.assertTrue(set.contains(new String(e.getBody()))); - tx.commit(); - tx.close(); + Set<String> out = takeEvents(channel, 5); + compareInputAndOut(in, out); channel.stop(); } @Test public void testReferenceCounts() throws Exception { - Set<String> set = Sets.newHashSet(); Map<String, String> overrides = Maps.newHashMap(); overrides.put(FileChannelConfiguration.CHECKPOINT_INTERVAL, "10000"); overrides.put(FileChannelConfiguration.MAX_FILE_SIZE, "20"); final FileChannel channel = createFileChannel(overrides); channel.start(); - List<String> in = putEvents(channel, "testing-reference-counting", 1, 15); + Set<String> in = putEvents(channel, "testing-reference-counting", 1, 15); Transaction tx = channel.getTransaction(); - tx.begin(); - for (int i = 0; i < 10; i++) { - channel.take(); - } - + takeWithoutCommit(channel, tx, 10); forceCheckpoint(channel); tx.rollback(); //Since we did not commit the original transaction. now we should get 15 //events back. - final List<String> takenEvents = Lists.newArrayList(); + final Set<String> takenEvents = Sets.newHashSet(); Executors.newSingleThreadExecutor().submit(new Runnable() { @Override public void run() { @@ -892,19 +737,7 @@ public class TestFileChannel { Assert.assertEquals(15, takenEvents.size()); } - private static void forceCheckpoint(FileChannel channel) { - Log log = field("log") - .ofType(Log.class) - .in(channel) - .get(); - Assert.assertTrue("writeCheckpoint returned false", - method("writeCheckpoint") - .withReturnType(Boolean.class) - .withParameterTypes(Boolean.class) - .in(log) - .invoke(true)); - } private static void copyDecompressed(String resource, File output) throws IOException { URL input = Resources.getResource(resource); @@ -913,56 +746,4 @@ public class TestFileChannel { LOG.info("Copied " + copied + " bytes from " + input + " to " + output); } - private static List<String> takeEvents(Channel channel, - int batchSize) throws Exception { - return takeEvents(channel, batchSize, Integer.MAX_VALUE); - } - private static List<String> takeEvents(Channel channel, - int batchSize, int numEvents) throws Exception { - List<String> result = Lists.newArrayList(); - for (int i = 0; i < numEvents; i += batchSize) { - for (int j = 0; j < batchSize; j++) { - Transaction transaction = channel.getTransaction(); - transaction.begin(); - try { - Event event = channel.take(); - if(event == null) { - transaction.commit(); - return result; - } - result.add(new String(event.getBody(), Charsets.UTF_8)); - transaction.commit(); - } catch (Exception ex) { - transaction.rollback(); - throw ex; - } finally { - transaction.close(); - } - } - } - return result; - } - private static List<String> putEvents(Channel channel, String prefix, - int batchSize, int numEvents) throws Exception { - List<String> result = Lists.newArrayList(); - for (int i = 0; i < numEvents; i += batchSize) { - for (int j = 0; j < batchSize; j++) { - Transaction transaction = channel.getTransaction(); - transaction.begin(); - try { - String s = prefix + "-" + i +"-" + j + "-" + UUID.randomUUID(); - Event event = EventBuilder.withBody(s.getBytes(Charsets.UTF_8)); - result.add(s); - channel.put(event); - transaction.commit(); - } catch (Exception ex) { - transaction.rollback(); - throw ex; - } finally { - transaction.close(); - } - } - } - return result; - } } http://git-wip-us.apache.org/repos/asf/flume/blob/cf44ac05/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java index 3474f2d..e64f856 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java @@ -18,6 +18,7 @@ */ package org.apache.flume.channel.file; +import com.google.common.base.Charsets; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInput; @@ -29,6 +30,15 @@ import java.util.Map; import org.apache.hadoop.io.Writable; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import java.util.Set; +import java.util.UUID; +import org.apache.flume.Channel; +import org.apache.flume.Event; +import org.apache.flume.Transaction; +import org.apache.flume.event.EventBuilder; +import org.junit.Assert; +import static org.fest.reflect.core.Reflection.*; public class TestUtils { @@ -37,9 +47,10 @@ public class TestUtils { String timestamp = String.valueOf(System.currentTimeMillis()); headers.put("timestamp", timestamp); FlumeEvent event = new FlumeEvent(headers, - timestamp.getBytes()); + timestamp.getBytes()); return event; } + public static DataInput toDataInput(Writable writable) throws IOException { ByteArrayOutputStream byteOutput = new ByteArrayOutputStream(); DataOutputStream dataOutput = new DataOutputStream(byteOutput); @@ -48,4 +59,107 @@ public class TestUtils { DataInputStream dataInput = new DataInputStream(byteInput); return dataInput; } + + public static void compareInputAndOut(Set<String> in, Set<String> out) { + Assert.assertNotNull(in); + Assert.assertNotNull(out); + Assert.assertTrue(in.equals(out)); + } + + public static Set<String> putWithoutCommit(Channel channel, Transaction tx, + String prefix, int number) { + Set<String> events = Sets.newHashSet(); + tx.begin(); + for (int i = 0; i < number; i++) { + String eventData = (prefix + UUID.randomUUID()).toString(); + Event event = EventBuilder.withBody(eventData.getBytes()); + channel.put(event); + events.add(eventData); + } + return events; + } + + public static Set<String> takeWithoutCommit(Channel channel, Transaction tx, + int number) { + Set<String> events = Sets.newHashSet(); + tx.begin(); + for (int i = 0; i < number; i++) { + Event e = channel.take(); + if (e == null) { + break; + } + events.add(new String(e.getBody())); + } + return events; + } + + public static void forceCheckpoint(FileChannel channel) { + Log log = field("log") + .ofType(Log.class) + .in(channel) + .get(); + + Assert.assertTrue("writeCheckpoint returned false", + method("writeCheckpoint") + .withReturnType(Boolean.class) + .withParameterTypes(Boolean.class) + .in(log) + .invoke(true)); + } + + public static Set<String> takeEvents(Channel channel, + int batchSize) throws Exception { + return takeEvents(channel, batchSize, Integer.MAX_VALUE); + } + + public static Set<String> takeEvents(Channel channel, + int batchSize, int numEvents) throws Exception { + Set<String> result = Sets.newHashSet(); + for (int i = 0; i < numEvents; i += batchSize) { + Transaction transaction = channel.getTransaction(); + try { + transaction.begin(); + for (int j = 0; j < batchSize; j++) { + Event event = channel.take(); + if (event == null) { + transaction.commit(); + return result; + } + result.add(new String(event.getBody(), Charsets.UTF_8)); + } + transaction.commit(); + } catch (Exception ex) { + transaction.rollback(); + throw ex; + } finally { + transaction.close(); + } + + } + return result; + } + + public static Set<String> putEvents(Channel channel, String prefix, + int batchSize, int numEvents) throws Exception { + Set<String> result = Sets.newHashSet(); + for (int i = 0; i < numEvents; i += batchSize) { + Transaction transaction = channel.getTransaction(); + transaction.begin(); + try { + for (int j = 0; j < batchSize; j++) { + String s = prefix + "-" + i + "-" + j + "-" + UUID.randomUUID(); + Event event = EventBuilder.withBody(s.getBytes(Charsets.UTF_8)); + result.add(s); + channel.put(event); + } + transaction.commit(); + } catch (Exception ex) { + transaction.rollback(); + throw ex; + } finally { + transaction.close(); + } + } + return result; + } }
