[ 
https://issues.apache.org/jira/browse/KAFKA-6611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410573#comment-16410573
 ] 

ASF GitHub Bot commented on KAFKA-6611:
---------------------------------------

guozhangwang closed pull request #4650: KAFKA-6611: PART I, Use JMXTool in 
SimpleBenchmark
URL: https://github.com/apache/kafka/pull/4650
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub no longer displays it once the pull request is merged:

diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala 
b/core/src/main/scala/kafka/tools/JmxTool.scala
index 4a6a348d9a6..27e46319e49 100644
--- a/core/src/main/scala/kafka/tools/JmxTool.scala
+++ b/core/src/main/scala/kafka/tools/JmxTool.scala
@@ -55,12 +55,17 @@ object JmxTool extends Logging {
         .withRequiredArg
         .describedAs("name")
         .ofType(classOf[String])
-    val reportingIntervalOpt = parser.accepts("reporting-interval", "Interval 
in MS with which to poll jmx stats.")
+    val reportingIntervalOpt = parser.accepts("reporting-interval", "Interval 
in MS with which to poll jmx stats; default value is 2 seconds. " +
+      "Value of -1 equivalent to setting one-time to true")
       .withRequiredArg
       .describedAs("ms")
       .ofType(classOf[java.lang.Integer])
       .defaultsTo(2000)
-    val helpOpt = parser.accepts("help", "Print usage information.")
+    val oneTimeOpt = parser.accepts("one-time", "Flag to indicate run once 
only.")
+      .withRequiredArg
+      .describedAs("one-time")
+      .ofType(classOf[java.lang.Boolean])
+      .defaultsTo(false)
     val dateFormatOpt = parser.accepts("date-format", "The date format to use 
for formatting the time field. " +
       "See java.text.SimpleDateFormat for options.")
       .withRequiredArg
@@ -72,8 +77,15 @@ object JmxTool extends Logging {
         .describedAs("service-url")
         .ofType(classOf[String])
         .defaultsTo("service:jmx:rmi:///jndi/rmi://:9999/jmxrmi")
+    val reportFormatOpt = parser.accepts("report-format", "output format name: 
either 'original', 'properties', 'csv', 'tsv' ")
+      .withRequiredArg
+      .describedAs("report-format")
+      .ofType(classOf[java.lang.String])
+      .defaultsTo("original")
     val waitOpt = parser.accepts("wait", "Wait for requested JMX objects to 
become available before starting output. " +
       "Only supported when the list of objects is non-empty and contains no 
object name patterns.")
+    val helpOpt = parser.accepts("help", "Print usage information.")
+
 
     if(args.length == 0)
       CommandLineUtils.printUsageAndDie(parser, "Dump JMX values to standard 
output.")
@@ -87,12 +99,16 @@ object JmxTool extends Logging {
 
     val url = new JMXServiceURL(options.valueOf(jmxServiceUrlOpt))
     val interval = options.valueOf(reportingIntervalOpt).intValue
+    var oneTime = interval < 0 || options.has(oneTimeOpt)
     val attributesWhitelistExists = options.has(attributesOpt)
-    val attributesWhitelist = if(attributesWhitelistExists) 
Some(options.valueOf(attributesOpt).split(",")) else None
+    val attributesWhitelist = if(attributesWhitelistExists) 
Some(options.valueOf(attributesOpt).split(",").filterNot(_.equals(""))) else 
None
     val dateFormatExists = options.has(dateFormatOpt)
     val dateFormat = if(dateFormatExists) Some(new 
SimpleDateFormat(options.valueOf(dateFormatOpt))) else None
     val wait = options.has(waitOpt)
 
+    val reportFormat = 
parseFormat(options.valueOf(reportFormatOpt).toLowerCase)
+    val reportFormatOriginal = reportFormat.equals("original")
+
     var jmxc: JMXConnector = null
     var mbsc: MBeanServerConnection = null
     var connected = false
@@ -150,33 +166,57 @@ object JmxTool extends Logging {
 
     val numExpectedAttributes: Map[ObjectName, Int] =
       if (attributesWhitelistExists)
-        queries.map((_, attributesWhitelist.get.size)).toMap
+        queries.map((_, attributesWhitelist.get.length)).toMap
       else {
         names.map{(name: ObjectName) =>
           val mbean = mbsc.getMBeanInfo(name)
           (name, mbsc.getAttributes(name, 
mbean.getAttributes.map(_.getName)).size)}.toMap
       }
 
+    if(numExpectedAttributes.isEmpty) {
+      CommandLineUtils.printUsageAndDie(parser, s"No matched attributes for 
the queried objects $queries.")
+    }
+
     // print csv header
     val keys = List("time") ++ queryAttributes(mbsc, names, 
attributesWhitelist).keys.toArray.sorted
-    if(keys.size == numExpectedAttributes.values.sum + 1)
+    if(reportFormatOriginal && keys.size == numExpectedAttributes.values.sum + 
1) {
       println(keys.map("\"" + _ + "\"").mkString(","))
+    }
 
-    while(true) {
+    var keepGoing = true
+    while (keepGoing) {
       val start = System.currentTimeMillis
       val attributes = queryAttributes(mbsc, names, attributesWhitelist)
       attributes("time") = dateFormat match {
         case Some(dFormat) => dFormat.format(new Date)
         case None => System.currentTimeMillis().toString
       }
-      if(attributes.keySet.size == numExpectedAttributes.values.sum + 1)
-        println(keys.map(attributes(_)).mkString(","))
-      val sleep = max(0, interval - (System.currentTimeMillis - start))
-      Thread.sleep(sleep)
+      if(attributes.keySet.size == numExpectedAttributes.values.sum + 1) {
+        if(reportFormatOriginal) {
+          println(keys.map(attributes(_)).mkString(","))
+        }
+        else if(reportFormat.equals("properties")) {
+          keys.foreach( k => { println(k + "=" + attributes(k) ) } )
+        }
+        else if(reportFormat.equals("csv")) {
+          keys.foreach( k => { println(k + ",\"" + attributes(k) + "\"" ) } )
+        }
+        else { // tsv
+          keys.foreach( k => { println(k + "\t" + attributes(k) ) } )
+        }
+      }
+
+      if (oneTime) {
+        keepGoing = false
+      }
+      else {
+        val sleep = max(0, interval - (System.currentTimeMillis - start))
+        Thread.sleep(sleep)
+      }
     }
   }
 
-  def queryAttributes(mbsc: MBeanServerConnection, names: 
Iterable[ObjectName], attributesWhitelist: Option[Array[String]]) = {
+  def queryAttributes(mbsc: MBeanServerConnection, names: 
Iterable[ObjectName], attributesWhitelist: Option[Array[String]]): 
mutable.Map[String, Any] = {
     val attributes = new mutable.HashMap[String, Any]()
     for (name <- names) {
       val mbean = mbsc.getMBeanInfo(name)
@@ -193,4 +233,10 @@ object JmxTool extends Logging {
     attributes
   }
 
+  def parseFormat(reportFormatOpt : String): String = reportFormatOpt match {
+    case "properties" => "properties"
+    case "csv" => "csv"
+    case "tsv" => "tsv"
+    case _ => "original"
+  }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java 
b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index 5d7041ee1c5..c66d78b7310 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -231,6 +231,7 @@ public static void main(String[] args) throws IOException {
 
     public void setStreamProperties(final String applicationId) {
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+        props.put(StreamsConfig.CLIENT_ID_CONFIG, "simple-benchmark");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads);
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
diff --git 
a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py 
b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
index 4cc39763074..06aec1448dd 100644
--- a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
+++ b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
@@ -35,7 +35,7 @@ def __init__(self, test_context):
         self.num_threads = 1
 
     @cluster(num_nodes=9)
-    @matrix(test=["produce", "consume", "count", "processstream", 
"processstreamwithsink", "processstreamwithstatestore", 
"processstreamwithcachedstatestore", "kstreamktablejoin", "kstreamkstreamjoin", 
"ktablektablejoin", "yahoo"], scale=[1, 3])
+    @matrix(test=["count", "processstream", "processstreamwithsink", 
"processstreamwithstatestore", "processstreamwithcachedstatestore", 
"kstreamktablejoin", "kstreamkstreamjoin", "ktablektablejoin"], scale=[1, 3])
     def test_simple_benchmark(self, test, scale):
         """
         Run simple Kafka Streams benchmark
@@ -75,6 +75,8 @@ def test_simple_benchmark(self, test, scale):
         self.load_driver.wait()
         self.load_driver.stop()
 
+
+
         ################
         # RUN PHASE
         ################
@@ -93,11 +95,18 @@ def test_simple_benchmark(self, test, scale):
             node[num] = self.driver[num].node
             node[num].account.ssh("grep Performance %s" % 
self.driver[num].STDOUT_FILE, allow_fail=False)
             data[num] = self.driver[num].collect_data(node[num], "" )
-                
+            self.driver[num].read_jmx_output_all_nodes()
+
 
         final = {}
         for num in range(0, scale):
             for key in data[num]:
                 final[key + str(num)] = data[num][key]
-        
+
+            for key in sorted(self.driver[num].jmx_stats[0]):
+                self.logger.info("%s: %s" % (key, 
self.driver[num].jmx_stats[0][key]))
+
+            final["jmx-avg" + str(num)] = self.driver[num].average_jmx_value
+            final["jmx-max" + str(num)] = self.driver[num].maximum_jmx_value
+
         return final
diff --git a/tests/kafkatest/services/console_consumer.py 
b/tests/kafkatest/services/console_consumer.py
index 950ded31cf7..64a99f938e6 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -120,7 +120,7 @@ def __init__(self, context, num_nodes, kafka, topic, 
group_id="test-consumer-gro
             print_timestamp             if True, print each message's 
timestamp as well
             isolation_level             How to handle transactional messages.
         """
-        JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes or 
[],
+        JmxMixin.__init__(self, num_nodes=num_nodes, 
jmx_object_names=jmx_object_names, jmx_attributes=(jmx_attributes or []),
                           root=ConsoleConsumer.PERSISTENT_ROOT)
         BackgroundThreadService.__init__(self, context, num_nodes)
         self.kafka = kafka
diff --git a/tests/kafkatest/services/kafka/kafka.py 
b/tests/kafkatest/services/kafka/kafka.py
index c4d4b247557..ba5abc719f6 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -34,7 +34,6 @@
 
 Port = collections.namedtuple('Port', ['name', 'number', 'open'])
 
-
 class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
     PERSISTENT_ROOT = "/mnt/kafka"
     STDOUT_STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, 
"server-start-stdout-stderr.log")
@@ -72,14 +71,14 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
     def __init__(self, context, num_nodes, zk, 
security_protocol=SecurityConfig.PLAINTEXT, 
interbroker_security_protocol=SecurityConfig.PLAINTEXT,
                  client_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, 
interbroker_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI,
                  authorizer_class_name=None, topics=None, version=DEV_BRANCH, 
jmx_object_names=None,
-                 jmx_attributes=None, zk_connect_timeout=5000, 
zk_session_timeout=6000, server_prop_overides=[], zk_chroot=None):
+                 jmx_attributes=None, zk_connect_timeout=5000, 
zk_session_timeout=6000, server_prop_overides=None, zk_chroot=None):
         """
         :type context
         :type zk: ZookeeperService
         :type topics: dict
         """
         Service.__init__(self, context, num_nodes)
-        JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes or 
[],
+        JmxMixin.__init__(self, num_nodes=num_nodes, 
jmx_object_names=jmx_object_names, jmx_attributes=(jmx_attributes or []),
                           root=KafkaService.PERSISTENT_ROOT)
 
         self.zk = zk
@@ -92,7 +91,10 @@ def __init__(self, context, num_nodes, zk, 
security_protocol=SecurityConfig.PLAI
         self.minikdc = None
         self.authorizer_class_name = authorizer_class_name
         self.zk_set_acl = False
-        self.server_prop_overides = server_prop_overides
+        if server_prop_overides is None:
+            self.server_prop_overides = []
+        else:
+            self.server_prop_overides = server_prop_overides
         self.log_level = "DEBUG"
         self.zk_chroot = zk_chroot
 
diff --git a/tests/kafkatest/services/monitor/jmx.py 
b/tests/kafkatest/services/monitor/jmx.py
index 6f6e2219989..542d3a55052 100644
--- a/tests/kafkatest/services/monitor/jmx.py
+++ b/tests/kafkatest/services/monitor/jmx.py
@@ -70,14 +70,14 @@ def check_jmx_port_listening():
         use_jmxtool_version = get_version(node)
         if use_jmxtool_version <= V_0_11_0_0:
             use_jmxtool_version = DEV_BRANCH
-        cmd = "%s %s " % (self.path.script("kafka-run-class.sh", 
use_jmxtool_version),
-                          self.jmx_class_name())
+        cmd = "%s %s " % (self.path.script("kafka-run-class.sh", 
use_jmxtool_version), self.jmx_class_name())
         cmd += "--reporting-interval 1000 --jmx-url 
service:jmx:rmi:///jndi/rmi://127.0.0.1:%d/jmxrmi" % self.jmx_port
         cmd += " --wait"
         for jmx_object_name in self.jmx_object_names:
             cmd += " --object-name %s" % jmx_object_name
+        cmd += " --attributes "
         for jmx_attribute in self.jmx_attributes:
-            cmd += " --attributes %s" % jmx_attribute
+            cmd += "%s," % jmx_attribute
         cmd += " 1>> %s" % self.jmx_tool_log
         cmd += " 2>> %s &" % self.jmx_tool_err_log
 
diff --git a/tests/kafkatest/services/performance/streams_performance.py 
b/tests/kafkatest/services/performance/streams_performance.py
index 94f72499a7e..9f791811c55 100644
--- a/tests/kafkatest/services/performance/streams_performance.py
+++ b/tests/kafkatest/services/performance/streams_performance.py
@@ -13,9 +13,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from kafkatest.services.monitor.jmx import JmxMixin
 from kafkatest.services.streams import StreamsTestBaseService
 
-
 #
 # Class used to start the simple Kafka Streams benchmark
 #
@@ -31,6 +31,55 @@ def __init__(self, test_context, kafka, numrecs, load_phase, 
test_name, num_thre
                                                             test_name,
                                                             num_threads)
 
+        self.load_phase = load_phase
+
+        if self.load_phase == "false":
+            JmxMixin.__init__(self,
+                              num_nodes=1,
+                              
jmx_object_names=['kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-%d'
 %(i+1) for i in range(num_threads)],
+                              jmx_attributes=['process-latency-avg',
+                                              'process-rate',
+                                              'commit-latency-avg',
+                                              'commit-rate',
+                                              'poll-latency-avg',
+                                              'poll-rate'],
+                              root=StreamsTestBaseService.PERSISTENT_ROOT)
+
+    def start_cmd(self, node):
+        cmd = super(StreamsSimpleBenchmarkService, self).start_cmd(node)
+
+        if self.load_phase == "false":
+            args = self.args.copy()
+            args['jmx_port'] = self.jmx_port
+            args['kafka'] = self.kafka.bootstrap_servers()
+            args['config_file'] = self.CONFIG_FILE
+            args['stdout'] = self.STDOUT_FILE
+            args['stderr'] = self.STDERR_FILE
+            args['pidfile'] = self.PID_FILE
+            args['log4j'] = self.LOG4J_CONFIG_FILE
+            args['kafka_run_class'] = self.path.script("kafka-run-class.sh", 
node)
+
+            cmd = "( export JMX_PORT=%(jmx_port)s; export 
KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
+                  "INCLUDE_TEST_JARS=true %(kafka_run_class)s 
%(streams_class_name)s " \
+                  " %(kafka)s %(config_file)s %(user_test_args)s 
%(user_test_args1)s %(user_test_args2)s" \
+                  " %(user_test_args3)s & echo $! >&3 ) 1>> %(stdout)s 2>> 
%(stderr)s 3> %(pidfile)s" % args
+
+        self.logger.info("Executing streams simple benchmark cmd: " + cmd)
+
+        return cmd
+
+    def start_node(self, node):
+        super(StreamsSimpleBenchmarkService, self).start_node(node)
+
+        if self.load_phase == "false":
+            self.start_jmx_tool(1, node)
+
+
+    def clean_node(self, node):
+        if self.load_phase == "false":
+            JmxMixin.clean_node(self, node)
+        super(StreamsSimpleBenchmarkService, self).clean_node(node)
+
     def collect_data(self, node, tag = None):
         # Collect the data and return it to the framework
         output = node.account.ssh_capture("grep Performance %s" % 
self.STDOUT_FILE)
diff --git a/tests/kafkatest/services/streams.py 
b/tests/kafkatest/services/streams.py
index 6da5a25eb7e..d9b475e191b 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -19,13 +19,12 @@
 from ducktape.services.service import Service
 from ducktape.utils.util import wait_until
 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
+from kafkatest.services.monitor.jmx import JmxMixin
 from kafkatest.services.kafka import KafkaConfig
 
-
 STATE_DIR = "state.dir"
 
-class StreamsTestBaseService(KafkaPathResolverMixin, Service):
-
+class StreamsTestBaseService(KafkaPathResolverMixin, JmxMixin, Service):
     """Base class for Streams Test services providing some common settings and 
functionality"""
 
     PERSISTENT_ROOT = "/mnt/streams"
@@ -35,6 +34,8 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
     LOG_FILE = os.path.join(PERSISTENT_ROOT, "streams.log")
     STDOUT_FILE = os.path.join(PERSISTENT_ROOT, "streams.stdout")
     STDERR_FILE = os.path.join(PERSISTENT_ROOT, "streams.stderr")
+    JMX_LOG_FILE = os.path.join(PERSISTENT_ROOT, "jmx_tool.log")
+    JMX_ERR_FILE = os.path.join(PERSISTENT_ROOT, "jmx_tool.err.log")
     LOG4J_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
     PID_FILE = os.path.join(PERSISTENT_ROOT, "streams.pid")
 
@@ -48,10 +49,16 @@ class StreamsTestBaseService(KafkaPathResolverMixin, 
Service):
         "streams_stderr": {
             "path": STDERR_FILE,
             "collect_default": True},
+        "jmx_log": {
+            "path": JMX_LOG_FILE,
+            "collect_default": True},
+        "jmx_err": {
+            "path": JMX_ERR_FILE,
+            "collect_default": True},
     }
 
     def __init__(self, test_context, kafka, streams_class_name, 
user_test_args, user_test_args1=None, user_test_args2=None, 
user_test_args3=None):
-        super(StreamsTestBaseService, self).__init__(test_context, 1)
+        Service.__init__(self, test_context, num_nodes=1)
         self.kafka = kafka
         self.args = {'streams_class_name': streams_class_name,
                      'user_test_args': user_test_args,
@@ -130,7 +137,7 @@ def start_cmd(self, node):
               " %(kafka)s %(config_file)s %(user_test_args)s 
%(user_test_args1)s %(user_test_args2)s" \
               " %(user_test_args3)s & echo $! >&3 ) 1>> %(stdout)s 2>> 
%(stderr)s 3> %(pidfile)s" % args
 
-        self.logger.info("Executing Streams cmd: " + cmd)
+        self.logger.info("Executing streams cmd: " + cmd)
 
         return cmd
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Re-write simple benchmark in system tests with JMXTool
> ------------------------------------------------------
>
>                 Key: KAFKA-6611
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6611
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Guozhang Wang
>            Priority: Major
>
> The current SimpleBenchmark actively records num.records in order to 
> compute throughput and latency, which introduces extra cost and is less 
> accurate for benchmarking purposes; instead, it is better to use JmxMixin with 
> SimpleBenchmark to record metrics.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to