Updated Branches: refs/heads/trunk 11fada202 -> 960d7c4b0
FLUME-1855. Sequence gen source should be able to stop after a fixed number of events. (Hari Shreedharan via Mike Percy) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/960d7c4b Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/960d7c4b Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/960d7c4b Branch: refs/heads/trunk Commit: 960d7c4b053669d9eb7c24f032d55a2ff659820b Parents: 11fada2 Author: Mike Percy <[email protected]> Authored: Thu Jan 17 00:50:15 2013 -0800 Committer: Mike Percy <[email protected]> Committed: Thu Jan 17 00:50:15 2013 -0800 ---------------------------------------------------------------------- .../flume/source/SequenceGeneratorSource.java | 29 ++++++++++++--- 1 files changed, 24 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/960d7c4b/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java index 1fbcf42..3cb1ccf 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java @@ -42,6 +42,8 @@ public class SequenceGeneratorSource extends AbstractSource implements private int batchSize; private CounterGroup counterGroup; private List<Event> batchArrayList; + private long totalEvents; + private long eventsSent = 0; public SequenceGeneratorSource() { sequence = 0; @@ -58,28 +60,45 @@ public class SequenceGeneratorSource extends AbstractSource implements if (batchSize > 1) { batchArrayList = new ArrayList<Event>(batchSize); } + totalEvents = context.getLong("totalEvents", Long.MAX_VALUE); } @Override public Status process() throws EventDeliveryException { + Status status = Status.READY; + int i = 0; try { if (batchSize <= 1) { - getChannelProcessor().processEvent( + if(eventsSent < totalEvents) { + getChannelProcessor().processEvent( EventBuilder.withBody(String.valueOf(sequence++).getBytes())); + eventsSent++; + } else { + status = Status.BACKOFF; + } } else { batchArrayList.clear(); - for (int i = 0; i < batchSize; i++) { - batchArrayList.add(i, EventBuilder.withBody(String.valueOf(sequence++).getBytes())); + for (i = 0; i < batchSize; i++) { + if(eventsSent < totalEvents){ + batchArrayList.add(i, EventBuilder.withBody(String + .valueOf(sequence++).getBytes())); + eventsSent++; + } else { + status = Status.BACKOFF; + } + } + if(!batchArrayList.isEmpty()) { + getChannelProcessor().processEventBatch(batchArrayList); } - getChannelProcessor().processEventBatch(batchArrayList); } counterGroup.incrementAndGet("events.successful"); } catch (ChannelException ex) { counterGroup.incrementAndGet("events.failed"); + eventsSent -= i; } - return Status.READY; + return status; } @Override
