This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 0eb09d3 KAFKA-13234; Transaction system test should clear URPs after broker restarts (#11267)
0eb09d3 is described below
commit 0eb09d307757cbce42bfe2bb96451b3d5909ede3
Author: Jason Gustafson <[email protected]>
AuthorDate: Wed Sep 1 08:37:05 2021 -0700
KAFKA-13234; Transaction system test should clear URPs after broker restarts (#11267)
Waiting for under-replicated partitions (URPs) to clear after each broker
restart helps ensure that partitions do not remain unavailable longer than
necessary while brokers are rolled. This prevents test flakiness caused by
transaction timeouts.
Reviewers: Luke Chen <[email protected]>, Ismael Juma <[email protected]>
---
tests/docker/ducker-ak | 2 +-
tests/kafkatest/services/kafka/kafka.py | 43 ++++++++++++++++++++++
.../tests/core/group_mode_transactions_test.py | 2 +
tests/kafkatest/tests/core/transactions_test.py | 2 +
4 files changed, 48 insertions(+), 1 deletion(-)
diff --git a/tests/docker/ducker-ak b/tests/docker/ducker-ak
index 15c4b7c..8047b8f 100755
--- a/tests/docker/ducker-ak
+++ b/tests/docker/ducker-ak
@@ -480,7 +480,7 @@ ducker_test() {
(test -f ./gradlew || gradle) && ./gradlew systemTestLibs
must_popd
if [[ "${debug}" -eq 1 ]]; then
- local ducktape_cmd="python3.7 -m debugpy --listen
0.0.0.0:${debugpy_port} --wait-for-client /usr/local/bin/ducktape"
+ local ducktape_cmd="python3 -m debugpy --listen
0.0.0.0:${debugpy_port} --wait-for-client /usr/local/bin/ducktape"
else
local ducktape_cmd="ducktape"
fi
diff --git a/tests/kafkatest/services/kafka/kafka.py
b/tests/kafkatest/services/kafka/kafka.py
index 9901362..e057015 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -1125,6 +1125,49 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
self.logger.info("Running topic delete command...\n%s" % cmd)
node.account.ssh(cmd)
+ def has_under_replicated_partitions(self):
+ """
+ Check whether the cluster has under-replicated partitions.
+
+ :return True if there are under-replicated partitions, False otherwise.
+ """
+ return len(self.describe_under_replicated_partitions()) > 0
+
+ def await_no_under_replicated_partitions(self, timeout_sec=30):
+ """
+ Wait for all under-replicated partitions to clear.
+
+ :param timeout_sec: the maximum time in seconds to wait
+ """
+ wait_until(lambda: not self.has_under_replicated_partitions(),
+ timeout_sec = timeout_sec,
+ err_msg="Timed out waiting for under-replicated-partitions
to clear")
+
+ def describe_under_replicated_partitions(self):
+ """
+ Use the topic tool to find the under-replicated partitions in the
cluster.
+
+ :return the under-replicated partitions as a list of dictionaries
+ (e.g. [{"topic": "foo", "partition": 1}, {"topic": "bar",
"partition": 0}, ... ])
+ """
+
+ node = self.nodes[0]
+ force_use_zk_connection = not
node.version.topic_command_supports_bootstrap_server()
+
+ cmd = fix_opts_for_new_jvm(node)
+ cmd += "%s --describe --under-replicated-partitions" % \
+ self.kafka_topics_cmd_with_optional_security_settings(node,
force_use_zk_connection)
+
+ self.logger.debug("Running topic command to describe under-replicated
partitions\n%s" % cmd)
+ output = ""
+ for line in node.account.ssh_capture(cmd):
+ output += line
+
+ under_replicated_partitions =
self.parse_describe_topic(output)["partitions"]
+ self.logger.debug("Found %d under-replicated-partitions" %
len(under_replicated_partitions))
+
+ return under_replicated_partitions
+
def describe_topic(self, topic, node=None):
if node is None:
node = self.nodes[0]
diff --git a/tests/kafkatest/tests/core/group_mode_transactions_test.py
b/tests/kafkatest/tests/core/group_mode_transactions_test.py
index 3b3fd1f..bb2749e 100644
--- a/tests/kafkatest/tests/core/group_mode_transactions_test.py
+++ b/tests/kafkatest/tests/core/group_mode_transactions_test.py
@@ -110,6 +110,8 @@ class GroupModeTransactionsTest(Test):
time.sleep(brokerSessionTimeoutSecs + gracePeriodSecs)
self.kafka.start_node(node)
+ self.kafka.await_no_under_replicated_partitions()
+
def create_and_start_message_copier(self, input_topic, output_topic,
transactional_id):
message_copier = TransactionalMessageCopier(
context=self.test_context,
diff --git a/tests/kafkatest/tests/core/transactions_test.py
b/tests/kafkatest/tests/core/transactions_test.py
index cfb3b55..1f565b5 100644
--- a/tests/kafkatest/tests/core/transactions_test.py
+++ b/tests/kafkatest/tests/core/transactions_test.py
@@ -108,6 +108,8 @@ class TransactionsTest(Test):
time.sleep(brokerSessionTimeoutSecs + gracePeriodSecs)
self.kafka.start_node(node)
+ self.kafka.await_no_under_replicated_partitions()
+
def create_and_start_message_copier(self, input_topic, input_partition,
output_topic, transactional_id, use_group_metadata):
message_copier = TransactionalMessageCopier(
context=self.test_context,