This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 64b63f472f0 KAFKA-19316: added share_group_command_test.py system
tests (#19774)
64b63f472f0 is described below
commit 64b63f472f0382761025944105b71b2e2e356d88
Author: Chirag Wadhwa <[email protected]>
AuthorDate: Thu May 29 15:29:32 2025 +0530
KAFKA-19316: added share_group_command_test.py system tests (#19774)
This PR include system tests in the file share_group_command_test.py.
These tests test the functionality of kafka-share-groups.sh tool
Reviewers: Sushant Mahajan <[email protected]>, Andrew Schofield
<[email protected]>
---
tests/kafkatest/services/console_share_consumer.py | 2 +-
tests/kafkatest/services/kafka/kafka.py | 77 ++++++++++++-
.../templates/console_share_consumer.properties | 2 +-
.../tests/core/share_group_command_test.py | 128 +++++++++++++++++++++
4 files changed, 206 insertions(+), 3 deletions(-)
diff --git a/tests/kafkatest/services/console_share_consumer.py
b/tests/kafkatest/services/console_share_consumer.py
index 4136511c574..03fbaeaf5a5 100644
--- a/tests/kafkatest/services/console_share_consumer.py
+++ b/tests/kafkatest/services/console_share_consumer.py
@@ -57,7 +57,7 @@ class ConsoleShareConsumer(KafkaPathResolverMixin, JmxMixin,
BackgroundThreadSer
"collect_default": False}
}
- def __init__(self, context, num_nodes, kafka, topic,
group_id="test-share-consumer-group",
+ def __init__(self, context, num_nodes, kafka, topic,
group_id="test-share-group",
message_validator=None, share_consumer_timeout_ms=None,
version=DEV_BRANCH,
client_id="console-share-consumer", print_key=False,
jmx_object_names=None, jmx_attributes=None,
enable_systest_events=False, stop_timeout_sec=35,
print_timestamp=False, print_partition=False,
diff --git a/tests/kafkatest/services/kafka/kafka.py
b/tests/kafkatest/services/kafka/kafka.py
index ca19ca8bd11..b490f75fbac 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -1749,6 +1749,27 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
if type is not None:
cmd += " --type %s" % type
return self.run_cli_tool(node, cmd)
+
+ def list_share_groups(self, node=None, command_config=None, state=None):
+ """ Get list of share groups.
+ """
+ if node is None:
+ node = self.nodes[0]
+ share_group_script = self.path.script("kafka-share-groups.sh", node)
+
+ if command_config is None:
+ command_config = ""
+ else:
+ command_config = "--command-config " + command_config
+
+ cmd = fix_opts_for_new_jvm(node)
+ cmd += "%s --bootstrap-server %s %s --list" % \
+ (share_group_script,
+ self.bootstrap_servers(self.security_protocol),
+ command_config)
+ if state is not None:
+ cmd += " --state %s" % state
+ return self.run_cli_tool(node, cmd)
def describe_consumer_group(self, group, node=None, command_config=None):
""" Describe a consumer group.
@@ -1771,10 +1792,64 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
output = ""
self.logger.debug(cmd)
for line in node.account.ssh_capture(cmd):
- if not (line.startswith("SLF4J") or line.startswith("TOPIC") or
line.startswith("Could not fetch offset")):
+ if not (line.startswith("SLF4J") or line.startswith("GROUP") or
line.startswith("Could not fetch offset")):
output += line
self.logger.debug(output)
return output
+
+ def describe_share_group(self, group, node=None, command_config=None):
+ """ Describe a share group.
+ """
+ if node is None:
+ node = self.nodes[0]
+ share_group_script = self.path.script("kafka-share-groups.sh", node)
+
+ if command_config is None:
+ command_config = ""
+ else:
+ command_config = "--command-config " + command_config
+
+ cmd = fix_opts_for_new_jvm(node)
+ cmd += "%s --bootstrap-server %s %s --group %s --describe" % \
+ (share_group_script,
+ self.bootstrap_servers(self.security_protocol),
+ command_config, group)
+
+ output = ""
+ self.logger.debug(cmd)
+ for line in node.account.ssh_capture(cmd):
+ if not (line.startswith("SLF4J") or line.startswith("GROUP") or
line.startswith("Could not fetch offset")):
+ output += line
+ self.logger.debug(output)
+ return output
+
+ def describe_share_group_members(self, group, node=None,
command_config=None):
+ """ Describe members of a share group.
+ """
+ if node is None:
+ node = self.nodes[0]
+ share_group_script = self.path.script("kafka-share-groups.sh", node)
+
+ if command_config is None:
+ command_config = ""
+ else:
+ command_config = "--command-config " + command_config
+
+ cmd = fix_opts_for_new_jvm(node)
+ cmd += "%s --bootstrap-server %s %s --group %s --describe" % \
+ (share_group_script,
+ self.bootstrap_servers(self.security_protocol),
+ command_config, group)
+
+ cmd += " --members"
+
+ output_lines = []
+ self.logger.debug(cmd)
+ for line in node.account.ssh_capture(cmd):
+ if not (line.startswith("SLF4J") or line.startswith("GROUP") or
line.strip() == ""):
+ output_lines.append(line.strip())
+ self.logger.debug(output_lines)
+ return output_lines
def describe_quorum(self, node=None):
"""Run the describe quorum command.
diff --git
a/tests/kafkatest/services/templates/console_share_consumer.properties
b/tests/kafkatest/services/templates/console_share_consumer.properties
index da9fa4e6664..fccbf980f43 100644
--- a/tests/kafkatest/services/templates/console_share_consumer.properties
+++ b/tests/kafkatest/services/templates/console_share_consumer.properties
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-group.id={{ group_id|default('test-share-consumer-group') }}
+group.id={{ group_id|default('test-share-group') }}
{% if client_id is defined and client_id is not none %}
client.id={{ client_id }}
diff --git a/tests/kafkatest/tests/core/share_group_command_test.py
b/tests/kafkatest/tests/core/share_group_command_test.py
new file mode 100644
index 00000000000..83e83ba6449
--- /dev/null
+++ b/tests/kafkatest/tests/core/share_group_command_test.py
@@ -0,0 +1,128 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ducktape.utils.util import wait_until
+from ducktape.tests.test import Test
+from ducktape.mark import matrix
+from ducktape.mark.resource import cluster
+
+from kafkatest.services.kafka import KafkaService, quorum
+from kafkatest.services.console_share_consumer import ConsoleShareConsumer
+from kafkatest.services.security.security_config import SecurityConfig
+
+import os
+import re
+
+TOPIC = "topic-share-group-command"
+
+
+class ShareGroupCommandTest(Test):
+ """
+ Tests ShareGroupCommand
+ """
+ # Root directory for persistent output
+ PERSISTENT_ROOT = "/mnt/share_group_command"
+ COMMAND_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "command.properties")
+
+ def __init__(self, test_context):
+ super(ShareGroupCommandTest, self).__init__(test_context)
+ self.num_brokers = 1
+ self.topics = {
+ TOPIC: {'partitions': 1, 'replication-factor': 1}
+ }
+
+ def start_kafka(self, security_protocol, interbroker_security_protocol):
+ self.kafka = KafkaService(
+ self.test_context, self.num_brokers,
+ None, security_protocol=security_protocol,
+ interbroker_security_protocol=interbroker_security_protocol,
topics=self.topics,
+ controller_num_nodes_override=self.num_brokers)
+ self.kafka.start()
+
+ def start_share_consumer(self):
+ self.share_consumer = ConsoleShareConsumer(self.test_context,
num_nodes=self.num_brokers, kafka=self.kafka, topic=TOPIC,
+ share_consumer_timeout_ms=None)
+ self.share_consumer.start()
+
+ def setup_and_verify(self, security_protocol, group=None,
describe_members=False):
+ self.start_kafka(security_protocol, security_protocol)
+ self.start_share_consumer()
+ share_consumer_node = self.share_consumer.nodes[0]
+ wait_until(lambda: self.share_consumer.alive(share_consumer_node),
+ timeout_sec=20, backoff_sec=.2, err_msg="Share consumer was
too slow to start")
+ kafka_node = self.kafka.nodes[0]
+ if security_protocol is not SecurityConfig.PLAINTEXT:
+ prop_file = str(self.kafka.security_config.client_config())
+ self.logger.debug(prop_file)
+ kafka_node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT,
allow_fail=False)
+ kafka_node.account.create_file(self.COMMAND_CONFIG_FILE, prop_file)
+
+ # Verify ShareConsumerGroupCommand lists expected consumer groups
+ command_config_file = self.COMMAND_CONFIG_FILE
+
+ if group:
+ if describe_members:
+ def has_expected_share_group_member():
+ output =
self.kafka.describe_share_group_members(group=group, node=kafka_node,
command_config=command_config_file)
+ return len(output) == 1 and all("test-share-group" in line
for line in output)
+ wait_until(has_expected_share_group_member, timeout_sec=10,
err_msg="Timed out waiting to describe members of the share group.")
+ else:
+ wait_until(lambda:
re.search("topic-share-group-command",self.kafka.describe_share_group(group=group,
node=kafka_node, command_config=command_config_file)), timeout_sec=10,
+ err_msg="Timed out waiting to describe expected share
groups.")
+ else:
+ wait_until(lambda: "test-share-group" in
self.kafka.list_share_groups(node=kafka_node,
command_config=command_config_file), timeout_sec=10,
+ err_msg="Timed out waiting to list expected share
groups.")
+
+ self.share_consumer.stop()
+
+ @cluster(num_nodes=3)
+ @matrix(
+ security_protocol=['PLAINTEXT', 'SSL'],
+ metadata_quorum=[quorum.isolated_kraft],
+ use_share_groups=[True]
+ )
+ def test_list_share_groups(self, security_protocol='PLAINTEXT',
metadata_quorum=quorum.isolated_kraft, use_share_groups=True):
+ """
+ Tests if ShareGroupCommand is listing correct share groups
+ :return: None
+ """
+ self.setup_and_verify(security_protocol)
+
+ @cluster(num_nodes=3)
+ @matrix(
+ security_protocol=['PLAINTEXT', 'SSL'],
+ metadata_quorum=[quorum.isolated_kraft],
+ use_share_groups=[True],
+ )
+ def test_describe_share_group(self, security_protocol='PLAINTEXT',
metadata_quorum=quorum.isolated_kraft, use_share_groups=True):
+ """
+ Tests if ShareGroupCommand is describing a share group correctly
+ :return: None
+ """
+ self.setup_and_verify(security_protocol, group="test-share-group")
+
+ @cluster(num_nodes=3)
+ @matrix(
+ security_protocol=['PLAINTEXT', 'SSL'],
+ metadata_quorum=[quorum.isolated_kraft],
+ use_share_groups=[True],
+ )
+ def test_describe_share_group_members(self, security_protocol='PLAINTEXT',
metadata_quorum=quorum.isolated_kraft, use_share_groups=True):
+ """
+ Tests if ShareGroupCommand is describing the members of a share group
correctly
+ :return: None
+ """
+ self.setup_and_verify(security_protocol, group="test-share-group",
describe_members=True)