Repository: kafka Updated Branches: refs/heads/trunk 7c436d388 -> 609e9b0b2
KAFKA-5068; Optionally print out metrics after running the perf tests junrao added a config `--print.metrics` to control whether ProducerPerformance prints out metrics at the end of the test. If its okay, will add the code counterpart for consumer. Author: huxi <[email protected]> Reviewers: Jun Rao <[email protected]> Closes #2860 from amethystic/kafka-5068_print_metrics_in_perf_tests Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/609e9b0b Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/609e9b0b Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/609e9b0b Branch: refs/heads/trunk Commit: 609e9b0b2f46ce72ed91965f7e43c512b26a609b Parents: 7c436d3 Author: huxi <[email protected]> Authored: Wed Apr 19 09:53:13 2017 -0700 Committer: Jun Rao <[email protected]> Committed: Wed Apr 19 09:53:13 2017 -0700 ---------------------------------------------------------------------- .../scala/kafka/tools/ConsumerPerformance.scala | 18 ++++++- .../src/main/scala/kafka/utils/ToolsUtils.scala | 32 ++++++++++++ .../apache/kafka/tools/ProducerPerformance.java | 20 +++++-- .../java/org/apache/kafka/tools/ToolsUtils.java | 55 ++++++++++++++++++++ 4 files changed, 119 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/609e9b0b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala index b7087f2..a5d4d5d 100644 --- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala +++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala @@ -27,8 +27,8 @@ import org.apache.log4j.Logger import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer} import org.apache.kafka.common.serialization.ByteArrayDeserializer import org.apache.kafka.common.utils.Utils -import org.apache.kafka.common.TopicPartition -import kafka.utils.CommandLineUtils +import org.apache.kafka.common.{Metric, MetricName, TopicPartition} +import kafka.utils.{CommandLineUtils, ToolsUtils} import java.util.{Collections, Properties, Random} import kafka.consumer.Consumer @@ -38,6 +38,8 @@ import kafka.consumer.ConsumerTimeoutException import java.text.SimpleDateFormat import java.util.concurrent.atomic.AtomicBoolean +import scala.collection.mutable + /** * Performance test for the full zookeeper consumer */ @@ -51,6 +53,7 @@ object ConsumerPerformance { val totalMessagesRead = new AtomicLong(0) val totalBytesRead = new AtomicLong(0) val consumerTimeout = new AtomicBoolean(false) + var metrics: mutable.Map[MetricName, _ <: Metric] = null if (!config.hideHeader) { if (!config.showDetailedStats) @@ -66,6 +69,10 @@ object ConsumerPerformance { startMs = System.currentTimeMillis consume(consumer, List(config.topic), config.numMessages, 1000, config, totalMessagesRead, totalBytesRead) endMs = System.currentTimeMillis + + if (config.printMetrics) { + metrics = consumer.metrics().asScala + } consumer.close() } else { import kafka.consumer.ConsumerConfig @@ -96,6 +103,11 @@ object ConsumerPerformance { println("%s, %s, %.4f, %.4f, %d, %.4f".format(config.dateFormat.format(startMs), config.dateFormat.format(endMs), totalMBRead, totalMBRead / elapsedSecs, totalMessagesRead.get, totalMessagesRead.get / elapsedSecs)) } + + if (metrics != null) { + ToolsUtils.printMetrics(metrics) + } + } def consume(consumer: KafkaConsumer[Array[Byte], Array[Byte]], topics: List[String], count: Long, timeout: Long, config: ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong) { @@ -210,12 +222,14 @@ object ConsumerPerformance { .withRequiredArg .describedAs("config file") .ofType(classOf[String]) + val printMetricsOpt = parser.accepts("print-metrics", "Print out the metrics. This only applies to new consumer.") val options = parser.parse(args: _*) CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, numMessagesOpt) val useOldConsumer = options.has(zkConnectOpt) + val printMetrics = options.has(printMetricsOpt) val props = if (options.has(consumerConfigOpt)) Utils.loadProps(options.valueOf(consumerConfigOpt)) http://git-wip-us.apache.org/repos/asf/kafka/blob/609e9b0b/core/src/main/scala/kafka/utils/ToolsUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/ToolsUtils.scala b/core/src/main/scala/kafka/utils/ToolsUtils.scala index 65758d8..1be5a45 100644 --- a/core/src/main/scala/kafka/utils/ToolsUtils.scala +++ b/core/src/main/scala/kafka/utils/ToolsUtils.scala @@ -16,7 +16,14 @@ */ package kafka.utils +import java.util +import java.util.Comparator + import joptsimple.OptionParser +import org.apache.kafka.common.{Metric, MetricName} + +import scala.collection.immutable.ListMap +import scala.collection.mutable object ToolsUtils { @@ -33,4 +40,29 @@ object ToolsUtils { if(!isValid) CommandLineUtils.printUsageAndDie(parser, "Please provide valid host:port like host1:9091,host2:9092\n ") } + + /** + * print out the metrics in alphabetical order + * @param metrics the metrics to be printed out + */ + def printMetrics(metrics: mutable.Map[MetricName, _ <: Metric]): Unit = { + var maxLengthOfDisplayName = 0 + + val sortedMap = metrics.toSeq.sortWith( (s,t) => + Array(s._1.group(), s._1.name(), s._1.tags()).mkString(":") + .compareTo(Array(t._1.group(), t._1.name(), t._1.tags()).mkString(":")) < 0 + ).map { + case (key, value) => + val mergedKeyName = Array(key.group(), key.name(), key.tags()).mkString(":") + if (maxLengthOfDisplayName < mergedKeyName.length) { + maxLengthOfDisplayName = mergedKeyName.length + } + (mergedKeyName, value.value()) + } + println(s"\n%-${maxLengthOfDisplayName}s %s".format("Metric Name", "Value")) + sortedMap.foreach { + case (metricName, value) => + println(s"%-${maxLengthOfDisplayName}s : %.3f".format(metricName, value)) + } + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/609e9b0b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java index e96814d..b861b9d 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java @@ -17,17 +17,17 @@ package org.apache.kafka.tools; import static net.sourceforge.argparse4j.impl.Arguments.store; +import static net.sourceforge.argparse4j.impl.Arguments.storeTrue; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; - import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Properties; import java.util.Random; +import java.util.Arrays; import net.sourceforge.argparse4j.inf.MutuallyExclusiveGroup; import org.apache.kafka.clients.producer.Callback; @@ -59,6 +59,7 @@ public class ProducerPerformance { List<String> producerProps = res.getList("producerConfig"); String producerConfig = res.getString("producerConfigFile"); String payloadFilePath = res.getString("payloadFile"); + boolean shouldPrintMetrics = res.getBoolean("printMetrics"); // since default value gets printed with the help text, we are escaping \n there and replacing it with correct value here. String payloadDelimiter = res.getString("payloadDelimiter").equals("\\n") ? "\n" : res.getString("payloadDelimiter"); @@ -127,10 +128,14 @@ public class ProducerPerformance { throttler.throttle(); } } - /* print final results */ - producer.close(); stats.printTotal(); + + /* print out metrics */ + if (shouldPrintMetrics) { + ToolsUtils.printMetrics(producer.metrics()); + } + producer.close(); } catch (ArgumentParserException e) { if (args.length == 0) { parser.printHelp(); @@ -223,6 +228,13 @@ public class ProducerPerformance { .dest("producerConfigFile") .help("producer config properties file."); + parser.addArgument("--print-metrics") + .action(storeTrue()) + .type(Boolean.class) + .metavar("PRINT-METRICS") + .dest("printMetrics") + .help("print out metrics at the end of the test."); + return parser; } http://git-wip-us.apache.org/repos/asf/kafka/blob/609e9b0b/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java b/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java new file mode 100644 index 0000000..e02bbb0 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools; + +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; + +import java.util.Comparator; +import java.util.Map; +import java.util.TreeMap; + +public class ToolsUtils { + + /** + * print out the metrics in alphabetical order + * @param metrics the metrics to be printed out + */ + public static void printMetrics(Map<MetricName, ? extends Metric> metrics) { + if (metrics != null && !metrics.isEmpty()) { + int maxLengthOfDisplayName = 0; + TreeMap<String, Double> sortedMetrics = new TreeMap<>(new Comparator<String>() { + @Override + public int compare(String o1, String o2) { + return o1.compareTo(o2); + } + }); + for (Metric metric : metrics.values()) { + MetricName mName = metric.metricName(); + String mergedName = mName.group() + ":" + mName.name() + ":" + mName.tags(); + maxLengthOfDisplayName = maxLengthOfDisplayName < mergedName.length() ? mergedName.length() : maxLengthOfDisplayName; + sortedMetrics.put(mergedName, metric.value()); + } + String outputFormat = "%-" + maxLengthOfDisplayName + "s : %.3f"; + System.out.println(String.format("\n%-" + maxLengthOfDisplayName + "s %s", "Metric Name", "Value")); + + for (Map.Entry<String, Double> entry : sortedMetrics.entrySet()) { + System.out.println(String.format(outputFormat, entry.getKey(), entry.getValue())); + } + } + } +}
