Yu-Lin Chen created KAFKA-17769:
-----------------------------------
Summary: Fix flaky
PlaintextConsumerSubscriptionTest.testSubscribeInvalidTopicCanUnsubscribe
Key: KAFKA-17769
URL: https://issues.apache.org/jira/browse/KAFKA-17769
Project: Kafka
Issue Type: Bug
Components: clients, consumer
Reporter: Yu-Lin Chen
Assignee: Yu-Lin Chen
4 flaky out of 110 trunk builds in past 2 weeks. ([Report
Link|https://ge.apache.org/scans/tests?search.rootProjectNames=kafka&search.startTimeMax=1728584869905&search.startTimeMin=1726156800000&search.tags=trunk&search.timeZoneId=Asia%2FTaipei&tests.container=kafka.api.PlaintextConsumerSubscriptionTest&tests.test=testSubscribeInvalidTopicCanUnsubscribe(String%2C%20String)%5B3%5D])
This issue can be reproduced in my local within 50 loops.
([Oct 4 2024 at 10:35:49
CST|https://ge.apache.org/s/o4ir4xtitsu52/tests/task/:core:test/details/kafka.api.PlaintextConsumerSubscriptionTest/testSubscribeInvalidTopicCanUnsubscribe(String%2C%20String)%5B3%5D?top-execution=1]):
{code:java}
org.apache.kafka.common.KafkaException: Failed to close kafka consumer
at
org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.close(AsyncKafkaConsumer.java:1249)
at
org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.close(AsyncKafkaConsumer.java:1204)
at
org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1718)
at
kafka.api.IntegrationTestHarness.$anonfun$tearDown$3(IntegrationTestHarness.scala:249)
at
kafka.api.IntegrationTestHarness.$anonfun$tearDown$3$adapted(IntegrationTestHarness.scala:249)
at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:619)
at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:617)
at scala.collection.AbstractIterable.foreach(Iterable.scala:935)
at kafka.api.IntegrationTestHarness.tearDown(IntegrationTestHarness.scala:249)
at java.lang.reflect.Method.invoke(Method.java:566)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at
java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658)
at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at
java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
at
java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
at
java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
at
java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
at
java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
at
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
at
java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
at
java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
at java.util.ArrayList.forEach(ArrayList.java:1541)
at java.util.ArrayList.forEach(ArrayList.java:1541)
Caused by: org.apache.kafka.common.errors.InvalidRequestException: MemberId
can't be empty. {code}
{*}Root Cause{*}:
The following hearbeat requests might happen in flight simultaneously in new
consumer:
- The first heartbeat, which will get memberId from group coordinator
(memberEpoch = 0)
- The next heartbeat after an unsubscribe event (memberEpoch = -1)
If the first heartbeat still in flight. This is not accepted in coordinator.
The second heartbeat will be sent with empty memberId while the first heartbeat
is still in flight. And the coordinator won't accept it (memberId = empty,
memberEpoch = -1 ).
This corner case occurs only when unsubscribe() is called shortly after the
first poll()
--
This message was sent by Atlassian Jira
(v8.20.10#820010)