Updated Branches: refs/heads/flume-1.3.0 8b9e51324 -> d4a66bebd
FLUME-1557: It would be nice if SequenceGeneratorSource could do batching (Ted Malaska via Brock Noland) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/d4a66beb Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/d4a66beb Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/d4a66beb Branch: refs/heads/flume-1.3.0 Commit: d4a66bebd34c86e5a338071c0da77a601269ddb6 Parents: 8b9e513 Author: Brock Noland <[email protected]> Authored: Wed Sep 12 14:14:29 2012 -0500 Committer: Brock Noland <[email protected]> Committed: Wed Sep 12 14:14:53 2012 -0500 ---------------------------------------------------------------------- .../flume/source/SequenceGeneratorSource.java | 33 +++++++++++- .../flume/source/TestSequenceGeneratorSource.java | 40 +++++++++++++++ 2 files changed, 70 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/d4a66beb/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java index 440c5a9..1fbcf42 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java @@ -19,34 +19,61 @@ package org.apache.flume.source; +import java.util.ArrayList; +import java.util.List; import org.apache.flume.ChannelException; +import org.apache.flume.Context; import org.apache.flume.CounterGroup; +import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.PollableSource; +import org.apache.flume.conf.Configurable; import org.apache.flume.event.EventBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class SequenceGeneratorSource extends AbstractSource implements - PollableSource { + PollableSource, Configurable { private static final Logger logger = LoggerFactory .getLogger(SequenceGeneratorSource.class); private long sequence; + private int batchSize; private CounterGroup counterGroup; + private List<Event> batchArrayList; public SequenceGeneratorSource() { sequence = 0; counterGroup = new CounterGroup(); } + /** + * Read parameters from context + * <li>batchSize = type int that defines the size of event batches + */ + @Override + public void configure(Context context) { + batchSize = context.getInteger("batchSize", 1); + if (batchSize > 1) { + batchArrayList = new ArrayList<Event>(batchSize); + } + } + @Override public Status process() throws EventDeliveryException { try { - getChannelProcessor().processEvent( - EventBuilder.withBody(String.valueOf(sequence++).getBytes())); + if (batchSize <= 1) { + getChannelProcessor().processEvent( + EventBuilder.withBody(String.valueOf(sequence++).getBytes())); + } else { + batchArrayList.clear(); + for (int i = 0; i < batchSize; i++) { + batchArrayList.add(i, EventBuilder.withBody(String.valueOf(sequence++).getBytes())); + } + getChannelProcessor().processEventBatch(batchArrayList); + } counterGroup.incrementAndGet("events.successful"); } catch (ChannelException ex) { counterGroup.incrementAndGet("events.failed"); http://git-wip-us.apache.org/repos/asf/flume/blob/d4a66beb/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java index 89dbeb2..c9d3e20 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java @@ -75,6 +75,46 @@ public class TestSequenceGeneratorSource { } @Test + public void testBatchProcessWithLifeCycle() throws InterruptedException, LifecycleException, + EventDeliveryException { + + int batchSize = 10; + + Channel channel = new PseudoTxnMemoryChannel(); + Context context = new Context(); + + context.put("logicalNode.name", "test"); + context.put("batchSize", Integer.toString(batchSize)); + + Configurables.configure(source, context); + Configurables.configure(channel, context); + + List<Channel> channels = new ArrayList<Channel>(); + channels.add(channel); + + ChannelSelector rcs = new ReplicatingChannelSelector(); + rcs.setChannels(channels); + + source.setChannelProcessor(new ChannelProcessor(rcs)); + + source.start(); + + for (long i = 0; i < 100; i++) { + source.process(); + + for (long j = batchSize; j > 0; j--) { + Event event = channel.take(); + String expectedVal = String.valueOf(((i+1)*batchSize)-j); + String resultedVal = new String(event.getBody()); + Assert.assertTrue("Expected " + expectedVal + " is not equals to " + + resultedVal, expectedVal.equals(resultedVal)); + } + } + + source.stop(); + } + + @Test public void testLifecycle() throws InterruptedException, EventDeliveryException {
