Repository: kafka Updated Branches: refs/heads/trunk c82be0f30 -> 239dad1b9
KAFKA-5358; Consumer perf tool should count rebalance time (KIP-177) Author: huxihx <huxi...@hotmail.com> Reviewers: Jason Gustafson <ja...@confluent.io> Closes #3188 from huxihx/KAKFA-5358 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/239dad1b Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/239dad1b Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/239dad1b Branch: refs/heads/trunk Commit: 239dad1b9fb6803842067dd588f679ba6ae5efe7 Parents: c82be0f Author: huxihx <huxi...@hotmail.com> Authored: Wed Sep 20 09:28:31 2017 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Wed Sep 20 09:29:39 2017 -0700 ---------------------------------------------------------------------- .../scala/kafka/tools/ConsumerPerformance.scala | 143 +++++++++++++------ .../kafka/tools/ConsumerPerformanceTest.scala | 37 +++-- 2 files changed, 129 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/239dad1b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala index ed1b440..bdec41f 100644 --- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala +++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala @@ -54,9 +54,10 @@ object ConsumerPerformance { val totalBytesRead = new AtomicLong(0) val consumerTimeout = new AtomicBoolean(false) var metrics: mutable.Map[MetricName, _ <: Metric] = null + val joinGroupTimeInMs = new AtomicLong(0) if (!config.hideHeader) { - printHeader(!config.showDetailedStats) + printHeader(config.showDetailedStats, config.useOldConsumer) } var startMs, endMs = 0L @@ -64,7 +65,7 @@ object ConsumerPerformance { val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](config.props) consumer.subscribe(Collections.singletonList(config.topic)) startMs = System.currentTimeMillis - consume(consumer, List(config.topic), config.numMessages, 1000, config, totalMessagesRead, totalBytesRead) + consume(consumer, List(config.topic), config.numMessages, 1000, config, totalMessagesRead, totalBytesRead, joinGroupTimeInMs, startMs) endMs = System.currentTimeMillis if (config.printMetrics) { @@ -86,19 +87,35 @@ object ConsumerPerformance { logger.info("starting threads") startMs = System.currentTimeMillis for (thread <- threadList) - thread.start + thread.start() for (thread <- threadList) - thread.join + thread.join() endMs = if (consumerTimeout.get()) System.currentTimeMillis - consumerConfig.consumerTimeoutMs else System.currentTimeMillis consumerConnector.shutdown() } val elapsedSecs = (endMs - startMs) / 1000.0 + val fetchTimeInMs = (endMs - startMs) - joinGroupTimeInMs.get if (!config.showDetailedStats) { val totalMBRead = (totalBytesRead.get * 1.0) / (1024 * 1024) - println("%s, %s, %.4f, %.4f, %d, %.4f".format(config.dateFormat.format(startMs), config.dateFormat.format(endMs), - totalMBRead, totalMBRead / elapsedSecs, totalMessagesRead.get, totalMessagesRead.get / elapsedSecs)) + print("%s, %s, %.4f, %.4f, %d, %.4f".format( + config.dateFormat.format(startMs), + config.dateFormat.format(endMs), + totalMBRead, + totalMBRead / elapsedSecs, + totalMessagesRead.get, + totalMessagesRead.get / elapsedSecs + )) + if (!config.useOldConsumer) { + print(", %d, %d, %.4f, %.4f".format( + joinGroupTimeInMs.get, + fetchTimeInMs, + totalMBRead / (fetchTimeInMs / 1000.0), + totalMessagesRead.get / (fetchTimeInMs / 1000.0) + )) + } + println() } if (metrics != null) { @@ -107,11 +124,13 @@ object ConsumerPerformance { } - private[tools] def printHeader(showDetailedStats: Boolean): Unit = { - if (showDetailedStats) - println("start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec") - else - println("time, threadId, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec") + private[tools] def printHeader(showDetailedStats: Boolean, useOldConsumer: Boolean): Unit = { + val newFieldsInHeader = if (!useOldConsumer) ", rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec" else "" + if (!showDetailedStats) { + println("start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec" + newFieldsInHeader) + } else { + println("time, threadId, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec" + newFieldsInHeader) + } } def consume(consumer: KafkaConsumer[Array[Byte], Array[Byte]], @@ -120,29 +139,25 @@ object ConsumerPerformance { timeout: Long, config: ConsumerPerfConfig, totalMessagesRead: AtomicLong, - totalBytesRead: AtomicLong) { + totalBytesRead: AtomicLong, + joinTime: AtomicLong, + testStartTime: Long) { var bytesRead = 0L var messagesRead = 0L var lastBytesRead = 0L var lastMessagesRead = 0L + var joinStart = 0L + var joinTimeMsInSingleRound = 0L - // Wait for group join, metadata fetch, etc - val joinTimeout = 10000 - val isAssigned = new AtomicBoolean(false) consumer.subscribe(topics.asJava, new ConsumerRebalanceListener { def onPartitionsAssigned(partitions: util.Collection[TopicPartition]) { - isAssigned.set(true) + joinTime.addAndGet(System.currentTimeMillis - joinStart) + joinTimeMsInSingleRound += System.currentTimeMillis - joinStart } def onPartitionsRevoked(partitions: util.Collection[TopicPartition]) { - isAssigned.set(false) + joinStart = System.currentTimeMillis }}) - val joinStart = System.currentTimeMillis() - while (!isAssigned.get()) { - if (System.currentTimeMillis() - joinStart >= joinTimeout) { - throw new Exception("Timed out waiting for initial group join.") - } - consumer.poll(100) - } + consumer.poll(0) consumer.seekToBeginning(Collections.emptyList()) // Now start the benchmark @@ -165,7 +180,9 @@ object ConsumerPerformance { if (currentTimeMillis - lastReportTime >= config.reportingInterval) { if (config.showDetailedStats) - printProgressMessage(0, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, lastReportTime, currentTimeMillis, config.dateFormat) + printNewConsumerProgress(0, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, + lastReportTime, currentTimeMillis, config.dateFormat, joinTimeMsInSingleRound) + joinTimeMsInSingleRound = 0L lastReportTime = currentTimeMillis lastMessagesRead = messagesRead lastBytesRead = bytesRead @@ -177,19 +194,64 @@ object ConsumerPerformance { totalBytesRead.set(bytesRead) } - def printProgressMessage(id: Int, - bytesRead: Long, - lastBytesRead: Long, - messagesRead: Long, - lastMessagesRead: Long, - startMs: Long, - endMs: Long, - dateFormat: SimpleDateFormat) = { + def printOldConsumerProgress(id: Int, + bytesRead: Long, + lastBytesRead: Long, + messagesRead: Long, + lastMessagesRead: Long, + startMs: Long, + endMs: Long, + dateFormat: SimpleDateFormat): Unit = { + printBasicProgress(id, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, endMs, dateFormat) + println() + } + + def printNewConsumerProgress(id: Int, + bytesRead: Long, + lastBytesRead: Long, + messagesRead: Long, + lastMessagesRead: Long, + startMs: Long, + endMs: Long, + dateFormat: SimpleDateFormat, + periodicJoinTimeInMs: Long): Unit = { + printBasicProgress(id, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, endMs, dateFormat) + printExtendedProgress(bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, endMs, periodicJoinTimeInMs) + println() + } + + private def printBasicProgress(id: Int, + bytesRead: Long, + lastBytesRead: Long, + messagesRead: Long, + lastMessagesRead: Long, + startMs: Long, + endMs: Long, + dateFormat: SimpleDateFormat): Unit = { val elapsedMs: Double = endMs - startMs - val totalMBRead = (bytesRead * 1.0) / (1024 * 1024) - val mbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024) - println("%s, %d, %.4f, %.4f, %d, %.4f".format(dateFormat.format(endMs), id, totalMBRead, - 1000.0 * (mbRead / elapsedMs), messagesRead, ((messagesRead - lastMessagesRead) / elapsedMs) * 1000.0)) + val totalMbRead = (bytesRead * 1.0) / (1024 * 1024) + val intervalMbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024) + val intervalMbPerSec = 1000.0 * intervalMbRead / elapsedMs + val intervalMessagesPerSec = ((messagesRead - lastMessagesRead) / elapsedMs) * 1000.0 + print("%s, %d, %.4f, %.4f, %d, %.4f".format(dateFormat.format(endMs), id, totalMbRead, + intervalMbPerSec, messagesRead, intervalMessagesPerSec)) + } + + private def printExtendedProgress(bytesRead: Long, + lastBytesRead: Long, + messagesRead: Long, + lastMessagesRead: Long, + startMs: Long, + endMs: Long, + periodicJoinTimeInMs: Long): Unit = { + val fetchTimeMs = endMs - startMs - periodicJoinTimeInMs + val intervalMbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024) + val intervalMessagesRead = messagesRead - lastMessagesRead + val (intervalMbPerSec, intervalMessagesPerSec) = if (fetchTimeMs <= 0) + (0.0, 0.0) + else + (1000.0 * intervalMbRead / fetchTimeMs, 1000.0 * intervalMessagesRead / fetchTimeMs) + print(", %d, %d, %.4f, %.4f".format(periodicJoinTimeInMs, fetchTimeMs, intervalMbPerSec, intervalMessagesPerSec)) } class ConsumerPerfConfig(args: Array[String]) extends PerfConfig(args) { @@ -316,14 +378,14 @@ object ConsumerPerformance { try { val iter = stream.iterator while (iter.hasNext && messagesRead < config.numMessages) { - val messageAndMetadata = iter.next + val messageAndMetadata = iter.next() messagesRead += 1 bytesRead += messageAndMetadata.message.length val currentTimeMillis = System.currentTimeMillis if (currentTimeMillis - lastReportTime >= config.reportingInterval) { if (config.showDetailedStats) - printProgressMessage(threadId, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, lastReportTime, currentTimeMillis, config.dateFormat) + printOldConsumerProgress(threadId, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, lastReportTime, currentTimeMillis, config.dateFormat) lastReportTime = currentTimeMillis lastMessagesRead = messagesRead lastBytesRead = bytesRead @@ -339,7 +401,8 @@ object ConsumerPerformance { totalMessagesRead.addAndGet(messagesRead) totalBytesRead.addAndGet(bytesRead) if (config.showDetailedStats) - printProgressMessage(threadId, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, System.currentTimeMillis, config.dateFormat) + printOldConsumerProgress(threadId, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, System.currentTimeMillis, config.dateFormat) + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/239dad1b/core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala b/core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala index 2fa774c..bafe8ed 100644 --- a/core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala +++ b/core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala @@ -26,21 +26,36 @@ import org.junit.Test class ConsumerPerformanceTest { private val outContent = new ByteArrayOutputStream() + private val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS") @Test - def testHeaderMatchBody(): Unit = { + def testDetailedHeaderMatchBody(): Unit = { + testHeaderMatchContent(detailed = true, useOldConsumer = false, 2, + () => ConsumerPerformance.printNewConsumerProgress(1, 1024 * 1024, 0, 1, 0, 0, 1, dateFormat, 1L)) + testHeaderMatchContent(detailed = true, useOldConsumer = true, 4, + () => ConsumerPerformance.printOldConsumerProgress(1, 1024 * 1024, 0, 1, 0, 0, 1, + dateFormat)) + } + + @Test + def testNonDetailedHeaderMatchBody(): Unit = { + testHeaderMatchContent(detailed = false, useOldConsumer = false, 2, () => println(s"${dateFormat.format(System.currentTimeMillis)}, " + + s"${dateFormat.format(System.currentTimeMillis)}, 1.0, 1.0, 1, 1.0, 1, 1, 1.1, 1.1")) + testHeaderMatchContent(detailed = false, useOldConsumer = true, 4, () => println(s"${dateFormat.format(System.currentTimeMillis)}, " + + s"${dateFormat.format(System.currentTimeMillis)}, 1.0, 1.0, 1, 1.0")) + } + + private def testHeaderMatchContent(detailed: Boolean, useOldConsumer: Boolean, expectedOutputLineCount: Int, fun: () => Unit): Unit = { Console.withOut(outContent) { - ConsumerPerformance.printHeader(true) - ConsumerPerformance.printProgressMessage(1, 1024 * 1024, 0, 1, 0, 0, 1, - new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS") - ) - } + ConsumerPerformance.printHeader(detailed, useOldConsumer) + fun() - val contents = outContent.toString.split("\n") - assertEquals(2, contents.length) - val header = contents(0) - val body = contents(1) + val contents = outContent.toString.split("\n") + assertEquals(expectedOutputLineCount, contents.length) + val header = contents(0) + val body = contents(1) - assertEquals(header.split(",").length, body.split(",").length) + assertEquals(header.split(",").length, body.split(",").length) + } } }