Repository: flume
Updated Branches:
  refs/heads/flume-1.6 29ca0bae3 -> 6f3cb541c


FLUME-2495. Kafka Source may miss events when channel is not available

(Gwen Shapira via Hari)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/6f3cb541
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/6f3cb541
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/6f3cb541

Branch: refs/heads/flume-1.6
Commit: 6f3cb541c80b0fb3fe3faf445b52745a588d4993
Parents: 29ca0ba
Author: Hari Shreedharan <[email protected]>
Authored: Tue Oct 7 14:12:02 2014 -0700
Committer: Hari Shreedharan <[email protected]>
Committed: Tue Oct 7 14:12:48 2014 -0700

----------------------------------------------------------------------
 .../apache/flume/source/kafka/KafkaSource.java  |  21 ++-
 .../flume/source/kafka/TestKafkaSource.java     | 147 +++++++++++++++++--
 2 files changed, 143 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/6f3cb541/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
index 8cdc967..9d77b47 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
@@ -71,17 +71,15 @@ public class KafkaSource extends AbstractSource
   private final List<Event> eventList = new ArrayList<Event>();
 
   public Status process() throws EventDeliveryException {
-    eventList.clear();
+
     byte[] bytes;
     Event event;
     Map<String, String> headers;
     long batchStartTime = System.currentTimeMillis();
     long batchEndTime = System.currentTimeMillis() + timeUpperLimit;
     try {
-      int eventCounter = 0;
-      int timeWaited = 0;
       boolean iterStatus = false;
-      while (eventCounter < batchUpperLimit &&
+      while (eventList.size() < batchUpperLimit &&
               System.currentTimeMillis() < batchEndTime) {
         iterStatus = hasNext();
         if (iterStatus) {
@@ -97,17 +95,21 @@ public class KafkaSource extends AbstractSource
           }
           event = EventBuilder.withBody(bytes, headers);
           eventList.add(event);
-          eventCounter++;
         }
         if (log.isDebugEnabled()) {
           log.debug("Waited: {} ", System.currentTimeMillis() - batchStartTime);
-          log.debug("Event #: {}", eventCounter);
+          log.debug("Event #: {}", eventList.size());
         }
       }
       // If we have events, send events to channel
+      // clear the event list
       // and commit if Kafka doesn't auto-commit
-      if (eventCounter > 0) {
+      if (eventList.size() > 0) {
         getChannelProcessor().processEventBatch(eventList);
+        if (log.isDebugEnabled()) {
+          log.debug("Wrote {} events to channel", eventList.size());
+        }
+        eventList.clear(); // after logging, so the count above is the batch size, not 0
         if (!kafkaAutoCommitEnabled) {
           // commit the read transactions to Kafka to avoid duplicates
           consumer.commitOffsets();
@@ -203,11 +205,6 @@ public class KafkaSource extends AbstractSource
     super.stop();
   }
 
-
-
-
-
-
   /**
    * Check if there are messages waiting in Kafka,
    * waiting until timeout (10ms by default) for messages to arrive.

http://git-wip-us.apache.org/repos/asf/flume/blob/6f3cb541/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
index 3695860..72eec77 100644
--- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
@@ -75,19 +75,7 @@ public class TestKafkaSource {
     context.put(KafkaSourceConstants.TOPIC,topicName);
     context.put("kafka.consumer.timeout.ms","100");
 
-
-    ChannelProcessor channelProcessor = mock(ChannelProcessor.class);
-
-    events = Lists.newArrayList();
-
-    doAnswer(new Answer<Void>() {
-      @Override
-      public Void answer(InvocationOnMock invocation) throws Throwable {
-        events.addAll((List<Event>)invocation.getArguments()[0]);
-        return null;
-      }
-    }).when(channelProcessor).processEventBatch(any(List.class));
-    kafkaSource.setChannelProcessor(channelProcessor);
+    kafkaSource.setChannelProcessor(createGoodChannel());
   }
 
   @After
@@ -210,4 +198,137 @@ public class TestKafkaSource {
             ( context.getLong(KafkaSourceConstants.BATCH_DURATION_MS) +
             context.getLong("kafka.consumer.timeout.ms")) );
   }
+
+  // Consume event, stop source, start again and make sure we are not
+  // consuming same event again
+  @Test
+  public void testCommit() throws InterruptedException, EventDeliveryException {
+    context.put(KafkaSourceConstants.BATCH_SIZE,"1");
+    kafkaSource.configure(context);
+    kafkaSource.start();
+
+    Thread.sleep(500L);
+
+    kafkaServer.produce(topicName, "", "hello, world");
+
+    Thread.sleep(500L);
+
+    Assert.assertEquals(Status.READY, kafkaSource.process());
+    kafkaSource.stop();
+    Thread.sleep(500L);
+    kafkaSource.start();
+    Thread.sleep(500L);
+    Assert.assertEquals(Status.BACKOFF, kafkaSource.process());
+
+  }
+
+  // Process with a failing channel (expect BACKOFF), then with a working one, and check the event still arrives
+  @Test
+  public void testNonCommit() throws EventDeliveryException,
+          InterruptedException {
+
+    context.put(KafkaSourceConstants.BATCH_SIZE,"1");
+    context.put(KafkaSourceConstants.BATCH_DURATION_MS,"30000");
+    kafkaSource.configure(context);
+    kafkaSource.start();
+    Thread.sleep(500L);
+
+    kafkaServer.produce(topicName, "", "hello, world");
+    Thread.sleep(500L);
+
+    kafkaSource.setChannelProcessor(createBadChannel());
+    log.debug("processing from kafka to bad channel");
+    Assert.assertEquals(Status.BACKOFF, kafkaSource.process());
+
+    log.debug("repairing channel");
+    kafkaSource.setChannelProcessor(createGoodChannel());
+
+    log.debug("re-process to good channel - this should work");
+    kafkaSource.process();
+    Assert.assertEquals("hello, world", new String(events.get(0).getBody(),
+            Charsets.UTF_8));
+
+
+  }
+
+  @Test
+  public void testTwoBatches() throws InterruptedException,
+          EventDeliveryException {
+    context.put(KafkaSourceConstants.BATCH_SIZE,"1");
+    context.put(KafkaSourceConstants.BATCH_DURATION_MS,"30000");
+    kafkaSource.configure(context);
+    kafkaSource.start();
+    Thread.sleep(500L);
+
+    kafkaServer.produce(topicName, "", "event 1");
+    Thread.sleep(500L);
+
+    kafkaSource.process();
+    Assert.assertEquals("event 1", new String(events.get(0).getBody(),
+            Charsets.UTF_8));
+    events.clear();
+
+    kafkaServer.produce(topicName, "", "event 2");
+    Thread.sleep(500L);
+    kafkaSource.process();
+    Assert.assertEquals("event 2", new String(events.get(0).getBody(),
+            Charsets.UTF_8));
+  }
+
+  @Test
+  public void testTwoBatchesWithAutocommit() throws InterruptedException,
+          EventDeliveryException {
+    context.put(KafkaSourceConstants.BATCH_SIZE,"1");
+    context.put(KafkaSourceConstants.BATCH_DURATION_MS,"30000");
+    context.put("kafka.auto.commit.enable","true");
+    kafkaSource.configure(context);
+    kafkaSource.start();
+    Thread.sleep(500L);
+
+    kafkaServer.produce(topicName, "", "event 1");
+    Thread.sleep(500L);
+
+    kafkaSource.process();
+    Assert.assertEquals("event 1", new String(events.get(0).getBody(),
+            Charsets.UTF_8));
+    events.clear();
+
+    kafkaServer.produce(topicName, "", "event 2");
+    Thread.sleep(500L);
+    kafkaSource.process();
+    Assert.assertEquals("event 2", new String(events.get(0).getBody(),
+            Charsets.UTF_8));
+
+  }
+
+  ChannelProcessor createGoodChannel() {
+
+    ChannelProcessor channelProcessor = mock(ChannelProcessor.class);
+
+    events = Lists.newArrayList();
+
+    doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        events.addAll((List<Event>)invocation.getArguments()[0]);
+        return null;
+      }
+    }).when(channelProcessor).processEventBatch(any(List.class));
+
+    return channelProcessor;
+
+  }
+
+  ChannelProcessor createBadChannel() {
+    ChannelProcessor channelProcessor = mock(ChannelProcessor.class);
+
+    doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        throw new ChannelException("channel intentional broken");
+      }
+    }).when(channelProcessor).processEventBatch(any(List.class));
+
+    return channelProcessor;
+  }
 }
\ No newline at end of file

Reply via email to