[ https://issues.apache.org/jira/browse/KAFKA-8016?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stanislav Kozlovski resolved KAFKA-8016.
----------------------------------------
    Resolution: Duplicate

This is a symptom of https://issues.apache.org/jira/browse/KAFKA-7831

> Race condition resulting in IllegalStateException inside Consumer Heartbeat thread when consumer joins group
> ------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-8016
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8016
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Stanislav Kozlovski
>            Assignee: Stanislav Kozlovski
>            Priority: Major
>
> I think the consumer heartbeat thread is susceptible to a race condition that can crash it.
> I have seen the following client exception after a consumer group rebalance:
> {code:java}
> INFO Fetcher Resetting offset for partition _ to offset 32110985.
> INFO Fetcher Resetting offset for partition _ to offset 32108462.
> java.lang.IllegalStateException: No current assignment for partition X
>     at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:259)
>     at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:264)
>     at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetIfNeeded(Fetcher.java:562)
>     at org.apache.kafka.clients.consumer.internals.Fetcher.access$2100(Fetcher.java:93)
>     at org.apache.kafka.clients.consumer.internals.Fetcher$2.onSuccess(Fetcher.java:589)
>     at org.apache.kafka.clients.consumer.internals.Fetcher$2.onSuccess(Fetcher.java:577)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
>     at org.apache.kafka.clients.consumer.internals.Fetcher.handleListOffsetResponse(Fetcher.java:784)
>     at org.apache.kafka.clients.consumer.internals.Fetcher.access$2300(Fetcher.java:93)
>     at org.apache.kafka.clients.consumer.internals.Fetcher$4.onSuccess(Fetcher.java:704)
>     at org.apache.kafka.clients.consumer.internals.Fetcher$4.onSuccess(Fetcher.java:699)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:563)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:390)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:293)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:300)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:948)
> {code}
> The logs also contained this message from around the same time:
> {code:java}
> INFO ConsumerCoordinator Revoking previously assigned partitions [X, ...]{code}
>
> After investigating, I see that there might be a race condition:
>
> [Updating the fetch positions|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L2213] in the client [involves sending a `ListOffsetsRequest` to the broker|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L603].
> The heartbeat thread can end up running the code that handles this response from within its own run loop ([1|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1036]->[2|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L247]).
>
> updateFetchPositions() is called from the public methods `Consumer#position()` and `Consumer#poll()`.
> The problem is that [onJoinPrepare|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L479] may mutate the `subscriptions` variable while the heartbeat thread is handling the offset response. If the partition has been revoked in the meantime, `subscriptions.seek()` throws an IllegalStateException.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
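The interleaving described in the report can be replayed with a minimal, self-contained Java sketch. Everything in it is hypothetical: `AssignmentState`, `seekIfAssigned` and `replay` are illustrative names, not Kafka's `SubscriptionState` or `Fetcher` API. A `CountDownLatch` forces the problematic ordering (revocation first, then response handling) so the outcome is deterministic rather than timing-dependent.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

public class HeartbeatRaceSketch {

    // Hypothetical stand-in for the consumer's assignment bookkeeping;
    // NOT Kafka's SubscriptionState, just enough state to show the race.
    static final class AssignmentState {
        private final Map<String, Long> positions = new HashMap<>();

        synchronized void assign(String partition) {
            positions.put(partition, null); // assigned, no position yet
        }

        // Plays the role of onJoinPrepare() revoking partitions before a rebalance.
        synchronized void revokeAll() {
            positions.clear();
        }

        // Unsafe variant: like the seek() in the stack trace, it throws
        // if the partition is no longer assigned.
        synchronized void seek(String partition, long offset) {
            if (!positions.containsKey(partition)) {
                throw new IllegalStateException("No current assignment for partition " + partition);
            }
            positions.put(partition, offset);
        }

        // Defensive variant: check and seek under one lock, skip revoked partitions.
        synchronized boolean seekIfAssigned(String partition, long offset) {
            if (!positions.containsKey(partition)) {
                return false;
            }
            positions.put(partition, offset);
            return true;
        }
    }

    // Replays the reported interleaving: partition X is revoked after the offset
    // request was sent but before the "heartbeat" thread handles the response.
    public static String replay(boolean defensive) {
        AssignmentState state = new AssignmentState();
        state.assign("X");
        CountDownLatch revoked = new CountDownLatch(1);
        String[] outcome = new String[1];

        Thread heartbeat = new Thread(() -> {
            try {
                revoked.await(); // force the bad ordering deterministically
                if (defensive) {
                    outcome[0] = state.seekIfAssigned("X", 32110985L) ? "seeked" : "skipped";
                } else {
                    state.seek("X", 32110985L);
                    outcome[0] = "seeked";
                }
            } catch (IllegalStateException e) {
                outcome[0] = e.getMessage(); // in the report, this was thrown on the heartbeat thread
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "heartbeat-sketch");
        heartbeat.start();

        state.revokeAll();   // application thread: rebalance revokes partition X
        revoked.countDown(); // only now let the "heartbeat" thread handle the response
        try {
            heartbeat.join(); // join() also makes outcome[0] visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for heartbeat-sketch", e);
        }
        return outcome[0];
    }

    public static void main(String[] args) {
        System.out.println(replay(false)); // No current assignment for partition X
        System.out.println(replay(true));  // skipped
    }
}
```

Note that the unsafe `seek` is already `synchronized` and still fails: the problem is not torn state but an offset response that outlives the assignment it was issued for. Checking the assignment and seeking under the same lock, and skipping partitions that are gone, is one possible mitigation assumed here for illustration; it is not necessarily how KAFKA-7831 was resolved.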