This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 72a5aa8  MINOR: add wait_for_assigned_partitions to console-consumer 
(#8192)
72a5aa8 is described below

commit 72a5aa8b0758f23b42c6e4453d6dc6c4861aa1ad
Author: Brian Bushree <[email protected]>
AuthorDate: Sat Feb 29 16:43:51 2020 -0800

    MINOR: add wait_for_assigned_partitions to console-consumer (#8192)
    
    what/why
    The throttling_test was broken by PR #7785, because the test depends on the
    consumer having partitions assigned before the producer is started.
    
    this PR provides the ability to wait for partitions to be assigned in the 
console consumer before considering it started.
    
    caveat
    The wait_until_partitions_assigned flag cannot be combined with starting the
    JmxTool inside the console-consumer for custom metrics (jmx_object_names /
    jmx_attributes), since the code assumes one JmxTool running per node.
    
    I think a proper fix for this would be to make JmxTool its own standalone 
single-node service
    
    alternatives
    We could use the EndToEnd test suite, which uses the verifiable
    producer/consumer under the hood, but I found that more changes were
    necessary to get this working; specifically, that test suite does not seem
    to play nicely with the ProducerPerformanceService.
    
    Reviewers: Mathew Wong <[email protected]>, Bill Bejeck <bbejeck.com>
---
 tests/kafkatest/services/console_consumer.py       | 28 ++++++++++++++++++++--
 tests/kafkatest/services/monitor/jmx.py            | 19 +++++++++++++--
 .../tests/core/fetch_from_follower_test.py         | 16 +------------
 tests/kafkatest/tests/core/throttling_test.py      |  3 ++-
 4 files changed, 46 insertions(+), 20 deletions(-)

diff --git a/tests/kafkatest/services/console_consumer.py 
b/tests/kafkatest/services/console_consumer.py
index 0811bcd..d349ed7 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -17,9 +17,10 @@ import itertools
 import os
 
 from ducktape.services.background_thread import BackgroundThreadService
+from ducktape.utils.util import wait_until
 
 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
-from kafkatest.services.monitor.jmx import JmxMixin
+from kafkatest.services.monitor.jmx import JmxMixin, JmxTool
 from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, 
LATEST_0_10_0, V_0_9_0_0, V_0_10_0_0, V_0_11_0_0, V_2_0_0
 
 """
@@ -62,7 +63,8 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, 
BackgroundThreadService)
                  client_id="console-consumer", print_key=False, 
jmx_object_names=None, jmx_attributes=None,
                  enable_systest_events=False, stop_timeout_sec=35, 
print_timestamp=False, print_partition=False,
                  isolation_level="read_uncommitted", 
jaas_override_variables=None,
-                 kafka_opts_override="", client_prop_file_override="", 
consumer_properties={}):
+                 kafka_opts_override="", client_prop_file_override="", 
consumer_properties={},
+                 wait_until_partitions_assigned=False):
         """
         Args:
             context:                    standard context
@@ -124,6 +126,7 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, 
BackgroundThreadService)
         self.kafka_opts_override = kafka_opts_override
         self.client_prop_file_override = client_prop_file_override
         self.consumer_properties = consumer_properties
+        self.wait_until_partitions_assigned = wait_until_partitions_assigned
 
 
     def prop_file(self, node):
@@ -273,8 +276,29 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, 
BackgroundThreadService)
         with self.lock:
             self.read_jmx_output(idx, node)
 
+    def _wait_until_partitions_assigned(self, node, timeout_sec=60):
+        if self.jmx_object_names is not None:
+            raise Exception("'wait_until_partitions_assigned' is not supported 
while using 'jmx_object_names'/'jmx_attributes'")
+        jmx_tool = JmxTool(self.context, jmx_poll_ms=100)
+        jmx_tool.jmx_object_names = 
["kafka.consumer:type=consumer-coordinator-metrics,client-id=%s" % 
self.client_id]
+        jmx_tool.jmx_attributes = ["assigned-partitions"]
+        jmx_tool.assigned_partitions_jmx_attr = 
"kafka.consumer:type=consumer-coordinator-metrics,client-id=%s:assigned-partitions"
 % self.client_id
+        jmx_tool.start_jmx_tool(self.idx(node), node)
+        assigned_partitions_jmx_attr = 
"kafka.consumer:type=consumer-coordinator-metrics,client-id=%s:assigned-partitions"
 % self.client_id
+
+        def read_and_check():
+            jmx_tool.read_jmx_output(self.idx(node), node)
+            return assigned_partitions_jmx_attr in jmx_tool.maximum_jmx_value
+
+        wait_until(lambda: read_and_check(),
+                timeout_sec=timeout_sec,
+                backoff_sec=.5,
+                err_msg="consumer was not assigned partitions within %d 
seconds" % timeout_sec)
+
     def start_node(self, node):
         BackgroundThreadService.start_node(self, node)
+        if self.wait_until_partitions_assigned:
+            self._wait_until_partitions_assigned(node)
 
     def stop_node(self, node):
         self.logger.info("%s Stopping node %s" % (self.__class__.__name__, 
str(node.account)))
diff --git a/tests/kafkatest/services/monitor/jmx.py 
b/tests/kafkatest/services/monitor/jmx.py
index c5b747d..2dcd369 100644
--- a/tests/kafkatest/services/monitor/jmx.py
+++ b/tests/kafkatest/services/monitor/jmx.py
@@ -17,6 +17,8 @@ import os
 
 from ducktape.cluster.remoteaccount import RemoteCommandError
 from ducktape.utils.util import wait_until
+
+from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 from kafkatest.version import get_version, V_0_11_0_0, DEV_BRANCH
 
 class JmxMixin(object):
@@ -41,10 +43,11 @@ class JmxMixin(object):
         self.jmx_tool_log = os.path.join(root, "jmx_tool.log")
         self.jmx_tool_err_log = os.path.join(root, "jmx_tool.err.log")
 
-    def clean_node(self, node):
+    def clean_node(self, node, idx=None):
         node.account.kill_java_processes(self.jmx_class_name(), 
clean_shutdown=False,
                                          allow_fail=True)
-        idx = self.idx(node)
+        if idx is None:
+            idx = self.idx(node)
         self.started[idx-1] = False
         node.account.ssh("rm -f -- %s %s" % (self.jmx_tool_log, 
self.jmx_tool_err_log), allow_fail=False)
 
@@ -139,3 +142,15 @@ class JmxMixin(object):
 
     def jmx_class_name(self):
         return "kafka.tools.JmxTool"
+
+class JmxTool(JmxMixin, KafkaPathResolverMixin):
+    """
+    Simple helper class for using the JmxTool directly instead of as a mix-in
+    """
+    def __init__(self, text_context, *args, **kwargs):
+        JmxMixin.__init__(self, num_nodes=1, *args, **kwargs)
+        self.context = text_context
+
+    @property
+    def logger(self):
+        return self.context.logger
diff --git a/tests/kafkatest/tests/core/fetch_from_follower_test.py 
b/tests/kafkatest/tests/core/fetch_from_follower_test.py
index fde1baf..ef37728 100644
--- a/tests/kafkatest/tests/core/fetch_from_follower_test.py
+++ b/tests/kafkatest/tests/core/fetch_from_follower_test.py
@@ -18,29 +18,15 @@ from collections import defaultdict
 
 from ducktape.mark.resource import cluster
 
-from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.services.kafka import KafkaService
-from kafkatest.services.monitor.jmx import JmxMixin
+from kafkatest.services.monitor.jmx import JmxTool
 from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
 from kafkatest.utils import is_int
 
 
-class JmxTool(JmxMixin, KafkaPathResolverMixin):
-    """
-    Simple helper class for using the JmxTool directly instead of as a mix-in
-    """
-    def __init__(self, text_context, *args, **kwargs):
-        JmxMixin.__init__(self, num_nodes=1, *args, **kwargs)
-        self.context = text_context
-
-    @property
-    def logger(self):
-        return self.context.logger
-
-
 class FetchFromFollowerTest(ProduceConsumeValidateTest):
 
     RACK_AWARE_REPLICA_SELECTOR = 
"org.apache.kafka.common.replica.RackAwareReplicaSelector"
diff --git a/tests/kafkatest/tests/core/throttling_test.py 
b/tests/kafkatest/tests/core/throttling_test.py
index 4a8327e..f29ec2b 100644
--- a/tests/kafkatest/tests/core/throttling_test.py
+++ b/tests/kafkatest/tests/core/throttling_test.py
@@ -165,7 +165,8 @@ class ThrottlingTest(ProduceConsumeValidateTest):
                                         self.topic,
                                         consumer_timeout_ms=60000,
                                         message_validator=is_int,
-                                        from_beginning=False)
+                                        from_beginning=False,
+                                        wait_until_partitions_assigned=True)
 
         self.kafka.start()
         bulk_producer.run()

Reply via email to