This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e3430fef888 KAFKA-19072: Add system test for ELR (#19344)
e3430fef888 is described below

commit e3430fef888b504f2e5ddf35637a4057430d198f
Author: Calvin Liu <[email protected]>
AuthorDate: Tue Apr 1 16:27:58 2025 -0700

    KAFKA-19072: Add system test for ELR (#19344)
    
    Verified locally.
    ```
    test_id:    kafkatest.tests.core.eligible_leader_replicas_test.EligibleLeaderReplicasTest.test_basic_eligible_leader_replicas.metadata_quorum=ISOLATED_KRAFT.group_protocol=classic
    status:     PASS
    run time:   2 minutes 6.074 seconds
    
    --------------------------------------------------------------------------------
    test_id:    kafkatest.tests.core.eligible_leader_replicas_test.EligibleLeaderReplicasTest.test_basic_eligible_leader_replicas.metadata_quorum=ISOLATED_KRAFT.group_protocol=consumer
    status:     PASS
    run time:   2 minutes 6.611 seconds
    
    --------------------------------------------------------------------------------
    ```
    
    Reviewers: Justine Olshan <[email protected]>
---
 .../tests/core/eligible_leader_replicas_test.py    | 161 +++++++++++++++++++++
 1 file changed, 161 insertions(+)

diff --git a/tests/kafkatest/tests/core/eligible_leader_replicas_test.py b/tests/kafkatest/tests/core/eligible_leader_replicas_test.py
new file mode 100644
index 00000000000..64f730fd5a2
--- /dev/null
+++ b/tests/kafkatest/tests/core/eligible_leader_replicas_test.py
@@ -0,0 +1,161 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kafkatest.services.kafka import KafkaService, quorum, consumer_group
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.utils import is_int
+
+from ducktape.tests.test import Test
+from ducktape.mark import matrix
+from ducktape.mark.resource import cluster
+from ducktape.utils.util import wait_until
+
+import time
+
+class EligibleLeaderReplicasTest(Test):
+    """
+        Eligible leader replicas test verifies that ELR election can happen after all replicas go offline.
+        The test first cleanly shuts down two replicas and then uncleanly shuts down the leader. The two cleanly
+        shut-down replicas are then restarted. One of these two replicas should be in the ELR and become the leader.
+    """
+
+    def __init__(self, test_context):
+        """:type test_context: ducktape.tests.test.TestContext"""
+        super(EligibleLeaderReplicasTest, self).__init__(test_context=test_context)
+
+        self.topic = "input-topic"
+
+        self.num_brokers = 6
+
+        # Test parameters
+        self.num_partitions = 1
+        self.num_seed_messages = 10000
+
+        self.progress_timeout_sec = 60
+        self.consumer_group = "elr-test-consumer-group"
+        self.broker_startup_timeout_sec = 120
+
+    def seed_messages(self, topic, num_seed_messages):
+        seed_timeout_sec = 10000
+        seed_producer = VerifiableProducer(context=self.test_context,
+                                           num_nodes=1,
+                                           kafka=self.kafka,
+                                           topic=topic,
+                                           message_validator=is_int,
+                                           max_messages=num_seed_messages,
+                                           enable_idempotence=True)
+        seed_producer.start()
+        wait_until(lambda: seed_producer.num_acked >= num_seed_messages,
+                   timeout_sec=seed_timeout_sec,
+                   err_msg="Producer failed to produce messages %d in %ds." % \
+                           (self.num_seed_messages, seed_timeout_sec))
+        return seed_producer.acked
+
+    def get_messages_from_topic(self, topic, num_messages, group_protocol):
+        consumer = self.start_consumer(topic, group_id="verifying_consumer", group_protocol=group_protocol)
+        return self.drain_consumer(consumer, num_messages)
+
+    def stop_broker(self, node, clean_shutdown):
+        if clean_shutdown:
+            self.kafka.stop_node(node, clean_shutdown = True, timeout_sec = self.broker_startup_timeout_sec)
+        else:
+            self.kafka.stop_node(node, clean_shutdown = False)
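+            # After a hard kill, wait for the process to disappear and then sleep past the
+            # broker session timeout (plus slack) so the controller has fenced the dead broker.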
+            gracePeriodSecs = 5
+            brokerSessionTimeoutSecs = 18
+            wait_until(lambda: not self.kafka.pids(node),
+                       timeout_sec=brokerSessionTimeoutSecs + gracePeriodSecs,
+                       err_msg="Failed to see timely disappearance of process 
for hard-killed broker %s" % str(node.account))
+            time.sleep(brokerSessionTimeoutSecs + gracePeriodSecs)
+
+    def start_consumer(self, topic_to_read, group_id, group_protocol):
+        consumer = ConsoleConsumer(context=self.test_context,
+                                   num_nodes=1,
+                                   kafka=self.kafka,
+                                   topic=topic_to_read,
+                                   group_id=group_id,
+                                   message_validator=is_int,
+                                   from_beginning=True,
+                                   isolation_level="read_committed",
+                                   consumer_properties=consumer_group.maybe_set_group_protocol(group_protocol))
+        consumer.start()
+        # ensure that the consumer is up.
+        wait_until(lambda: len(consumer.messages_consumed[1]) > 0,
+                   timeout_sec=180,
+                   err_msg="Consumer failed to consume any messages for %ds" % 180)
+        return consumer
+
+    def drain_consumer(self, consumer, num_messages):
+        # wait until we read at least the expected number of messages.
+        wait_until(lambda: len(consumer.messages_consumed[1]) >= num_messages,
+                   timeout_sec=90,
+                   err_msg="Consumer consumed only %d out of %d messages in 
%ds" % \
+                           (len(consumer.messages_consumed[1]), num_messages, 
90))
+        consumer.stop()
+        return consumer.messages_consumed[1]
+
+    def setup_topics(self):
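+        # Note: with replication factor 3 and min.insync.replicas=2, cleanly stopped
+        # replicas can be tracked in the ELR once the ISR shrinks below min ISR.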
+        self.kafka.topics = {
+            self.topic: {
+                "partitions": self.num_partitions,
+                "replication-factor": 3,
+                "configs": {
+                    "min.insync.replicas": 2
+                }
+            }
+        }
+
+    @cluster(num_nodes=9)
+    @matrix(metadata_quorum=[quorum.isolated_kraft],
+            group_protocol=consumer_group.all_group_protocols)
+    def test_basic_eligible_leader_replicas(self, metadata_quorum, group_protocol=None):
+        self.kafka = KafkaService(self.test_context,
+                                  num_nodes=self.num_brokers,
+                                  zk=None,
+                                  controller_num_nodes_override=1)
+        security_protocol = 'PLAINTEXT'
+
+        self.kafka.security_protocol = security_protocol
+        self.kafka.interbroker_security_protocol = security_protocol
+        self.kafka.logs["kafka_data_1"]["collect_default"] = True
+        self.kafka.logs["kafka_data_2"]["collect_default"] = True
+        self.kafka.logs["kafka_operational_logs_debug"]["collect_default"] = 
True
+
+        self.setup_topics()
+        self.kafka.start(timeout_sec = self.broker_startup_timeout_sec)
+
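+        # ELR (KIP-966) is gated behind a feature flag; upgrading
+        # eligible.leader.replicas.version to 1 enables it for the cluster.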
+        self.kafka.run_features_command("upgrade", 
"eligible.leader.replicas.version", 1)
+        input_messages = self.seed_messages(self.topic, self.num_seed_messages)
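+        # isr_idx_list returns 1-based broker ids, hence the "- 1" when indexing nodes below.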
+        isr = self.kafka.isr_idx_list(self.topic, 0)
+
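+        # Cleanly shut down two ISR members; with ELR enabled they should remain
+        # eligible to lead because they are known to have complete logs.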
+        self.stop_broker(self.kafka.nodes[isr[1] - 1], True)
+        self.stop_broker(self.kafka.nodes[isr[2] - 1], True)
+
+        wait_until(lambda: len(self.kafka.isr_idx_list(self.topic)) == 1,
+                   timeout_sec=60,
+                   err_msg="Timed out waiting for the partition to have only 1 
ISR")
+
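+        # Hard-kill the last ISR member (the current leader). Without ELR this would
+        # leave the partition leaderless until it returns or unclean election is used.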
+        self.stop_broker(self.kafka.nodes[isr[0] - 1], False)
+
+        self.kafka.start_node(self.kafka.nodes[isr[1] - 1], timeout_sec = self.broker_startup_timeout_sec)
+        self.kafka.start_node(self.kafka.nodes[isr[2] - 1], timeout_sec = self.broker_startup_timeout_sec)
+
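+        # One of the restarted ELR replicas should be elected leader; verify that
+        # every acked message survived the failover.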
+        output_messages_set = set(self.get_messages_from_topic(self.topic, self.num_seed_messages, group_protocol))
+        input_message_set = set(input_messages)
+
+        assert input_message_set == output_messages_set, \
+            "Input and concurrently consumed output message sets are not 
equal. Num input messages: %d. Num concurrently_consumed_messages: %d" % \
+            (len(input_message_set), len(output_messages_set))
\ No newline at end of file
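
For reference, system tests like this one are normally run through ducktape. A typical invocation from a Kafka checkout, using the stock Docker wrapper (the TC_PATHS variable and run script come from the existing system-test tooling, not from this change), looks roughly like:

```
TC_PATHS="tests/kafkatest/tests/core/eligible_leader_replicas_test.py" bash tests/docker/run_tests.sh
```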
