This is an automated email from the ASF dual-hosted git repository. lucasbru pushed a commit to branch 4.1 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit b41c4f6b57d7535166f958ac118717f2d97bf616 Author: Lucas Brutschy <[email protected]> AuthorDate: Mon Jun 23 10:30:13 2025 +0200 KAFKA-19421: Deflake streams_broker_down_resilience_test (#19999) `streams_broker_down_resilience_test` produces messages with `null` key to a topic with three partitions and expects each partition to be non-empty afterward. But I don't think this is a correct assumption, as a producer may try to be sticky and only produce to two partitions. This causes occasional flakiness in the test. The fix is to produce records with keys. Reviewers: Matthias J. Sax <[email protected]>, PoAn Yang <[email protected]> --- tests/kafkatest/tests/streams/base_streams_test.py | 4 ++-- .../tests/streams/streams_broker_down_resilience_test.py | 16 ++++++++++++---- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/tests/kafkatest/tests/streams/base_streams_test.py b/tests/kafkatest/tests/streams/base_streams_test.py index 81cad7a4d1b..da00e0895f2 100644 --- a/tests/kafkatest/tests/streams/base_streams_test.py +++ b/tests/kafkatest/tests/streams/base_streams_test.py @@ -82,8 +82,8 @@ class BaseStreamsTest(Test): self.assert_consume(client_id, test_state, streams_sink_topic, num_messages, timeout_sec) - def assert_produce(self, topic, test_state, num_messages=5, timeout_sec=60): - producer = self.get_producer(topic, num_messages) + def assert_produce(self, topic, test_state, num_messages=5, timeout_sec=60, repeating_keys=None): + producer = self.get_producer(topic, num_messages, repeating_keys=repeating_keys) producer.start() wait_until(lambda: producer.num_acked >= num_messages, diff --git a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py index 3b9d0b43bf7..94df6e37473 100644 --- a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py +++ b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py @@ -129,10 +129,12 @@ class 
StreamsBrokerDownResilience(BaseStreamsTest): with processor_2.node.account.monitor_log(processor_2.STDOUT_FILE) as monitor_2: with processor_3.node.account.monitor_log(processor_3.STDOUT_FILE) as monitor_3: + # repeating_keys enables production of records with keys, ensuring that we produce to all 3 partitions self.assert_produce(self.inputTopic, "sending_message_after_broker_down_initially", num_messages=self.num_messages, - timeout_sec=120) + timeout_sec=120, + repeating_keys=self.num_messages) monitor_1.wait_until(self.message, timeout_sec=120, @@ -189,10 +191,12 @@ class StreamsBrokerDownResilience(BaseStreamsTest): with processor_2.node.account.monitor_log(processor_2.STDOUT_FILE) as monitor_2: with processor_3.node.account.monitor_log(processor_3.STDOUT_FILE) as monitor_3: + # repeating_keys enables production of records with keys, ensuring that we produce to all 3 partitions self.assert_produce(self.inputTopic, "sending_message_normal_broker_start", num_messages=self.num_messages, - timeout_sec=120) + timeout_sec=120, + repeating_keys=self.num_messages) monitor_1.wait_until(self.message, timeout_sec=120, @@ -273,10 +277,12 @@ class StreamsBrokerDownResilience(BaseStreamsTest): with processor_2.node.account.monitor_log(processor_2.STDOUT_FILE) as monitor_2: with processor_3.node.account.monitor_log(processor_3.STDOUT_FILE) as monitor_3: + # repeating_keys enables production of records with keys, ensuring that we produce to all 3 partitions self.assert_produce(self.inputTopic, "sending_message_after_normal_broker_start", num_messages=self.num_messages, - timeout_sec=120) + timeout_sec=120, + repeating_keys=self.num_messages) monitor_1.wait_until(self.message, timeout_sec=120, @@ -320,10 +326,12 @@ class StreamsBrokerDownResilience(BaseStreamsTest): with processor_2.node.account.monitor_log(processor_2.STDOUT_FILE) as monitor_2: with processor_3.node.account.monitor_log(processor_3.STDOUT_FILE) as monitor_3: + # repeating_keys enables production of records 
with keys, ensuring that we produce to all 3 partitions self.assert_produce(self.inputTopic, "sending_message_after_hard_bouncing_streams_instance_bouncing_broker", num_messages=self.num_messages, - timeout_sec=120) + timeout_sec=120, + repeating_keys=self.num_messages) monitor_1.wait_until(self.message, timeout_sec=120,
