Repository: kafka
Updated Branches:
  refs/heads/trunk 1ed88f0eb -> 1fdb758f2
KAFKA-2202: fix consumerTimeoutMs computation on ConsumerPerformance; reviewed by Guozhang Wang

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1fdb758f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1fdb758f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1fdb758f

Branch: refs/heads/trunk
Commit: 1fdb758f286868a00cbebcc5bbcfc8195529158e
Parents: 1ed88f0
Author: Manikumar Reddy <[email protected]>
Authored: Mon Aug 10 20:58:20 2015 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Mon Aug 10 20:58:20 2015 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/tools/ConsumerPerformance.scala | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/1fdb758f/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
index 903318d..7797dee 100644
--- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -30,6 +30,7 @@ import kafka.consumer.ConsumerConnector
 import kafka.consumer.KafkaStream
 import kafka.consumer.ConsumerTimeoutException
 import java.text.SimpleDateFormat
+import java.util.concurrent.atomic.AtomicBoolean
 
 /**
  * Performance test for the full zookeeper consumer
@@ -43,6 +44,7 @@ object ConsumerPerformance {
     logger.info("Starting consumer...")
     val totalMessagesRead = new AtomicLong(0)
     val totalBytesRead = new AtomicLong(0)
+    val consumerTimeout = new AtomicBoolean(false)
 
     if (!config.hideHeader) {
       if (!config.showDetailedStats)
@@ -67,7 +69,7 @@ object ConsumerPerformance {
       var threadList = List[ConsumerPerfThread]()
       for ((topic, streamList) <- topicMessageStreams)
         for (i <- 0 until streamList.length)
-          threadList ::= new ConsumerPerfThread(i, "kafka-zk-consumer-" + i, streamList(i), config, totalMessagesRead, totalBytesRead)
+          threadList ::= new ConsumerPerfThread(i, "kafka-zk-consumer-" + i, streamList(i), config, totalMessagesRead, totalBytesRead, consumerTimeout)
 
       logger.info("Sleeping for 1 second.")
       Thread.sleep(1000)
@@ -77,7 +79,10 @@ object ConsumerPerformance {
         thread.start
       for (thread <- threadList)
         thread.join
-      endMs = System.currentTimeMillis - consumerConfig.consumerTimeoutMs
+      if(consumerTimeout.get())
+        endMs = System.currentTimeMillis - consumerConfig.consumerTimeoutMs
+      else
+        endMs = System.currentTimeMillis
       consumerConnector.shutdown()
     }
     val elapsedSecs = (endMs - startMs) / 1000.0
@@ -209,7 +214,7 @@ object ConsumerPerformance {
   }
 
   class ConsumerPerfThread(threadId: Int, name: String, stream: KafkaStream[Array[Byte], Array[Byte]],
-    config: ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong)
+    config: ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong, consumerTimeout: AtomicBoolean)
     extends Thread(name) {
 
     override def run() {
@@ -238,7 +243,9 @@ object ConsumerPerformance {
       } catch {
         case _: InterruptedException =>
         case _: ClosedByInterruptException =>
-        case _: ConsumerTimeoutException =>
+        case _: ConsumerTimeoutException => {
+          consumerTimeout.set(true);
+        }
         case e: Throwable => e.printStackTrace()
       }
       totalMessagesRead.addAndGet(messagesRead)
