Repository: flume
Updated Branches:
  refs/heads/trunk e1e5acea2 -> 9f75c40a6


FLUME-2492. Flume's Kafka Source doesn't account for time correctly

(Gwen Shapira via Hari)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/9f75c40a
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/9f75c40a
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/9f75c40a

Branch: refs/heads/trunk
Commit: 9f75c40a69fa4404ea2c344fa29285a00eb082fa
Parents: e1e5ace
Author: Hari Shreedharan <[email protected]>
Authored: Fri Oct 3 11:24:24 2014 -0700
Committer: Hari Shreedharan <[email protected]>
Committed: Fri Oct 3 11:24:24 2014 -0700

----------------------------------------------------------------------
 .../apache/flume/source/kafka/KafkaSource.java  | 46 ++++++--------------
 .../flume/source/kafka/KafkaSourceTest.java     | 24 ++++++++++
 2 files changed, 37 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/9f75c40a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
index 231ae42..8cdc967 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
@@ -75,13 +75,16 @@ public class KafkaSource extends AbstractSource
     byte[] bytes;
     Event event;
     Map<String, String> headers;
+    long batchStartTime = System.currentTimeMillis();
+    long batchEndTime = System.currentTimeMillis() + timeUpperLimit;
     try {
       int eventCounter = 0;
       int timeWaited = 0;
-      IterStatus iterStatus = new IterStatus(false, -1);
-      while (eventCounter < batchUpperLimit && timeWaited < timeUpperLimit) {
-        iterStatus = timedHasNext();
-        if (iterStatus.hasData()) {
+      boolean iterStatus = false;
+      while (eventCounter < batchUpperLimit &&
+              System.currentTimeMillis() < batchEndTime) {
+        iterStatus = hasNext();
+        if (iterStatus) {
           // get next message
           bytes = it.next().message();
 
@@ -96,9 +99,8 @@ public class KafkaSource extends AbstractSource
           eventList.add(event);
           eventCounter++;
         }
-        timeWaited += iterStatus.getWaitTime();
         if (log.isDebugEnabled()) {
-          log.debug("Waited: {} ", timeWaited);
+          log.debug("Waited: {} ", System.currentTimeMillis() - 
batchStartTime);
           log.debug("Event #: {}", eventCounter);
         }
       }
@@ -111,7 +113,7 @@ public class KafkaSource extends AbstractSource
           consumer.commitOffsets();
         }
       }
-      if (!iterStatus.hasData()) {
+      if (!iterStatus) {
         if (log.isDebugEnabled()) {
           log.debug("Returning with backoff. No more data to read");
         }
@@ -209,37 +211,15 @@ public class KafkaSource extends AbstractSource
   /**
    * Check if there are messages waiting in Kafka,
    * waiting until timeout (10ms by default) for messages to arrive.
-   * And timing our wait.
-   * @return IterStatus object.
-   * Indicating whether a message was found and how long we waited for it
+   * and catching the timeout exception to return a boolean
    */
-  IterStatus timedHasNext() {
+  boolean hasNext() {
     try {
-      long startTime = System.currentTimeMillis();
       it.hasNext();
-      long endTime = System.currentTimeMillis();
-      return new IterStatus(true, endTime - startTime);
+      return true;
     } catch (ConsumerTimeoutException e) {
-      return new IterStatus(false, consumerTimeout);
+      return false;
     }
   }
 
-  private class IterStatus {
-    private long waitTime;
-    private boolean hasData;
-
-
-    private IterStatus(boolean hasData,long waitTime) {
-      this.waitTime = waitTime;
-      this.hasData = hasData;
-    }
-
-    public long getWaitTime() {
-      return waitTime;
-    }
-
-    public boolean hasData() {
-      return hasData;
-    }
-  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/9f75c40a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceTest.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceTest.java
index d067e24..7684616 100644
--- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceTest.java
+++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceTest.java
@@ -18,6 +18,7 @@
 package org.apache.flume.source.kafka;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyList;
 import static org.mockito.Mockito.*;
@@ -194,6 +195,29 @@ public class KafkaSourceTest {
     assertEquals(Status.BACKOFF, status);
   }
 
+  @Test
+  public void testBatchTime() throws InterruptedException,
+          EventDeliveryException {
+    context.put(KafkaSourceConstants.BATCH_DURATION_MS,"250");
+    kafkaSource.configure(context);
+    kafkaSource.start();
+
+    Thread.sleep(500L);
+
+    for (int i=1; i<5000; i++) {
+      kafkaServer.produce(topicName, "", "hello, world " + i);
+    }
+    Thread.sleep(500L);
+
+    long startTime = System.currentTimeMillis();
+    Status status = kafkaSource.process();
+    long endTime = System.currentTimeMillis();
+    assertEquals(Status.READY, status);
+    assertTrue(endTime - startTime <
+            ( context.getLong(KafkaSourceConstants.BATCH_DURATION_MS) +
+            context.getLong("kafka.consumer.timeout.ms")) );
+  }
+
 
 
 }
\ No newline at end of file

Reply via email to