FLUME-1496: TestFileChannel is bloated

(Hari Shreedharan via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/cf44ac05
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/cf44ac05
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/cf44ac05

Branch: refs/heads/cdh-1.2.0+24_intuit
Commit: cf44ac05aa197304e0077328b1c35d9b6a4a0e20
Parents: e2c6b56
Author: Brock Noland <[email protected]>
Authored: Thu Aug 30 11:48:53 2012 -0500
Committer: Mike Percy <[email protected]>
Committed: Fri Sep 7 14:03:06 2012 -0700

----------------------------------------------------------------------
 .../apache/flume/channel/file/TestFileChannel.java |  365 +++------------
 .../org/apache/flume/channel/file/TestUtils.java   |  116 +++++-
 2 files changed, 188 insertions(+), 293 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/cf44ac05/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
index 3e01395..0fd3176 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
@@ -17,7 +17,6 @@
  * under the License.
  */
 package org.apache.flume.channel.file;
-import static org.fest.reflect.core.Reflection.*;
 
 import java.io.File;
 import java.io.FileOutputStream;
@@ -40,7 +39,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.zip.GZIPInputStream;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.flume.Channel;
 import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -57,12 +55,15 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Charsets;
 import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.io.ByteStreams;
 import com.google.common.io.Files;
 import com.google.common.io.Resources;
+import java.util.HashSet;
+import java.util.Iterator;
+
+import static org.apache.flume.channel.file.TestUtils.*;
 
 public class TestFileChannel {
 
@@ -124,22 +125,17 @@ public class TestFileChannel {
   public void testFailAfterTakeBeforeCommit() throws Throwable {
     final FileChannel channel = createFileChannel();
     channel.start();
-    final Set<String> eventSet = Sets.newHashSet();
-    eventSet.addAll(putEvents(channel, "testTakeFailBeforeCommit", 5, 5));
+    final Set<String> eventSet =
+            putEvents(channel, "testTakeFailBeforeCommit", 5, 5);
     Transaction tx = channel.getTransaction();
-    tx.begin();
-    channel.take();
-    channel.take();
+    takeWithoutCommit(channel, tx, 2);
     //Simulate multiple sources, so separate thread - txns are thread local,
     //so a new txn wont be created here unless it is in a different thread.
     Executors.newSingleThreadExecutor().submit(new Runnable() {
       @Override
       public void run() {
         Transaction tx = channel.getTransaction();
-        tx.begin();
-        channel.take();
-        channel.take();
-        channel.take();
+        takeWithoutCommit(channel, tx, 3);
       }
     }).get();
     forceCheckpoint(channel);
@@ -151,61 +147,29 @@ public class TestFileChannel {
         public void run() {
           FileChannel channel = createFileChannel();
           channel.start();
-          Transaction tx = channel.getTransaction();
-          tx.begin();
-          Event e;
-          /*
-           * Explicitly not put in a loop, so it is easy to find out which
-           * event became null easily.
-           */
-          e = channel.take();
-          Assert.assertNotNull(e);
-          Assert.assertTrue(eventSet.remove(new String(e.getBody())));
-          e = channel.take();
-          Assert.assertNotNull(e);
-          Assert.assertTrue(eventSet.remove(new String(e.getBody())));
-          e = channel.take();
-          Assert.assertNotNull(e);
-          Assert.assertTrue(eventSet.remove(new String(e.getBody())));
-          e = channel.take();
-          Assert.assertNotNull(e);
-          Assert.assertTrue(eventSet.remove(new String(e.getBody())));
-          e = channel.take();
-          Assert.assertNotNull(e);
-          Assert.assertTrue(eventSet.remove(new String(e.getBody())));
-          tx.commit();
-          tx.close();
+          Set<String> output = null;
+          try {
+            output = takeEvents(channel, 5);
+          } catch (Exception e) {
+            Throwables.propagate(e);
+          }
+          compareInputAndOut(eventSet, output);
           channel.stop();
         }
       }).get();
     } catch (ExecutionException e) {
       throw e.getCause();
     }
-
   }
 
   @Test
   public void testFailAfterPutCheckpointCommit() throws Throwable {
-    final Set<String> set = Sets.newHashSet();
     final Map<String, String> overrides = Maps.newHashMap();
     overrides.put(FileChannelConfiguration.CHECKPOINT_INTERVAL, "10000");
     final FileChannel channel = createFileChannel(overrides);
     channel.start();
     Transaction tx = channel.getTransaction();
-    //Initially commit a put to make sure checkpoint is required.
-    tx.begin();
-    channel.put(EventBuilder.withBody(new byte[]{'1', '2'}));
-    set.add(new String(new byte[]{'1', '2'}));
-    tx.commit();
-    tx.close();
-    tx = channel.getTransaction();
-    tx.begin();
-    channel.put(EventBuilder.withBody(new byte[]{'a', 'b'}));
-    set.add(new String(new byte[]{'a', 'b'}));
-    channel.put(EventBuilder.withBody(new byte[]{'c', 'd'}));
-    set.add(new String(new byte[]{'c', 'd'}));
-    channel.put(EventBuilder.withBody(new byte[]{'e', 'f'}));
-    set.add(new String(new byte[]{'e', 'f'}));
+    final Set<String> input = putWithoutCommit(channel, tx, "failAfterPut", 3);
     //Simulate multiple sources, so separate thread - txns are thread local,
     //so a new txn wont be created here unless it is in a different thread.
     final CountDownLatch latch = new CountDownLatch(1);
@@ -214,14 +178,7 @@ public class TestFileChannel {
               @Override
               public void run() {
                 Transaction tx = channel.getTransaction();
-                tx.begin();
-                channel.put(EventBuilder.withBody(new byte[]{'3', '4'}));
-                channel.put(EventBuilder.withBody(new byte[]{'5', '6'}));
-                channel.put(EventBuilder.withBody(new byte[]{'7', '8'}));
-                set.add(new String(new byte[]{'3', '4'}));
-                set.add(new String(new byte[]{'5', '6'}));
-                set.add(new String(new byte[]{'7', '8'}));
-
+                input.addAll(putWithoutCommit(channel, tx, "failAfterPut", 3));
                 try {
                   latch.await();
                   tx.commit();
@@ -240,39 +197,20 @@ public class TestFileChannel {
     Thread.sleep(2000);
     channel.stop();
 
+    final Set<String> out = Sets.newHashSet();
     //Simulate a sink, so separate thread.
     try {
       Executors.newSingleThreadExecutor().submit(new Runnable() {
         @Override
         public void run() {
-          FileChannel channel = createFileChannel();
-          channel.start();
-          Transaction tx = channel.getTransaction();
-          tx.begin();
-          Event e = channel.take();
-          Assert.assertNotNull(e);
-          Assert.assertTrue(set.remove(new String(e.getBody())));
-          e = channel.take();
-          Assert.assertNotNull(e);
-          Assert.assertTrue(set.remove(new String(e.getBody())));
-          e = channel.take();
-          Assert.assertNotNull(e);
-          Assert.assertTrue(set.remove(new String(e.getBody())));
-          e = channel.take();
-          Assert.assertNotNull(e);
-          Assert.assertTrue(set.remove(new String(e.getBody())));
-          e = channel.take();
-          Assert.assertNotNull(e);
-          Assert.assertTrue(set.remove(new String(e.getBody())));
-          e = channel.take();
-          Assert.assertNotNull(e);
-          Assert.assertTrue(set.remove(new String(e.getBody())));
-          e = channel.take();
-          Assert.assertNotNull(e);
-          Assert.assertTrue(set.remove(new String(e.getBody())));
-          tx.commit();
-          tx.close();
-          channel.stop();
+          try {
+            FileChannel channel = createFileChannel();
+            channel.start();
+            out.addAll(takeEvents(channel, 6));
+            channel.stop();
+          } catch (Exception ex) {
+            Throwables.propagate(ex);
+          }
         }
       }).get();
     } catch (ExecutionException e) {
@@ -296,7 +234,7 @@ public class TestFileChannel {
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
-    List<String> in = Lists.newArrayList();
+    Set<String> in = Sets.newHashSet();
     try {
       while(true) {
         in.addAll(putEvents(channel, "restart", 1, 1));
@@ -309,30 +247,14 @@ public class TestFileChannel {
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
-    List<String> out = takeEvents(channel, 1, Integer.MAX_VALUE);
-    Collections.sort(in);
-    Collections.sort(out);
-    if(!out.equals(in)) {
-      List<String> difference = new ArrayList<String>();
-      if(in.size() > out.size()) {
-        LOG.info("The channel shorted us");
-        difference.addAll(in);
-        difference.removeAll(out);
-      } else {
-        LOG.info("We got more events than expected, perhaps dups");
-        difference.addAll(out);
-        difference.removeAll(in);
-      }
-      LOG.error("difference = " + difference +
-          ", in.size = " + in.size() + ", out.size = " + out.size());
-      Assert.fail();
-    }
+    Set<String> out = takeEvents(channel, 1, Integer.MAX_VALUE);
+    compareInputAndOut(in, out);
   }
   @Test
   public void testReconfigure() throws Exception {
     channel.start();
     Assert.assertTrue(channel.isOpen());
-    List<String> in = Lists.newArrayList();
+    Set<String> in = Sets.newHashSet();
     try {
       while(true) {
         in.addAll(putEvents(channel, "reconfig", 1, 1));
@@ -342,24 +264,8 @@ public class TestFileChannel {
           +channel.getName()+"]", e.getMessage());
     }
     Configurables.configure(channel, createContext());
-    List<String> out = takeEvents(channel, 1, Integer.MAX_VALUE);
-    Collections.sort(in);
-    Collections.sort(out);
-    if(!out.equals(in)) {
-      List<String> difference = new ArrayList<String>();
-      if(in.size() > out.size()) {
-        LOG.info("The channel shorted us");
-        difference.addAll(in);
-        difference.removeAll(out);
-      } else {
-        LOG.info("We got more events than expected, perhaps dups");
-        difference.addAll(out);
-        difference.removeAll(in);
-      }
-      LOG.error("difference = " + difference +
-          ", in.size = " + in.size() + ", out.size = " + out.size());
-      Assert.fail();
-    }
+    Set<String> out = takeEvents(channel, 1, Integer.MAX_VALUE);
+    compareInputAndOut(in, out);
   }
   @Test
   public void testPut() throws Exception {
@@ -368,13 +274,11 @@ public class TestFileChannel {
     // should find no items
     int found = takeEvents(channel, 1, 5).size();
     Assert.assertEquals(0, found);
-    List<String> expected = Lists.newArrayList();
+    Set<String> expected = Sets.newHashSet();
     expected.addAll(putEvents(channel, "unbatched", 1, 5));
     expected.addAll(putEvents(channel, "batched", 5, 5));
-    List<String> actual = takeEvents(channel, 1);
-    Collections.sort(actual);
-    Collections.sort(expected);
-    Assert.assertEquals(expected, actual);
+    Set<String> actual = takeEvents(channel, 1);
+    compareInputAndOut(expected, actual);
   }
   @Test
   public void testRollbackAfterNoPutTake() throws Exception {
@@ -485,18 +389,19 @@ public class TestFileChannel {
             .submit(new Callable<String>() {
       @Override
       public String call() throws Exception {
-        List<String> result = putEvents(channel, "blocked-put", 1, 1);
+        Set<String> result = putEvents(channel, "blocked-put", 1, 1);
         Assert.assertTrue(result.toString(), result.size() == 1);
-        return result.remove(0);
+        Iterator<String> iter = result.iterator();
+        return iter.next();
       }
     });
     Thread.sleep(1000L); // ensure the put has started and is blocked
     // after which we do a take, will have a tx id after the put
-    List<String> result = takeEvents(channel, 1, 1);
+    Set<String> result = takeEvents(channel, 1, 1);
     Assert.assertTrue(result.toString(), result.size() == 1);
     String putmsg = put.get();
     Assert.assertNotNull(putmsg);
-    String takemsg = result.remove(0);
+    String takemsg = result.iterator().next();
     Assert.assertNotNull(takemsg);
     LOG.info("Got: put " + putmsg + ", take " + takemsg);
     channel.stop();
@@ -511,7 +416,7 @@ public class TestFileChannel {
     channel.start();
     Assert.assertTrue(channel.isOpen());
     int numEvents = 50;
-    List<String> in = putEvents(channel, "rollback", 1, numEvents);
+    Set<String> in = putEvents(channel, "rollback", 1, numEvents);
 
     Transaction transaction;
     // put an item we will rollback
@@ -528,10 +433,8 @@ public class TestFileChannel {
     Assert.assertTrue(channel.isOpen());
 
     // we should not get the rolled back item
-    List<String> out = takeEvents(channel, 1, numEvents);
-    Collections.sort(in);
-    Collections.sort(out);
-    Assert.assertEquals(in, out);
+    Set<String> out = takeEvents(channel, 1, numEvents);
+    compareInputAndOut(in, out);
   }
   @Test
   public void testRollbackSimulatedCrashWithSink() throws Exception {
@@ -569,9 +472,9 @@ public class TestFileChannel {
     channel = createFileChannel();
     channel.start();
     Assert.assertTrue(channel.isOpen());
-    List<String> out = takeEvents(channel, 1, 1);
+    Set<String> out = takeEvents(channel, 1, 1);
     Assert.assertEquals(1, out.size());
-    String s = out.get(0);
+    String s = out.iterator().next();
     Assert.assertTrue(s, s.startsWith("rollback-90-9"));
   }
   @Test
@@ -583,10 +486,10 @@ public class TestFileChannel {
     final CountDownLatch consumerStopLatch = new CountDownLatch(numThreads);
     final List<Exception> errors = Collections
             .synchronizedList(new ArrayList<Exception>());
-    final List<String> expected = Collections
-            .synchronizedList(new ArrayList<String>());
-    final List<String> actual = Collections
-            .synchronizedList(new ArrayList<String>());
+    final Set<String> expected = Collections.synchronizedSet(
+            new HashSet<String>());
+    final Set<String> actual = Collections.synchronizedSet(
+            new HashSet<String>());
     for (int i = 0; i < numThreads; i++) {
       final int id = i;
       Thread t = new Thread() {
@@ -645,9 +548,7 @@ public class TestFileChannel {
     Assert.assertTrue("Timed out waiting for consumer",
             consumerStopLatch.await(30, TimeUnit.SECONDS));
     Assert.assertEquals(Collections.EMPTY_LIST, errors);
-    Collections.sort(expected);
-    Collections.sort(actual);
-    Assert.assertEquals(expected, actual);
+    compareInputAndOut(expected, actual);
   }
   @Test
   public void testLocking() throws IOException {
@@ -676,12 +577,14 @@ public class TestFileChannel {
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
-    List<String> events = takeEvents(channel, 1);
-    List<String> expected = Arrays.asList(new String[] {
+    Set<String> events = takeEvents(channel, 1);
+    Set<String> expected = new HashSet<String>();
+    expected.addAll(Arrays.asList(
+            (new String[]{
               "2684", "2685", "2686", "2687", "2688", "2689", "2690", "2691"
-        }
-    );
-    Assert.assertEquals(expected, events);
+            })));
+    compareInputAndOut(expected, events);
+
   }
   /**
    * This is a regression test with files generated by a file channel
@@ -713,7 +616,7 @@ public class TestFileChannel {
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
-    List<String> events = takeEvents(channel, 1);
+    Set<String> events = takeEvents(channel, 1);
     Assert.assertEquals(50, events.size());
   }
 
@@ -728,7 +631,7 @@ public class TestFileChannel {
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
-    List<String> in = Lists.newArrayList();
+    Set<String> in = Sets.newHashSet();
     try {
       while (true) {
         in.addAll(putEvents(channel, "restart", 1, 1));
@@ -737,15 +640,10 @@ public class TestFileChannel {
       Assert.assertEquals("Cannot acquire capacity. [channel="
               + channel.getName() + "]", e.getMessage());
     }
-    List<String> out = Lists.newArrayList();
+    Set<String> out = Sets.newHashSet();
     // now take one item off the channel
     Transaction tx = channel.getTransaction();
-    tx.begin();
-    Event e = channel.take();
-    Assert.assertNotNull(e);
-    String s = new String(e.getBody(), Charsets.UTF_8);
-    out.add(s);
-    LOG.info("Slow take got " + s);
+    out.addAll(takeWithoutCommit(channel, tx, 1));
     // sleep so a checkpoint occurs. take is before
     // and commit is after the checkpoint
     forceCheckpoint(channel);
@@ -756,25 +654,10 @@ public class TestFileChannel {
     channel.start();
     Assert.assertTrue(channel.isOpen());
     // we should not geet the item we took of the queue above
-    out.addAll(takeEvents(channel, 1, Integer.MAX_VALUE));
+    Set<String> out2 = takeEvents(channel, 1, Integer.MAX_VALUE);
     channel.stop();
-    Collections.sort(in);
-    Collections.sort(out);
-    if (!out.equals(in)) {
-      List<String> difference = new ArrayList<String>();
-      if (in.size() > out.size()) {
-        LOG.info("The channel shorted us");
-        difference.addAll(in);
-        difference.removeAll(out);
-      } else {
-        LOG.info("We got more events than expected, perhaps dups");
-        difference.addAll(out);
-        difference.removeAll(in);
-      }
-      LOG.error("difference = " + difference
-              + ", in.size = " + in.size() + ", out.size = " + out.size());
-      Assert.fail();
-    }
+    in.removeAll(out);
+    compareInputAndOut(in, out2);
   }
 
   @Test
@@ -787,15 +670,7 @@ public class TestFileChannel {
     channel.start();
     //Force a checkpoint by committing a transaction
     Transaction tx = channel.getTransaction();
-    tx.begin();
-    channel.put(EventBuilder.withBody(new byte[]{'a','b'}));
-    set.add(new String(new byte[]{'a','b'}));
-    tx.commit();
-    tx.close();
-    tx = channel.getTransaction();
-    tx.begin();
-    channel.put(EventBuilder.withBody(new byte[]{'c','d'}));
-    set.add(new String(new byte[]{'c', 'd'}));
+    Set<String> in = putWithoutCommit(channel, tx, "putWithoutCommit", 1);
     forceCheckpoint(channel);
     tx.commit();
     tx.close();
@@ -804,23 +679,14 @@ public class TestFileChannel {
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
-    tx = channel.getTransaction();
-    tx.begin();
-    Event e = channel.take();
-    Assert.assertNotNull(e);
-    Assert.assertTrue(set.contains(new String(e.getBody())));
-    e = channel.take();
-    Assert.assertNotNull(e);
-    Assert.assertTrue(set.contains(new String(e.getBody())));
-    tx.commit();
-    tx.close();
+    Set<String> out = takeEvents(channel, 1);
+    compareInputAndOut(in, out);
     channel.stop();
 
   }
 
   @Test
   public void testPutCheckpointCommitCheckpointReplay() throws Exception {
-    Set<String> set = Sets.newHashSet();
     Map<String, String> overrides = Maps.newHashMap();
     overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(2));
     overrides.put(FileChannelConfiguration.CHECKPOINT_INTERVAL, "10000");
@@ -828,15 +694,7 @@ public class TestFileChannel {
     channel.start();
     //Force a checkpoint by committing a transaction
     Transaction tx = channel.getTransaction();
-    tx.begin();
-    channel.put(EventBuilder.withBody(new byte[]{'a','b'}));
-    set.add(new String(new byte[]{'a','b'}));
-    tx.commit();
-    tx.close();
-    tx = channel.getTransaction();
-    tx.begin();
-    channel.put(EventBuilder.withBody(new byte[]{'c', 'd'}));
-    set.add(new String(new byte[]{'c','d'}));
+    Set<String> in = putWithoutCommit(channel, tx, "doubleCheckpoint", 1);
     forceCheckpoint(channel);
     tx.commit();
     tx.close();
@@ -846,39 +704,26 @@ public class TestFileChannel {
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
-    tx = channel.getTransaction();
-    tx.begin();
-    Event e = channel.take();
-    Assert.assertNotNull(e);
-    Assert.assertTrue(set.contains(new String(e.getBody())));
-    e = channel.take();
-    Assert.assertNotNull(e);
-    Assert.assertTrue(set.contains(new String(e.getBody())));
-    tx.commit();
-    tx.close();
+    Set<String> out = takeEvents(channel, 5);
+    compareInputAndOut(in, out);
     channel.stop();
   }
 
   @Test
   public void testReferenceCounts() throws Exception {
-    Set<String> set = Sets.newHashSet();
     Map<String, String> overrides = Maps.newHashMap();
     overrides.put(FileChannelConfiguration.CHECKPOINT_INTERVAL, "10000");
     overrides.put(FileChannelConfiguration.MAX_FILE_SIZE, "20");
     final FileChannel channel = createFileChannel(overrides);
     channel.start();
-    List<String> in = putEvents(channel, "testing-reference-counting", 1, 15);
+    Set<String> in = putEvents(channel, "testing-reference-counting", 1, 15);
     Transaction tx = channel.getTransaction();
-    tx.begin();
-    for (int i = 0; i < 10; i++) {
-      channel.take();
-    }
-
+    takeWithoutCommit(channel, tx, 10);
     forceCheckpoint(channel);
     tx.rollback();
     //Since we did not commit the original transaction. now we should get 15
     //events back.
-    final List<String> takenEvents = Lists.newArrayList();
+    final Set<String> takenEvents = Sets.newHashSet();
     Executors.newSingleThreadExecutor().submit(new Runnable() {
       @Override
       public void run() {
@@ -892,19 +737,7 @@ public class TestFileChannel {
     Assert.assertEquals(15, takenEvents.size());
   }
 
-  private static void forceCheckpoint(FileChannel channel) {
-    Log log = field("log")
-        .ofType(Log.class)
-          .in(channel)
-            .get();
 
-    Assert.assertTrue("writeCheckpoint returned false",
-        method("writeCheckpoint")
-          .withReturnType(Boolean.class)
-            .withParameterTypes(Boolean.class)
-              .in(log)
-                .invoke(true));
-  }
   private static void copyDecompressed(String resource, File output)
           throws IOException {
     URL input =  Resources.getResource(resource);
@@ -913,56 +746,4 @@ public class TestFileChannel {
     LOG.info("Copied " + copied + " bytes from " + input + " to " + output);
   }
 
-  private static List<String> takeEvents(Channel channel,
-          int batchSize) throws Exception {
-    return takeEvents(channel, batchSize, Integer.MAX_VALUE);
-  }
-  private static List<String> takeEvents(Channel channel,
-          int batchSize, int numEvents) throws Exception {
-    List<String> result = Lists.newArrayList();
-    for (int i = 0; i < numEvents; i += batchSize) {
-      for (int j = 0; j < batchSize; j++) {
-        Transaction transaction = channel.getTransaction();
-        transaction.begin();
-        try {
-          Event event = channel.take();
-          if(event == null) {
-            transaction.commit();
-            return result;
-          }
-          result.add(new String(event.getBody(), Charsets.UTF_8));
-          transaction.commit();
-        } catch (Exception ex) {
-          transaction.rollback();
-          throw ex;
-        } finally {
-          transaction.close();
-        }
-      }
-    }
-    return result;
-  }
-  private static List<String> putEvents(Channel channel, String prefix,
-          int batchSize, int numEvents) throws Exception {
-    List<String> result = Lists.newArrayList();
-    for (int i = 0; i < numEvents; i += batchSize) {
-      for (int j = 0; j < batchSize; j++) {
-        Transaction transaction = channel.getTransaction();
-        transaction.begin();
-        try {
-          String s = prefix + "-" + i +"-" + j + "-" + UUID.randomUUID();
-          Event event = EventBuilder.withBody(s.getBytes(Charsets.UTF_8));
-          result.add(s);
-          channel.put(event);
-          transaction.commit();
-        } catch (Exception ex) {
-          transaction.rollback();
-          throw ex;
-        } finally {
-          transaction.close();
-        }
-      }
-    }
-    return result;
-  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/cf44ac05/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
index 3474f2d..e64f856 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
@@ -18,6 +18,7 @@
  */
 package org.apache.flume.channel.file;
 
+import com.google.common.base.Charsets;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
@@ -29,6 +30,15 @@ import java.util.Map;
 import org.apache.hadoop.io.Writable;
 
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.flume.Channel;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.event.EventBuilder;
+import org.junit.Assert;
+import static org.fest.reflect.core.Reflection.*;
 
 public class TestUtils {
 
@@ -37,9 +47,10 @@ public class TestUtils {
     String timestamp = String.valueOf(System.currentTimeMillis());
     headers.put("timestamp", timestamp);
     FlumeEvent event = new FlumeEvent(headers,
-        timestamp.getBytes());
+            timestamp.getBytes());
     return event;
   }
+
   public static DataInput toDataInput(Writable writable) throws IOException {
     ByteArrayOutputStream byteOutput = new ByteArrayOutputStream();
     DataOutputStream dataOutput = new DataOutputStream(byteOutput);
@@ -48,4 +59,107 @@ public class TestUtils {
     DataInputStream dataInput = new DataInputStream(byteInput);
     return dataInput;
   }
+
+  public static void compareInputAndOut(Set<String> in, Set<String> out) {
+    Assert.assertNotNull(in);
+    Assert.assertNotNull(out);
+    Assert.assertTrue(in.equals(out));
+  }
+
+  public static Set<String> putWithoutCommit(Channel channel, Transaction tx,
+          String prefix, int number) {
+    Set<String> events = Sets.newHashSet();
+    tx.begin();
+    for (int i = 0; i < number; i++) {
+      String eventData = (prefix + UUID.randomUUID()).toString();
+      Event event = EventBuilder.withBody(eventData.getBytes());
+      channel.put(event);
+      events.add(eventData);
+    }
+    return events;
+  }
+
+  public static Set<String> takeWithoutCommit(Channel channel, Transaction tx,
+          int number) {
+    Set<String> events = Sets.newHashSet();
+    tx.begin();
+    for (int i = 0; i < number; i++) {
+      Event e = channel.take();
+      if (e == null) {
+        break;
+      }
+      events.add(new String(e.getBody()));
+    }
+    return events;
+  }
+
+  public static void forceCheckpoint(FileChannel channel) {
+    Log log = field("log")
+            .ofType(Log.class)
+            .in(channel)
+            .get();
+
+    Assert.assertTrue("writeCheckpoint returned false",
+            method("writeCheckpoint")
+            .withReturnType(Boolean.class)
+            .withParameterTypes(Boolean.class)
+            .in(log)
+            .invoke(true));
+  }
+
+  public static Set<String> takeEvents(Channel channel,
+          int batchSize) throws Exception {
+    return takeEvents(channel, batchSize, Integer.MAX_VALUE);
+  }
+
+  public static Set<String> takeEvents(Channel channel,
+          int batchSize, int numEvents) throws Exception {
+    Set<String> result = Sets.newHashSet();
+    for (int i = 0; i < numEvents; i += batchSize) {
+      Transaction transaction = channel.getTransaction();
+      try {
+        transaction.begin();
+        for (int j = 0; j < batchSize; j++) {
+          Event event = channel.take();
+          if (event == null) {
+            transaction.commit();
+            return result;
+          }
+          result.add(new String(event.getBody(), Charsets.UTF_8));
+        }
+        transaction.commit();
+      } catch (Exception ex) {
+        transaction.rollback();
+        throw ex;
+      } finally {
+        transaction.close();
+      }
+
+    }
+    return result;
+  }
+
+  public static Set<String> putEvents(Channel channel, String prefix,
+          int batchSize, int numEvents) throws Exception {
+    Set<String> result = Sets.newHashSet();
+    for (int i = 0; i < numEvents; i += batchSize) {
+      Transaction transaction = channel.getTransaction();
+      transaction.begin();
+      try {
+        for (int j = 0; j < batchSize; j++) {
+          String s = prefix + "-" + i + "-" + j + "-" + UUID.randomUUID();
+          Event event = EventBuilder.withBody(s.getBytes(Charsets.UTF_8));
+          result.add(s);
+          channel.put(event);
+        }
+        transaction.commit();
+      } catch (Exception ex) {
+        transaction.rollback();
+        throw ex;
+      } finally {
+        transaction.close();
+      }
+    }
+    return result;
+  }
 }

Reply via email to