Updated Branches:
  refs/heads/trunk c2bcda202 -> 3080ce09a

FLUME-1557: It would be nice if SequenceGeneratorSource could do batching

(Ted Malaska via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/3080ce09
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/3080ce09
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/3080ce09

Branch: refs/heads/trunk
Commit: 3080ce09ad2b730bee27bddd5a490de3855220e4
Parents: c2bcda2
Author: Brock Noland <[email protected]>
Authored: Wed Sep 12 14:14:29 2012 -0500
Committer: Brock Noland <[email protected]>
Committed: Wed Sep 12 14:14:29 2012 -0500

----------------------------------------------------------------------
 .../flume/source/SequenceGeneratorSource.java      |   33 +++++++++++-
 .../flume/source/TestSequenceGeneratorSource.java  |   40 +++++++++++++++
 2 files changed, 70 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/3080ce09/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
 
b/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
index 440c5a9..1fbcf42 100644
--- 
a/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
+++ 
b/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
@@ -19,34 +19,61 @@
 
 package org.apache.flume.source;
 
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.flume.ChannelException;
+import org.apache.flume.Context;
 import org.apache.flume.CounterGroup;
+import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.PollableSource;
+import org.apache.flume.conf.Configurable;
 import org.apache.flume.event.EventBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class SequenceGeneratorSource extends AbstractSource implements
-    PollableSource {
+    PollableSource, Configurable {
 
   private static final Logger logger = LoggerFactory
       .getLogger(SequenceGeneratorSource.class);
 
   private long sequence;
+  private int batchSize;
   private CounterGroup counterGroup;
+  private List<Event> batchArrayList;
 
   public SequenceGeneratorSource() {
     sequence = 0;
     counterGroup = new CounterGroup();
   }
 
+  /**
+   * Read parameters from context
+   * <li>batchSize = type int that defines the size of event batches
+   */
+  @Override
+  public void configure(Context context) {
+    batchSize = context.getInteger("batchSize", 1);
+    if (batchSize > 1) {
+      batchArrayList = new ArrayList<Event>(batchSize);
+    }
+  }
+
   @Override
   public Status process() throws EventDeliveryException {
 
     try {
-      getChannelProcessor().processEvent(
-          EventBuilder.withBody(String.valueOf(sequence++).getBytes()));
+      if (batchSize <= 1) {
+        getChannelProcessor().processEvent(
+            EventBuilder.withBody(String.valueOf(sequence++).getBytes()));
+      } else {
+        batchArrayList.clear();
+        for (int i = 0; i < batchSize; i++) {
+          batchArrayList.add(i, 
EventBuilder.withBody(String.valueOf(sequence++).getBytes()));
+        }
+        getChannelProcessor().processEventBatch(batchArrayList);
+      }
       counterGroup.incrementAndGet("events.successful");
     } catch (ChannelException ex) {
       counterGroup.incrementAndGet("events.failed");

http://git-wip-us.apache.org/repos/asf/flume/blob/3080ce09/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java
 
b/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java
index 89dbeb2..c9d3e20 100644
--- 
a/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java
+++ 
b/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java
@@ -75,6 +75,46 @@ public class TestSequenceGeneratorSource {
   }
 
   @Test
+  public void testBatchProcessWithLifeCycle() throws InterruptedException, 
LifecycleException,
+      EventDeliveryException {
+
+    int batchSize = 10;
+
+    Channel channel = new PseudoTxnMemoryChannel();
+    Context context = new Context();
+
+    context.put("logicalNode.name", "test");
+    context.put("batchSize", Integer.toString(batchSize));
+
+    Configurables.configure(source, context);
+    Configurables.configure(channel, context);
+
+    List<Channel> channels = new ArrayList<Channel>();
+    channels.add(channel);
+
+    ChannelSelector rcs = new ReplicatingChannelSelector();
+    rcs.setChannels(channels);
+
+    source.setChannelProcessor(new ChannelProcessor(rcs));
+
+    source.start();
+
+    for (long i = 0; i < 100; i++) {
+      source.process();
+
+      for (long j = batchSize; j > 0; j--) {
+        Event event = channel.take();
+        String expectedVal = String.valueOf(((i+1)*batchSize)-j);
+        String resultedVal = new String(event.getBody());
+        Assert.assertTrue("Expected " + expectedVal + " is not equals to " +
+            resultedVal, expectedVal.equals(resultedVal));
+      }
+    }
+
+    source.stop();
+  }
+
+  @Test
   public void testLifecycle() throws InterruptedException,
       EventDeliveryException {
 

Reply via email to