[ https://issues.apache.org/jira/browse/KAFKA-17769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chia-Ping Tsai resolved KAFKA-17769.
------------------------------------
Resolution: Fixed
> Fix flaky PlaintextConsumerSubscriptionTest.testSubscribeInvalidTopicCanUnsubscribe
> -----------------------------------------------------------------------------------
>
> Key: KAFKA-17769
> URL: https://issues.apache.org/jira/browse/KAFKA-17769
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Reporter: Yu-Lin Chen
> Assignee: Yu-Lin Chen
> Priority: Major
> Labels: flaky-test, integration-test, kip-848-client-support
> Fix For: 4.0.0
>
>
> The test was flaky in 4 of 110 trunk builds over the past 2 weeks. ([Report
> Link|https://ge.apache.org/scans/tests?search.rootProjectNames=kafka&search.startTimeMax=1728584869905&search.startTimeMin=1726156800000&search.tags=trunk&search.timeZoneId=Asia%2FTaipei&tests.container=kafka.api.PlaintextConsumerSubscriptionTest&tests.test=testSubscribeInvalidTopicCanUnsubscribe(String%2C%20String)%5B3%5D])
> The issue can be reproduced locally within 50 loops.
>
> ([Oct 4 2024 at 10:35:49
> CST|https://ge.apache.org/s/o4ir4xtitsu52/tests/task/:core:test/details/kafka.api.PlaintextConsumerSubscriptionTest/testSubscribeInvalidTopicCanUnsubscribe(String%2C%20String)%5B3%5D?top-execution=1]):
> {code:java}
> org.apache.kafka.common.KafkaException: Failed to close kafka consumer
> at org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.close(AsyncKafkaConsumer.java:1249)
> at org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.close(AsyncKafkaConsumer.java:1204)
> at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1718)
> at kafka.api.IntegrationTestHarness.$anonfun$tearDown$3(IntegrationTestHarness.scala:249)
> at kafka.api.IntegrationTestHarness.$anonfun$tearDown$3$adapted(IntegrationTestHarness.scala:249)
> at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:619)
> at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:617)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:935)
> at kafka.api.IntegrationTestHarness.tearDown(IntegrationTestHarness.scala:249)
> at java.lang.reflect.Method.invoke(Method.java:566)
> at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
> at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
> at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
> at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
> at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
> at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
> at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
> at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
> at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
> at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
> at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
> at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658)
> at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
> at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
> at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
> at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
> at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
> at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
> at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
> at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
> at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
> at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
> at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
> at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
> at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
> at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
> at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
> at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
> at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
> at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
> at java.util.ArrayList.forEach(ArrayList.java:1541)
> at java.util.ArrayList.forEach(ArrayList.java:1541)
> Caused by: org.apache.kafka.common.errors.InvalidRequestException: MemberId can't be empty.
> {code}
> *Root Cause*:
> In the new consumer, the following two heartbeat requests can be in flight
> at the same time:
> - The first heartbeat, which obtains the memberId from the group coordinator
> (memberEpoch = 0)
> - The next heartbeat, sent after an unsubscribe event (memberEpoch = -1)
> If the second heartbeat is sent while the first is still in flight, it
> carries an empty memberId, and the coordinator rejects it (memberId =
> empty, memberEpoch = -1).
> This corner case occurs only when unsubscribe() is called shortly after the
> first poll.
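
The ordering described under Root Cause can be illustrated with a small toy model. This is only a sketch, not the Kafka client or coordinator code: the class HeartbeatRaceSketch, its nested types, the id "member-1", and the validation rule (an empty memberId is accepted only when memberEpoch = 0) are all assumptions chosen to match the behavior reported above.

```java
public class HeartbeatRaceSketch {

    /** Hypothetical stand-in for a heartbeat request: only the two fields named in the report. */
    static final class Heartbeat {
        final String memberId;
        final int memberEpoch;

        Heartbeat(String memberId, int memberEpoch) {
            this.memberId = memberId;
            this.memberEpoch = memberEpoch;
        }
    }

    /** Toy coordinator check (assumed rule): an empty memberId is accepted only on the joining heartbeat (epoch 0). */
    static boolean coordinatorAccepts(Heartbeat hb) {
        return hb.memberEpoch == 0 || !hb.memberId.isEmpty();
    }

    /** Toy client state: memberId stays empty until the first heartbeat response is handled. */
    static final class Client {
        String memberId = "";

        Heartbeat joinHeartbeat() {
            return new Heartbeat(memberId, 0);
        }

        Heartbeat leaveHeartbeat() {
            return new Heartbeat(memberId, -1);
        }

        void onJoinResponse(String assignedId) {
            memberId = assignedId;
        }
    }

    /** Returns whether the leave heartbeat is accepted, depending on whether the join response arrived before unsubscribe. */
    static boolean leaveAccepted(boolean joinResponseArrivedFirst) {
        Client client = new Client();
        Heartbeat join = client.joinHeartbeat();
        if (!coordinatorAccepts(join)) {
            throw new IllegalStateException("join heartbeat should be accepted");
        }
        if (joinResponseArrivedFirst) {
            client.onJoinResponse("member-1"); // hypothetical id assigned by the coordinator
        }
        // If the join response has not been handled yet, memberId is still empty here.
        return coordinatorAccepts(client.leaveHeartbeat());
    }

    public static void main(String[] args) {
        System.out.println("join response first: leave accepted = " + leaveAccepted(true));
        System.out.println("unsubscribe first:   leave accepted = " + leaveAccepted(false));
    }
}
```

In this model the leave heartbeat is rejected only when it is built before the join response has been handled, which mirrors the narrow window between the first poll and unsubscribe() described in the report.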