Stanislav Kozlovski created KAFKA-8016:
------------------------------------------

             Summary: Race condition resulting in IllegalStateException inside 
Consumer Heartbeat thread when consumer joins group
                 Key: KAFKA-8016
                 URL: https://issues.apache.org/jira/browse/KAFKA-8016
             Project: Kafka
          Issue Type: Improvement
            Reporter: Stanislav Kozlovski
            Assignee: Stanislav Kozlovski


I have seen the following client exception after a consumer group rebalance:
{code:java}
INFO  Fetcher  Resetting offset for partition _ to offset 32110985.
INFO  Fetcher  Resetting offset for partition _ to offset 32108462.

java.lang.IllegalStateException: No current assignment for partition X
at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:259)
at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:264)
at 
org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetIfNeeded(Fetcher.java:562)
at 
org.apache.kafka.clients.consumer.internals.Fetcher.access$2100(Fetcher.java:93)
at 
org.apache.kafka.clients.consumer.internals.Fetcher$2.onSuccess(Fetcher.java:589)
at 
org.apache.kafka.clients.consumer.internals.Fetcher$2.onSuccess(Fetcher.java:577)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
at 
org.apache.kafka.clients.consumer.internals.Fetcher.handleListOffsetResponse(Fetcher.java:784)
at 
org.apache.kafka.clients.consumer.internals.Fetcher.access$2300(Fetcher.java:93)
at 
org.apache.kafka.clients.consumer.internals.Fetcher$4.onSuccess(Fetcher.java:704)
at 
org.apache.kafka.clients.consumer.internals.Fetcher$4.onSuccess(Fetcher.java:699)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:563)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:390)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:293)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:300)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:948)
{code}
The logs also had this message in a close timeframe:
{code:java}
INFO ConsumerCoordinator Revoking previously assigned partitions [X, ...]{code}
 

After investigating, I see that there might be a race condition:[

|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L247]

[Updating the fetch 
positions|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L2213]
 in the client [involves sending a `ListOffsetsRequest` request to the 
broker|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L603].
 It is possible for the Heartbeat thread to handle the response here: 
[1|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1036]->[2
|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L247]This
 happens when the either `Consumer#position()` or  `Consumer#poll()` gets 
called.
The problem is that 
[onJoinPrepare|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L479]
 may mutate the `subscriptions` variable while the offset response handling by 
the heartbeat thread takes place. This results in `subscriptions.seek()` 
throwing an IllegalStateException



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to