This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 036ed569d51 KAFKA-19181-2: Increased offsets.commit.timeout.ms value 
as a temporary solution for the system test test_broker_failure failure (#19593)
036ed569d51 is described below

commit 036ed569d517fc2830b94fe9e543b595be26217e
Author: Chirag Wadhwa <[email protected]>
AuthorDate: Tue Apr 29 18:54:36 2025 +0530

    KAFKA-19181-2: Increased offsets.commit.timeout.ms value as a temporary 
solution for the system test test_broker_failure failure (#19593)
    
    Upon investigations for the failure of the system test
    test_broker_failure, it was found there were situations where the
    writing of records to the consumer_offsets topic was taking longer than
    5 seconds (default value of offsets.commit.timeout.ms). Since the
    persister requests of share partition initialization depends on the
    completion of the record committing, due to the timeout, there were no
    persister requests actually being sent. This PR increases the timeout
    for this config to 20 seconds, as a temporary solution. The fix for this
    is being tracked in the JIRA -
    https://issues.apache.org/jira/browse/KAFKA-19204
    
    Reviewers: Andrew Schofield <[email protected]>
---
 tests/kafkatest/services/kafka/kafka.py                   | 7 ++++++-
 tests/kafkatest/services/kafka/templates/kafka.properties | 4 ++++
 tests/kafkatest/tests/client/share_consumer_test.py       | 8 +++++---
 3 files changed, 15 insertions(+), 4 deletions(-)

diff --git a/tests/kafkatest/services/kafka/kafka.py 
b/tests/kafkatest/services/kafka/kafka.py
index c59079a2aac..47023ff31c6 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -300,6 +300,10 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
         self.use_share_groups = use_share_groups
         self.use_streams_groups = use_streams_groups
 
+        # Set offsets_commit_timeout based on context
+        if context.injected_args is not None:
+            self.offsets_commit_timeout = 
context.injected_args.get('offsets_commit_timeout')
+
         # Set consumer_group_migration_policy based on context and arguments.
         if consumer_group_migration_policy is None:
             arg_name = 'consumer_group_migration_policy'
@@ -753,7 +757,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
         config_template = self.render('kafka.properties', node=node, 
broker_id=self.idx(node),
                                       security_config=self.security_config, 
num_nodes=self.num_nodes,
                                       
listener_security_config=self.listener_security_config,
-                                      use_share_groups=self.use_share_groups)
+                                      use_share_groups=self.use_share_groups,
+                                      
offsets_commit_timeout=self.offsets_commit_timeout)
 
         configs = dict( l.rstrip().split('=', 1) for l in 
config_template.split('\n')
                         if not l.startswith("#") and "=" in l )
diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties 
b/tests/kafkatest/services/kafka/templates/kafka.properties
index 861c63014c5..e0b85459590 100644
--- a/tests/kafkatest/services/kafka/templates/kafka.properties
+++ b/tests/kafkatest/services/kafka/templates/kafka.properties
@@ -130,4 +130,8 @@ group.initial.rebalance.delay.ms=100
 {% if use_share_groups is not none and use_share_groups %}
 share.coordinator.state.topic.replication.factor={{ 3 if num_nodes > 3 else 
num_nodes }}
 share.coordinator.state.topic.min.isr=1
+{% endif %}
+
+{% if offsets_commit_timeout is defined and offsets_commit_timeout is not none 
%}
+offsets.commit.timeout.ms={{ offsets_commit_timeout }}
 {% endif %}
\ No newline at end of file
diff --git a/tests/kafkatest/tests/client/share_consumer_test.py 
b/tests/kafkatest/tests/client/share_consumer_test.py
index f47ac1e771b..8367b1f4097 100644
--- a/tests/kafkatest/tests/client/share_consumer_test.py
+++ b/tests/kafkatest/tests/client/share_consumer_test.py
@@ -208,15 +208,17 @@ class ShareConsumerTest(VerifiableShareConsumerTest):
         clean_shutdown=[True, False],
         metadata_quorum=[quorum.isolated_kraft],
         num_failed_brokers=[1, 2],
-        use_share_groups=[True]
+        use_share_groups=[True],
+        offsets_commit_timeout=[20000]
     )
     @matrix(
         clean_shutdown=[True, False],
         metadata_quorum=[quorum.combined_kraft],
         num_failed_brokers=[1],
-        use_share_groups=[True]
+        use_share_groups=[True],
+        offsets_commit_timeout=[20000]
     )
-    def test_broker_failure(self, clean_shutdown, 
metadata_quorum=quorum.isolated_kraft, num_failed_brokers=1, 
use_share_groups=True):
+    def test_broker_failure(self, clean_shutdown, 
metadata_quorum=quorum.isolated_kraft, num_failed_brokers=1, 
use_share_groups=True, offsets_commit_timeout=20000):
 
         producer = self.setup_producer(self.TOPIC2["name"])
         consumer = self.setup_share_group(self.TOPIC2["name"], 
offset_reset_strategy="earliest")

Reply via email to