Repository: flume Updated Branches: refs/heads/trunk e1e5acea2 -> 9f75c40a6
FLUME-2492. Flume's Kafka Source doesn't account time correctly (Gwen Shapira via Hari) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/9f75c40a Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/9f75c40a Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/9f75c40a Branch: refs/heads/trunk Commit: 9f75c40a69fa4404ea2c344fa29285a00eb082fa Parents: e1e5ace Author: Hari Shreedharan <[email protected]> Authored: Fri Oct 3 11:24:24 2014 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Fri Oct 3 11:24:24 2014 -0700 ---------------------------------------------------------------------- .../apache/flume/source/kafka/KafkaSource.java | 46 ++++++-------------- .../flume/source/kafka/KafkaSourceTest.java | 24 ++++++++++ 2 files changed, 37 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/9f75c40a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java index 231ae42..8cdc967 100644 --- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java +++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java @@ -75,13 +75,16 @@ public class KafkaSource extends AbstractSource byte[] bytes; Event event; Map<String, String> headers; + long batchStartTime = System.currentTimeMillis(); + long batchEndTime = System.currentTimeMillis() + timeUpperLimit; try { int eventCounter = 0; int timeWaited = 0; - IterStatus iterStatus = new IterStatus(false, -1); - while (eventCounter < batchUpperLimit && timeWaited < timeUpperLimit) { - iterStatus = timedHasNext(); - if (iterStatus.hasData()) { + boolean iterStatus = false; + while (eventCounter < batchUpperLimit && + System.currentTimeMillis() < batchEndTime) { + iterStatus = hasNext(); + if (iterStatus) { // get next message bytes = it.next().message(); @@ -96,9 +99,8 @@ public class KafkaSource extends AbstractSource eventList.add(event); eventCounter++; } - timeWaited += iterStatus.getWaitTime(); if (log.isDebugEnabled()) { - log.debug("Waited: {} ", timeWaited); + log.debug("Waited: {} ", System.currentTimeMillis() - batchStartTime); log.debug("Event #: {}", eventCounter); } } @@ -111,7 +113,7 @@ public class KafkaSource extends AbstractSource consumer.commitOffsets(); } } - if (!iterStatus.hasData()) { + if (!iterStatus) { if (log.isDebugEnabled()) { log.debug("Returning with backoff. No more data to read"); } @@ -209,37 +211,15 @@ public class KafkaSource extends AbstractSource /** * Check if there are messages waiting in Kafka, * waiting until timeout (10ms by default) for messages to arrive. - * And timing our wait. - * @return IterStatus object. - * Indicating whether a message was found and how long we waited for it + * and catching the timeout exception to return a boolean */ - IterStatus timedHasNext() { + boolean hasNext() { try { - long startTime = System.currentTimeMillis(); it.hasNext(); - long endTime = System.currentTimeMillis(); - return new IterStatus(true, endTime - startTime); + return true; } catch (ConsumerTimeoutException e) { - return new IterStatus(false, consumerTimeout); + return false; } } - private class IterStatus { - private long waitTime; - private boolean hasData; - - - private IterStatus(boolean hasData,long waitTime) { - this.waitTime = waitTime; - this.hasData = hasData; - } - - public long getWaitTime() { - return waitTime; - } - - public boolean hasData() { - return hasData; - } - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flume/blob/9f75c40a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceTest.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceTest.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceTest.java index d067e24..7684616 100644 --- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceTest.java +++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceTest.java @@ -18,6 +18,7 @@ package org.apache.flume.source.kafka; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyList; import static org.mockito.Mockito.*; @@ -194,6 +195,29 @@ public class KafkaSourceTest { assertEquals(Status.BACKOFF, status); } + @Test + public void testBatchTime() throws InterruptedException, + EventDeliveryException { + context.put(KafkaSourceConstants.BATCH_DURATION_MS,"250"); + kafkaSource.configure(context); + kafkaSource.start(); + + Thread.sleep(500L); + + for (int i=1; i<5000; i++) { + kafkaServer.produce(topicName, "", "hello, world " + i); + } + Thread.sleep(500L); + + long startTime = System.currentTimeMillis(); + Status status = kafkaSource.process(); + long endTime = System.currentTimeMillis(); + assertEquals(Status.READY, status); + assertTrue(endTime - startTime < + ( context.getLong(KafkaSourceConstants.BATCH_DURATION_MS) + + context.getLong("kafka.consumer.timeout.ms")) ); + } + } \ No newline at end of file
