Updated Branches:
  refs/heads/flume-1.3.0 19d815e49 -> a51e40421

FLUME-1570: StressSource batching does not work unless maxTotalEvents is specified

(Ted Malaska via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/a51e4042
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/a51e4042
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/a51e4042

Branch: refs/heads/flume-1.3.0
Commit: a51e40421077c8f13f89376789dad5713cfbfca0
Parents: 19d815e
Author: Brock Noland <[email protected]>
Authored: Tue Sep 18 13:26:40 2012 -0500
Committer: Brock Noland <[email protected]>
Committed: Tue Sep 18 13:27:01 2012 -0500

----------------------------------------------------------------------
 .../java/org/apache/flume/source/StressSource.java |   15 +++--
 .../org/apache/flume/source/TestStressSource.java  |   46 +++++++++++++--
 2 files changed, 50 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/a51e4042/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java
index b4a00f5..562b983 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java
@@ -21,6 +21,7 @@ package org.apache.flume.source;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 
 import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
@@ -51,7 +52,8 @@ public class StressSource extends AbstractSource implements
   private int batchSize;
   private long lastSent = 0;
   private Event event;
-  private ArrayList<Event> eventBatchList;
+  private List<Event> eventBatchList;
+  private List<Event> eventBatchListToProcess;
 
   public StressSource() {
     counterGroup = new CounterGroup();
@@ -115,12 +117,13 @@ public class StressSource extends AbstractSource implements
       } else {
         long eventsLeft = maxTotalEvents - totalEventSent;
 
-        if (eventsLeft < batchSize) {
-          eventBatchList.subList(0, (int)eventsLeft - 1);
-          lastSent = eventsLeft;
+        if (maxTotalEvents >= 0 && eventsLeft < batchSize) {
+          eventBatchListToProcess = eventBatchList.subList(0, (int)eventsLeft);
+        } else {
+          eventBatchListToProcess = eventBatchList;
         }
-
-        getChannelProcessor().processEventBatch(eventBatchList);
+        lastSent = eventBatchListToProcess.size();
+        getChannelProcessor().processEventBatch(eventBatchListToProcess);
       }
 
       counterGroup.addAndGet("events.successful", lastSent);

http://git-wip-us.apache.org/repos/asf/flume/blob/a51e4042/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java
index e98a46f..28270f4 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java
@@ -29,6 +29,7 @@ import static org.mockito.Mockito.verify;
 import java.util.ArrayList;
 import java.util.List;
 
+import junit.framework.Assert;
 import junit.framework.TestCase;
 
 import org.apache.flume.ChannelException;
@@ -57,8 +58,8 @@ public class TestStressSource {
   }
 
   @SuppressWarnings("unchecked")
-  private List<Event> getEventList(StressSource source) {
-    return field("eventBatchList").ofType(List.class).in(source).get();
+  private List<Event> getLastProcessedEventList(StressSource source) {
+    return field("eventBatchListToProcess").ofType(List.class).in(source).get();
   }
 
   private CounterGroup getCounterGroup(StressSource source) {
@@ -96,10 +97,45 @@ public class TestStressSource {
         TestCase.assertTrue("Source should have sent all events in 4 batches", i == 4);
         break;
       }
+      if (i < 3) {
+        verify(mockProcessor,
+            times(i+1)).processEventBatch(getLastProcessedEventList(source));
+      } else {
+        verify(mockProcessor,
+            times(1)).processEventBatch(getLastProcessedEventList(source));
+      }
     }
-    verify(mockProcessor, times(4)).processEventBatch(getEventList(source));
-    TestCase.assertTrue("Number of successful events should be 35", getCounterGroup(source).get("events.successful") == 35);
-    TestCase.assertTrue("Number of failure events should be 0", getCounterGroup(source).get("events.failed") == 0);
+    long successfulEvents = getCounterGroup(source).get("events.successful");
+    TestCase.assertTrue("Number of successful events should be 35 but was " +
+        successfulEvents, successfulEvents == 35);
+    long failedEvents = getCounterGroup(source).get("events.failed");
+    TestCase.assertTrue("Number of failure events should be 0 but was " +
+        failedEvents, failedEvents == 0);
+  }
+
+  @Test
+  public void testBatchEventsWithoutMatTotalEvents() throws InterruptedException,
+      EventDeliveryException {
+    StressSource source = new StressSource();
+    source.setChannelProcessor(mockProcessor);
+    Context context = new Context();
+    context.put("batchSize", "10");
+    source.configure(context);
+
+    for (int i = 0; i < 10; i++) {
+      Assert.assertFalse("StressSource with no maxTotalEvents should not return " +
+          Status.BACKOFF, source.process() == Status.BACKOFF);
+    }
+    verify(mockProcessor,
+        times(10)).processEventBatch(getLastProcessedEventList(source));
+
+    long successfulEvents = getCounterGroup(source).get("events.successful");
+    TestCase.assertTrue("Number of successful events should be 100 but was " +
+        successfulEvents, successfulEvents == 100);
+
+    long failedEvents = getCounterGroup(source).get("events.failed");
+    TestCase.assertTrue("Number of failure events should be 0 but was " +
+        failedEvents, failedEvents == 0);
   }
 
   @Test

Reply via email to