This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 1e1b9c598be KAFKA-19421: Deflake streams_broker_down_resilience_test
(#19999) (#21623)
1e1b9c598be is described below
commit 1e1b9c598be2446591ff012bdb76c26074d2dbd0
Author: Bill Bejeck <[email protected]>
AuthorDate: Tue Mar 3 19:56:52 2026 -0500
KAFKA-19421: Deflake streams_broker_down_resilience_test (#19999) (#21623)
`streams_broker_down_resilience_test` produces messages with `null` key
to a topic with three partitions and expects each partition to be
non-empty afterward. But I don't think this is a correct assumption, as
a producer may try to be sticky and only produce to two partitions.
This causes occasional flakiness in the test.
The fix is to produce records with keys.
Reviewers: Matthias J. Sax <[email protected]>, PoAn Yang
<[email protected]>
Co-authored-by: Lucas Brutschy <[email protected]>
---
tests/kafkatest/tests/streams/base_streams_test.py | 4 ++--
.../tests/streams/streams_broker_down_resilience_test.py | 16 ++++++++++++----
2 files changed, 14 insertions(+), 6 deletions(-)
diff --git a/tests/kafkatest/tests/streams/base_streams_test.py
b/tests/kafkatest/tests/streams/base_streams_test.py
index f0cd4ba89e6..ccb0e59dd0d 100644
--- a/tests/kafkatest/tests/streams/base_streams_test.py
+++ b/tests/kafkatest/tests/streams/base_streams_test.py
@@ -61,8 +61,8 @@ class BaseStreamsTest(KafkaTest):
self.assert_consume(client_id, test_state, streams_sink_topic,
num_messages, timeout_sec)
- def assert_produce(self, topic, test_state, num_messages=5,
timeout_sec=60):
- producer = self.get_producer(topic, num_messages)
+ def assert_produce(self, topic, test_state, num_messages=5,
timeout_sec=60, repeating_keys=None):
+ producer = self.get_producer(topic, num_messages,
repeating_keys=repeating_keys)
producer.start()
wait_until(lambda: producer.num_acked >= num_messages,
diff --git
a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
index d0ecd083017..1dcbd77fe22 100644
--- a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
@@ -126,10 +126,12 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
with processor_2.node.account.monitor_log(processor_2.STDOUT_FILE)
as monitor_2:
with
processor_3.node.account.monitor_log(processor_3.STDOUT_FILE) as monitor_3:
+ # repeating_keys enables production of records with keys,
ensuring that we produce to all 3 partitions
self.assert_produce(self.inputTopic,
"sending_message_after_broker_down_initially",
num_messages=self.num_messages,
- timeout_sec=120)
+ timeout_sec=120,
+ repeating_keys=self.num_messages)
monitor_1.wait_until(self.message,
timeout_sec=120,
@@ -181,10 +183,12 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
with processor_2.node.account.monitor_log(processor_2.STDOUT_FILE)
as monitor_2:
with
processor_3.node.account.monitor_log(processor_3.STDOUT_FILE) as monitor_3:
+ # repeating_keys enables production of records with keys,
ensuring that we produce to all 3 partitions
self.assert_produce(self.inputTopic,
"sending_message_normal_broker_start",
num_messages=self.num_messages,
- timeout_sec=120)
+ timeout_sec=120,
+ repeating_keys=self.num_messages)
monitor_1.wait_until(self.message,
timeout_sec=120,
@@ -260,10 +264,12 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
with processor_2.node.account.monitor_log(processor_2.STDOUT_FILE)
as monitor_2:
with
processor_3.node.account.monitor_log(processor_3.STDOUT_FILE) as monitor_3:
+ # repeating_keys enables production of records with keys,
ensuring that we produce to all 3 partitions
self.assert_produce(self.inputTopic,
"sending_message_after_normal_broker_start",
num_messages=self.num_messages,
- timeout_sec=120)
+ timeout_sec=120,
+ repeating_keys=self.num_messages)
monitor_1.wait_until(self.message,
timeout_sec=120,
@@ -307,10 +313,12 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
with processor_2.node.account.monitor_log(processor_2.STDOUT_FILE)
as monitor_2:
with
processor_3.node.account.monitor_log(processor_3.STDOUT_FILE) as monitor_3:
+ # repeating_keys enables production of records with keys,
ensuring that we produce to all 3 partitions
self.assert_produce(self.inputTopic,
"sending_message_after_hard_bouncing_streams_instance_bouncing_broker",
num_messages=self.num_messages,
- timeout_sec=120)
+ timeout_sec=120,
+ repeating_keys=self.num_messages)
monitor_1.wait_until(self.message,
timeout_sec=120,