Updated Branches: refs/heads/flume-1.3.0 19d815e49 -> a51e40421
FLUME-1570: StressSource batching does not work unless maxTotalEvents is specified (Ted Malaska via Brock Noland) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/a51e4042 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/a51e4042 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/a51e4042 Branch: refs/heads/flume-1.3.0 Commit: a51e40421077c8f13f89376789dad5713cfbfca0 Parents: 19d815e Author: Brock Noland <[email protected]> Authored: Tue Sep 18 13:26:40 2012 -0500 Committer: Brock Noland <[email protected]> Committed: Tue Sep 18 13:27:01 2012 -0500 ---------------------------------------------------------------------- .../java/org/apache/flume/source/StressSource.java | 15 +++-- .../org/apache/flume/source/TestStressSource.java | 46 +++++++++++++-- 2 files changed, 50 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/a51e4042/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java index b4a00f5..562b983 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java @@ -21,6 +21,7 @@ package org.apache.flume.source; import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import org.apache.flume.ChannelException; import org.apache.flume.Context; @@ -51,7 +52,8 @@ public class StressSource extends AbstractSource implements private int batchSize; private long lastSent = 0; private Event event; - private ArrayList<Event> eventBatchList; + private List<Event> eventBatchList; + private List<Event> eventBatchListToProcess; public StressSource() { counterGroup = new CounterGroup(); @@ -115,12 +117,13 @@ public class StressSource extends AbstractSource implements } else { long eventsLeft = maxTotalEvents - totalEventSent; - if (eventsLeft < batchSize) { - eventBatchList.subList(0, (int)eventsLeft - 1); - lastSent = eventsLeft; + if (maxTotalEvents >= 0 && eventsLeft < batchSize) { + eventBatchListToProcess = eventBatchList.subList(0, (int)eventsLeft); + } else { + eventBatchListToProcess = eventBatchList; } - - getChannelProcessor().processEventBatch(eventBatchList); + lastSent = eventBatchListToProcess.size(); + getChannelProcessor().processEventBatch(eventBatchListToProcess); } counterGroup.addAndGet("events.successful", lastSent); http://git-wip-us.apache.org/repos/asf/flume/blob/a51e4042/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java index e98a46f..28270f4 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java @@ -29,6 +29,7 @@ import static org.mockito.Mockito.verify; import java.util.ArrayList; import java.util.List; +import junit.framework.Assert; import junit.framework.TestCase; import org.apache.flume.ChannelException; @@ -57,8 +58,8 @@ public class TestStressSource { } @SuppressWarnings("unchecked") - private List<Event> getEventList(StressSource source) { - return field("eventBatchList").ofType(List.class).in(source).get(); + private List<Event> getLastProcessedEventList(StressSource source) { + return field("eventBatchListToProcess").ofType(List.class).in(source).get(); } private CounterGroup getCounterGroup(StressSource source) { @@ -96,10 +97,45 @@ public class TestStressSource { TestCase.assertTrue("Source should have sent all events in 4 batches", i == 4); break; } + if (i < 3) { + verify(mockProcessor, + times(i+1)).processEventBatch(getLastProcessedEventList(source)); + } else { + verify(mockProcessor, + times(1)).processEventBatch(getLastProcessedEventList(source)); + } } - verify(mockProcessor, times(4)).processEventBatch(getEventList(source)); - TestCase.assertTrue("Number of successful events should be 35", getCounterGroup(source).get("events.successful") == 35); - TestCase.assertTrue("Number of failure events should be 0", getCounterGroup(source).get("events.failed") == 0); + long successfulEvents = getCounterGroup(source).get("events.successful"); + TestCase.assertTrue("Number of successful events should be 35 but was " + + successfulEvents, successfulEvents == 35); + long failedEvents = getCounterGroup(source).get("events.failed"); + TestCase.assertTrue("Number of failure events should be 0 but was " + + failedEvents, failedEvents == 0); + } + + @Test + public void testBatchEventsWithoutMatTotalEvents() throws InterruptedException, + EventDeliveryException { + StressSource source = new StressSource(); + source.setChannelProcessor(mockProcessor); + Context context = new Context(); + context.put("batchSize", "10"); + source.configure(context); + + for (int i = 0; i < 10; i++) { + Assert.assertFalse("StressSource with no maxTotalEvents should not return " + + Status.BACKOFF, source.process() == Status.BACKOFF); + } + verify(mockProcessor, + times(10)).processEventBatch(getLastProcessedEventList(source)); + + long successfulEvents = getCounterGroup(source).get("events.successful"); + TestCase.assertTrue("Number of successful events should be 100 but was " + + successfulEvents, successfulEvents == 100); + + long failedEvents = getCounterGroup(source).get("events.failed"); + TestCase.assertTrue("Number of failure events should be 0 but was " + + failedEvents, failedEvents == 0); } @Test
