This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6826f45fd85 KAFKA-19352 Create offsets topic to fix flaky testCommitAsyncCompletedBeforeConsumerCloses (#19873)
6826f45fd85 is described below

commit 6826f45fd8588dcf16474fa0a9d6c38d195dc37a
Author: Ming-Yen Chung <[email protected]>
AuthorDate: Mon Jun 2 01:01:24 2025 +0800

    KAFKA-19352 Create offsets topic to fix flaky testCommitAsyncCompletedBeforeConsumerCloses (#19873)
    
    The flakiness occurs when the offsets topic does not yet exist. Hence,
    the issue is mitigated by creating the offsets topic in `setup()`. This
    serves as a workaround.  The root cause is tracked in
    [KAFKA-19357](https://issues.apache.org/jira/browse/KAFKA-19357).
    
    I ran the test 100 times on my Mac and all of them passed.
    
    Reviewers: Ken Huang <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java  | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java
index c3f0aedccc6..162b7baa371 100644
--- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java
+++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java
@@ -453,10 +453,14 @@ public class PlaintextConsumerCommitTest {
 
     // TODO: This only works in the new consumer, but should be fixed for the old consumer as well
     @ClusterTest
-    public void testCommitAsyncCompletedBeforeConsumerCloses() {
+    public void testCommitAsyncCompletedBeforeConsumerCloses() throws InterruptedException {
         // This is testing the contract that asynchronous offset commit are completed before the consumer
         // is closed, even when no commit sync is performed as part of the close (due to auto-commit
         // disabled, or simply because there are no consumed offsets).
+
+        // Create offsets topic to ensure coordinator is available during close
+        cluster.createTopic(Topic.GROUP_METADATA_TOPIC_NAME, Integer.parseInt(OFFSETS_TOPIC_PARTITIONS), Short.parseShort(OFFSETS_TOPIC_REPLICATION));
+
         try (Producer<byte[], byte[]> producer = cluster.producer(Map.of(ProducerConfig.ACKS_CONFIG, "all"));
              var consumer = createConsumer(GroupProtocol.CONSUMER, false)
         ) {

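The contract named in the test comment (asynchronous offset commits are completed before the consumer is closed) can be illustrated with a toy model. This is only a sketch under stated assumptions: `ToyConsumer`, its `commitAsync(Runnable)` method, and `completedAfterClose` are hypothetical names invented for illustration. They are not the Kafka consumer API, and the model involves neither a coordinator nor the offsets topic.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

public class AsyncCommitCloseSketch {

    /** Hypothetical stand-in for a consumer; NOT the Kafka API. */
    static final class ToyConsumer implements AutoCloseable {
        private final Queue<Runnable> pendingCallbacks = new ArrayDeque<>();

        /** Records a commit callback without running it yet. */
        void commitAsync(Runnable onComplete) {
            pendingCallbacks.add(onComplete);
        }

        /** Runs every pending callback before returning to the caller. */
        @Override
        public void close() {
            Runnable callback;
            while ((callback = pendingCallbacks.poll()) != null) {
                callback.run();
            }
        }
    }

    /** Issues `commits` async commits, closes the consumer, and returns how many callbacks fired. */
    static int completedAfterClose(int commits) {
        AtomicInteger completed = new AtomicInteger();
        try (ToyConsumer consumer = new ToyConsumer()) {
            for (int i = 0; i < commits; i++) {
                consumer.commitAsync(completed::incrementAndGet);
            }
        } // close() runs here and drains the pending callbacks
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(completedAfterClose(1));
    }
}
```

In this model the assertion a test would make is that the callback count observed after the try-with-resources block equals the number of `commitAsync` calls issued inside it.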