This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 036ed569d51 KAFKA-19181-2: Increased offsets.commit.timeout.ms value
as a temporary solution for the system test test_broker_failure failure (#19593)
036ed569d51 is described below
commit 036ed569d517fc2830b94fe9e543b595be26217e
Author: Chirag Wadhwa <[email protected]>
AuthorDate: Tue Apr 29 18:54:36 2025 +0530
KAFKA-19181-2: Increased offsets.commit.timeout.ms value as a temporary
solution for the system test test_broker_failure failure (#19593)
Investigation of the failing system test test_broker_failure showed that
writing records to the __consumer_offsets topic sometimes took longer than
5 seconds (the default value of offsets.commit.timeout.ms). Because the
persister requests for share partition initialization depend on the
completion of that record commit, the timeout meant that no persister
requests were actually being sent. As a temporary solution, this PR
increases the timeout for this config to 20 seconds. The proper fix is
being tracked in JIRA:
https://issues.apache.org/jira/browse/KAFKA-19204
Reviewers: Andrew Schofield <[email protected]>
---
tests/kafkatest/services/kafka/kafka.py | 7 ++++++-
tests/kafkatest/services/kafka/templates/kafka.properties | 4 ++++
tests/kafkatest/tests/client/share_consumer_test.py | 8 +++++---
3 files changed, 15 insertions(+), 4 deletions(-)
diff --git a/tests/kafkatest/services/kafka/kafka.py
b/tests/kafkatest/services/kafka/kafka.py
index c59079a2aac..47023ff31c6 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -300,6 +300,10 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
self.use_share_groups = use_share_groups
self.use_streams_groups = use_streams_groups
+ # Set offsets_commit_timeout based on context
+ if context.injected_args is not None:
+ self.offsets_commit_timeout =
context.injected_args.get('offsets_commit_timeout')
+
# Set consumer_group_migration_policy based on context and arguments.
if consumer_group_migration_policy is None:
arg_name = 'consumer_group_migration_policy'
@@ -753,7 +757,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
config_template = self.render('kafka.properties', node=node,
broker_id=self.idx(node),
security_config=self.security_config,
num_nodes=self.num_nodes,
listener_security_config=self.listener_security_config,
- use_share_groups=self.use_share_groups)
+ use_share_groups=self.use_share_groups,
+
offsets_commit_timeout=self.offsets_commit_timeout)
configs = dict( l.rstrip().split('=', 1) for l in
config_template.split('\n')
if not l.startswith("#") and "=" in l )
diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties
b/tests/kafkatest/services/kafka/templates/kafka.properties
index 861c63014c5..e0b85459590 100644
--- a/tests/kafkatest/services/kafka/templates/kafka.properties
+++ b/tests/kafkatest/services/kafka/templates/kafka.properties
@@ -130,4 +130,8 @@ group.initial.rebalance.delay.ms=100
{% if use_share_groups is not none and use_share_groups %}
share.coordinator.state.topic.replication.factor={{ 3 if num_nodes > 3 else
num_nodes }}
share.coordinator.state.topic.min.isr=1
+{% endif %}
+
+{% if offsets_commit_timeout is defined and offsets_commit_timeout is not none
%}
+offsets.commit.timeout.ms={{ offsets_commit_timeout }}
{% endif %}
\ No newline at end of file
diff --git a/tests/kafkatest/tests/client/share_consumer_test.py
b/tests/kafkatest/tests/client/share_consumer_test.py
index f47ac1e771b..8367b1f4097 100644
--- a/tests/kafkatest/tests/client/share_consumer_test.py
+++ b/tests/kafkatest/tests/client/share_consumer_test.py
@@ -208,15 +208,17 @@ class ShareConsumerTest(VerifiableShareConsumerTest):
clean_shutdown=[True, False],
metadata_quorum=[quorum.isolated_kraft],
num_failed_brokers=[1, 2],
- use_share_groups=[True]
+ use_share_groups=[True],
+ offsets_commit_timeout=[20000]
)
@matrix(
clean_shutdown=[True, False],
metadata_quorum=[quorum.combined_kraft],
num_failed_brokers=[1],
- use_share_groups=[True]
+ use_share_groups=[True],
+ offsets_commit_timeout=[20000]
)
- def test_broker_failure(self, clean_shutdown,
metadata_quorum=quorum.isolated_kraft, num_failed_brokers=1,
use_share_groups=True):
+ def test_broker_failure(self, clean_shutdown,
metadata_quorum=quorum.isolated_kraft, num_failed_brokers=1,
use_share_groups=True, offsets_commit_timeout=20000):
producer = self.setup_producer(self.TOPIC2["name"])
consumer = self.setup_share_group(self.TOPIC2["name"],
offset_reset_strategy="earliest")