This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 0eb09d3  KAFKA-13234; Transaction system test should clear URPs after broker restarts (#11267)
0eb09d3 is described below

commit 0eb09d307757cbce42bfe2bb96451b3d5909ede3
Author: Jason Gustafson <[email protected]>
AuthorDate: Wed Sep 1 08:37:05 2021 -0700

    KAFKA-13234; Transaction system test should clear URPs after broker restarts (#11267)
    
    Clearing under-replicated partitions helps ensure that partitions do not
    become unavailable longer than necessary as brokers are rolled. This prevents
    flakiness due to transaction timeouts.
    
    Reviewers: Luke Chen <[email protected]>, Ismael Juma <[email protected]>
---
 tests/docker/ducker-ak                             |  2 +-
 tests/kafkatest/services/kafka/kafka.py            | 43 ++++++++++++++++++++++
 .../tests/core/group_mode_transactions_test.py     |  2 +
 tests/kafkatest/tests/core/transactions_test.py    |  2 +
 4 files changed, 48 insertions(+), 1 deletion(-)

diff --git a/tests/docker/ducker-ak b/tests/docker/ducker-ak
index 15c4b7c..8047b8f 100755
--- a/tests/docker/ducker-ak
+++ b/tests/docker/ducker-ak
@@ -480,7 +480,7 @@ ducker_test() {
     (test -f ./gradlew || gradle) && ./gradlew systemTestLibs
     must_popd
     if [[ "${debug}" -eq 1 ]]; then
-        local ducktape_cmd="python3.7 -m debugpy --listen 
0.0.0.0:${debugpy_port} --wait-for-client /usr/local/bin/ducktape"
+        local ducktape_cmd="python3 -m debugpy --listen 
0.0.0.0:${debugpy_port} --wait-for-client /usr/local/bin/ducktape"
     else
         local ducktape_cmd="ducktape"
     fi
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index 9901362..e057015 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -1125,6 +1125,49 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         self.logger.info("Running topic delete command...\n%s" % cmd)
         node.account.ssh(cmd)
 
+    def has_under_replicated_partitions(self):
+        """
+        Check whether the cluster has under-replicated partitions.
+
+        :return True if there are under-replicated partitions, False otherwise.
+        """
+        return len(self.describe_under_replicated_partitions()) > 0
+
+    def await_no_under_replicated_partitions(self, timeout_sec=30):
+        """
+        Wait for all under-replicated partitions to clear.
+
+        :param timeout_sec: the maximum time in seconds to wait
+        """
+        wait_until(lambda: not self.has_under_replicated_partitions(),
+                   timeout_sec = timeout_sec,
+                   err_msg="Timed out waiting for under-replicated-partitions 
to clear")
+
+    def describe_under_replicated_partitions(self):
+        """
+        Use the topic tool to find the under-replicated partitions in the 
cluster.
+
+        :return the under-replicated partitions as a list of dictionaries
+                (e.g. [{"topic": "foo", "partition": 1}, {"topic": "bar", 
"partition": 0}, ... ])
+        """
+
+        node = self.nodes[0]
+        force_use_zk_connection = not 
node.version.topic_command_supports_bootstrap_server()
+
+        cmd = fix_opts_for_new_jvm(node)
+        cmd += "%s --describe --under-replicated-partitions" % \
+            self.kafka_topics_cmd_with_optional_security_settings(node, 
force_use_zk_connection)
+
+        self.logger.debug("Running topic command to describe under-replicated 
partitions\n%s" % cmd)
+        output = ""
+        for line in node.account.ssh_capture(cmd):
+            output += line
+
+        under_replicated_partitions = 
self.parse_describe_topic(output)["partitions"]
+        self.logger.debug("Found %d under-replicated-partitions" % 
len(under_replicated_partitions))
+
+        return under_replicated_partitions
+
     def describe_topic(self, topic, node=None):
         if node is None:
             node = self.nodes[0]
diff --git a/tests/kafkatest/tests/core/group_mode_transactions_test.py b/tests/kafkatest/tests/core/group_mode_transactions_test.py
index 3b3fd1f..bb2749e 100644
--- a/tests/kafkatest/tests/core/group_mode_transactions_test.py
+++ b/tests/kafkatest/tests/core/group_mode_transactions_test.py
@@ -110,6 +110,8 @@ class GroupModeTransactionsTest(Test):
                     time.sleep(brokerSessionTimeoutSecs + gracePeriodSecs)
                 self.kafka.start_node(node)
 
+            self.kafka.await_no_under_replicated_partitions()
+
     def create_and_start_message_copier(self, input_topic, output_topic, 
transactional_id):
         message_copier = TransactionalMessageCopier(
             context=self.test_context,
diff --git a/tests/kafkatest/tests/core/transactions_test.py b/tests/kafkatest/tests/core/transactions_test.py
index cfb3b55..1f565b5 100644
--- a/tests/kafkatest/tests/core/transactions_test.py
+++ b/tests/kafkatest/tests/core/transactions_test.py
@@ -108,6 +108,8 @@ class TransactionsTest(Test):
                     time.sleep(brokerSessionTimeoutSecs + gracePeriodSecs)
                 self.kafka.start_node(node)
 
+            self.kafka.await_no_under_replicated_partitions()
+
     def create_and_start_message_copier(self, input_topic, input_partition, 
output_topic, transactional_id, use_group_metadata):
         message_copier = TransactionalMessageCopier(
             context=self.test_context,

Reply via email to