[ 
https://issues.apache.org/jira/browse/KAFKA-17769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu-Lin Chen updated KAFKA-17769:
--------------------------------
    Description: 
This test was flaky in 4 of the 110 trunk builds over the past 2 weeks. ([Report 
Link|https://ge.apache.org/scans/tests?search.rootProjectNames=kafka&search.startTimeMax=1728584869905&search.startTimeMin=1726156800000&search.tags=trunk&search.timeZoneId=Asia%2FTaipei&tests.container=kafka.api.PlaintextConsumerSubscriptionTest&tests.test=testSubscribeInvalidTopicCanUnsubscribe(String%2C%20String)%5B3%5D])

The failure can be reproduced locally within 50 repeated runs.
 
([Oct 4 2024 at 10:35:49 
CST|https://ge.apache.org/s/o4ir4xtitsu52/tests/task/:core:test/details/kafka.api.PlaintextConsumerSubscriptionTest/testSubscribeInvalidTopicCanUnsubscribe(String%2C%20String)%5B3%5D?top-execution=1]):
{code:java}
org.apache.kafka.common.KafkaException: Failed to close kafka consumer
    at org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.close(AsyncKafkaConsumer.java:1249)
    at org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.close(AsyncKafkaConsumer.java:1204)
    at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1718)
    at kafka.api.IntegrationTestHarness.$anonfun$tearDown$3(IntegrationTestHarness.scala:249)
    at kafka.api.IntegrationTestHarness.$anonfun$tearDown$3$adapted(IntegrationTestHarness.scala:249)
    at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:619)
    at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:617)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:935)
    at kafka.api.IntegrationTestHarness.tearDown(IntegrationTestHarness.scala:249)
    at java.lang.reflect.Method.invoke(Method.java:566)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
    at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658)
    at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
    at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
    at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
    at java.util.ArrayList.forEach(ArrayList.java:1541)
    at java.util.ArrayList.forEach(ArrayList.java:1541)
Caused by: org.apache.kafka.common.errors.InvalidRequestException: MemberId can't be empty.
{code}
{*}Root Cause{*}: 
In the new consumer, the following two heartbeat requests can be in flight at 
the same time:
 - The first heartbeat, which obtains the memberId from the group coordinator 
(memberEpoch = 0)
 - The next heartbeat, sent after an unsubscribe event (memberEpoch = -1)

The second heartbeat is sent with an empty memberId because the response to the 
first heartbeat has not arrived yet. The coordinator rejects it (memberId = 
empty, memberEpoch = -1).

This corner case occurs only when unsubscribe() is called shortly after the 
first poll().
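
The race described above can be illustrated with a small, self-contained sketch. This is not Kafka code: the class HeartbeatRaceSketch and the method validate are hypothetical names, and the validation rule is a simplified assumption inferred only from the error message and the epoch values reported in this issue.

```java
import java.util.Optional;

public class HeartbeatRaceSketch {
    // Epoch values as reported in this issue.
    public static final int JOIN_EPOCH = 0;   // first heartbeat, asks the coordinator for a memberId
    public static final int LEAVE_EPOCH = -1; // heartbeat sent after unsubscribe()

    /**
     * Hypothetical stand-in for the coordinator-side check (an assumption,
     * not the real implementation): an empty memberId is tolerated only on
     * the joining heartbeat; any other epoch requires a known memberId.
     * Returns the rejection message, or empty if the request is accepted.
     */
    public static Optional<String> validate(String memberId, int memberEpoch) {
        if (memberEpoch != JOIN_EPOCH && memberId.isEmpty()) {
            return Optional.of("MemberId can't be empty.");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // The client has no memberId until the first heartbeat response arrives.
        String memberId = "";

        // Heartbeat 1: joining request, accepted, but its response is still in flight.
        System.out.println("join:       " + validate(memberId, JOIN_EPOCH));

        // Heartbeat 2: unsubscribe() is called before the response to heartbeat 1
        // has been processed, so the leave request still carries an empty memberId.
        System.out.println("early leave: " + validate(memberId, LEAVE_EPOCH));

        // If the leave heartbeat is sent only after the first response has been
        // processed, the memberId is known and the request passes the check.
        memberId = "member-1"; // hypothetical id assigned by the coordinator
        System.out.println("late leave:  " + validate(memberId, LEAVE_EPOCH));
    }
}
```

Under these assumptions, only the early leave request is rejected, which matches the "MemberId can't be empty." cause in the stack trace.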


> Fix flaky 
> PlaintextConsumerSubscriptionTest.testSubscribeInvalidTopicCanUnsubscribe
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-17769
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17769
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>            Reporter: Yu-Lin Chen
>            Assignee: Yu-Lin Chen
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.10#820010)