This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit b41c4f6b57d7535166f958ac118717f2d97bf616
Author: Lucas Brutschy <[email protected]>
AuthorDate: Mon Jun 23 10:30:13 2025 +0200

    KAFKA-19421: Deflake streams_broker_down_resilience_test (#19999)
    
    `streams_broker_down_resilience_test` produces messages with `null` key
    to a topic with three partitions and expects each partition to be
    non-empty afterward. But I don't think this is a correct assumption, as
    a producer may try to be sticky and only produce to two partitions.
    
    This causes occasional flakiness in the test.
    
    The fix is to produce records with keys.
    
    Reviewers: Matthias J. Sax <[email protected]>, PoAn Yang
     <[email protected]>
---
 tests/kafkatest/tests/streams/base_streams_test.py       |  4 ++--
 .../tests/streams/streams_broker_down_resilience_test.py | 16 ++++++++++++----
 2 files changed, 14 insertions(+), 6 deletions(-)

diff --git a/tests/kafkatest/tests/streams/base_streams_test.py 
b/tests/kafkatest/tests/streams/base_streams_test.py
index 81cad7a4d1b..da00e0895f2 100644
--- a/tests/kafkatest/tests/streams/base_streams_test.py
+++ b/tests/kafkatest/tests/streams/base_streams_test.py
@@ -82,8 +82,8 @@ class BaseStreamsTest(Test):
 
         self.assert_consume(client_id, test_state, streams_sink_topic, 
num_messages, timeout_sec)
 
-    def assert_produce(self, topic, test_state, num_messages=5, 
timeout_sec=60):
-        producer = self.get_producer(topic, num_messages)
+    def assert_produce(self, topic, test_state, num_messages=5, 
timeout_sec=60, repeating_keys=None):
+        producer = self.get_producer(topic, num_messages, 
repeating_keys=repeating_keys)
         producer.start()
 
         wait_until(lambda: producer.num_acked >= num_messages,
diff --git 
a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py 
b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
index 3b9d0b43bf7..94df6e37473 100644
--- a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
@@ -129,10 +129,12 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
             with processor_2.node.account.monitor_log(processor_2.STDOUT_FILE) 
as monitor_2:
                 with 
processor_3.node.account.monitor_log(processor_3.STDOUT_FILE) as monitor_3:
 
+                    # repeating_keys enables production of records with keys, 
ensuring that we produce to all 3 partitions
                     self.assert_produce(self.inputTopic,
                                         
"sending_message_after_broker_down_initially",
                                         num_messages=self.num_messages,
-                                        timeout_sec=120)
+                                        timeout_sec=120,
+                                        repeating_keys=self.num_messages)
 
                     monitor_1.wait_until(self.message,
                                          timeout_sec=120,
@@ -189,10 +191,12 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
             with processor_2.node.account.monitor_log(processor_2.STDOUT_FILE) 
as monitor_2:
                 with 
processor_3.node.account.monitor_log(processor_3.STDOUT_FILE) as monitor_3:
 
+                    # repeating_keys enables production of records with keys, 
ensuring that we produce to all 3 partitions
                     self.assert_produce(self.inputTopic,
                                         "sending_message_normal_broker_start",
                                         num_messages=self.num_messages,
-                                        timeout_sec=120)
+                                        timeout_sec=120,
+                                        repeating_keys=self.num_messages)
 
                     monitor_1.wait_until(self.message,
                                          timeout_sec=120,
@@ -273,10 +277,12 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
             with processor_2.node.account.monitor_log(processor_2.STDOUT_FILE) 
as monitor_2:
                 with 
processor_3.node.account.monitor_log(processor_3.STDOUT_FILE) as monitor_3:
 
+                    # repeating_keys enables production of records with keys, 
ensuring that we produce to all 3 partitions
                     self.assert_produce(self.inputTopic,
                                         
"sending_message_after_normal_broker_start",
                                         num_messages=self.num_messages,
-                                        timeout_sec=120)
+                                        timeout_sec=120,
+                                        repeating_keys=self.num_messages)
 
                     monitor_1.wait_until(self.message,
                                          timeout_sec=120,
@@ -320,10 +326,12 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
             with processor_2.node.account.monitor_log(processor_2.STDOUT_FILE) 
as monitor_2:
                 with 
processor_3.node.account.monitor_log(processor_3.STDOUT_FILE) as monitor_3:
 
+                    # repeating_keys enables production of records with keys, 
ensuring that we produce to all 3 partitions
                     self.assert_produce(self.inputTopic,
                                         
"sending_message_after_hard_bouncing_streams_instance_bouncing_broker",
                                         num_messages=self.num_messages,
-                                        timeout_sec=120)
+                                        timeout_sec=120,
+                                        repeating_keys=self.num_messages)
 
                     monitor_1.wait_until(self.message,
                                          timeout_sec=120,

Reply via email to