Stanislav Kozlovski created KAFKA-8016:
------------------------------------------
Summary: Race condition resulting in IllegalStateException inside Consumer Heartbeat thread when consumer joins group
Key: KAFKA-8016
URL: https://issues.apache.org/jira/browse/KAFKA-8016
Project: Kafka
Issue Type: Improvement
Reporter: Stanislav Kozlovski
Assignee: Stanislav Kozlovski

I have seen the following client exception after a consumer group rebalance:
{code:java}
INFO Fetcher Resetting offset for partition _ to offset 32110985.
INFO Fetcher Resetting offset for partition _ to offset 32108462.
java.lang.IllegalStateException: No current assignment for partition X
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:259)
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:264)
	at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetIfNeeded(Fetcher.java:562)
	at org.apache.kafka.clients.consumer.internals.Fetcher.access$2100(Fetcher.java:93)
	at org.apache.kafka.clients.consumer.internals.Fetcher$2.onSuccess(Fetcher.java:589)
	at org.apache.kafka.clients.consumer.internals.Fetcher$2.onSuccess(Fetcher.java:577)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
	at org.apache.kafka.clients.consumer.internals.Fetcher.handleListOffsetResponse(Fetcher.java:784)
	at org.apache.kafka.clients.consumer.internals.Fetcher.access$2300(Fetcher.java:93)
	at org.apache.kafka.clients.consumer.internals.Fetcher$4.onSuccess(Fetcher.java:704)
	at org.apache.kafka.clients.consumer.internals.Fetcher$4.onSuccess(Fetcher.java:699)
	at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:563)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:390)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:293)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:300)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:948)
{code}
The logs also contained this message from around the same time:
{code:java}
INFO ConsumerCoordinator Revoking previously assigned partitions [X, ...]
{code}
After investigating, I believe there may be a race condition. [Updating the fetch positions|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L2213] in the client [involves sending a `ListOffsetsRequest` to the broker|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L603]. The heartbeat thread can then end up handling the response, here: [1|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1036] -> [2|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L247]. This can happen whenever `Consumer#position()` or `Consumer#poll()` is called.
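The stack trace shows `RequestFuture.complete` firing its listeners synchronously, so the `ListOffsetsRequest` callback runs on whichever thread completes the future, in this case the heartbeat thread inside `ConsumerNetworkClient.pollNoWakeup()`. The sketch below is only an analogy: it uses the JDK's `CompletableFuture` as a stand-in for Kafka's internal `RequestFuture`, and the thread name `heartbeat-thread` and class name `CallbackThreadDemo` are hypothetical. It shows that a callback registered by one thread can end up executing on another.

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class CallbackThreadDemo {

    // Registers a callback from the calling ("application") thread, then lets a second
    // thread complete the future. Returns the name of the thread that ran the callback.
    public static String callbackThreadName() {
        CompletableFuture<Long> listOffsetsResponse = new CompletableFuture<>();
        AtomicReference<String> ranOn = new AtomicReference<>();

        // Registered before completion, so this non-async stage runs on the completing thread.
        listOffsetsResponse.thenAccept(offset -> ranOn.set(Thread.currentThread().getName()));

        // Hypothetical stand-in for the heartbeat thread completing the pending response.
        Thread heartbeat = new Thread(() -> listOffsetsResponse.complete(32110985L), "heartbeat-thread");
        heartbeat.start();
        try {
            heartbeat.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return ranOn.get();
    }

    public static void main(String[] args) {
        // Prints "callback ran on: heartbeat-thread", not the registering thread's name.
        System.out.println("callback ran on: " + callbackThreadName());
    }
}
{code}

In this arrangement the main thread only registers the callback and waits, so the completing thread is the only one that can run it. That mirrors how the offset-reset logic in `Fetcher` ends up executing on the heartbeat thread while the application thread may be doing something else.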
The problem is that [onJoinPrepare|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L479] may mutate the `subscriptions` variable while the heartbeat thread is still handling the offset response. If the partition has been revoked by then, `subscriptions.seek()` throws an `IllegalStateException` ("No current assignment for partition X").
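To make the interleaving concrete, here is a heavily simplified, hypothetical model. `Subscriptions`, `replay`, and both handlers are illustrative names, not Kafka APIs, and the real `SubscriptionState` tracks far more than this. The replay runs on a single thread and fixes the order of events (assign, revoke, then the late ListOffsets response) so the outcome does not depend on timing. The guarded handler is one possible mitigation, not necessarily how Kafka addresses this issue.

{code:java}
import java.util.HashMap;
import java.util.Map;

public class ResetRaceSketch {

    // Hypothetical, heavily simplified stand-in for the consumer's SubscriptionState:
    // it only tracks which partitions are assigned and their positions.
    public static final class Subscriptions {
        private final Map<String, Long> positions = new HashMap<>();

        public synchronized void assign(String partition) { positions.put(partition, null); }

        // Roughly what revoking partitions during onJoinPrepare does to the assignment.
        public synchronized void revokeAll() { positions.clear(); }

        public synchronized boolean isAssigned(String partition) { return positions.containsKey(partition); }

        // Like the real seek(), this fails when the partition is not currently assigned.
        public synchronized void seek(String partition, long offset) {
            if (!positions.containsKey(partition))
                throw new IllegalStateException("No current assignment for partition " + partition);
            positions.put(partition, offset);
        }
    }

    // Unguarded handler: applies the ListOffsets result blindly (the failing pattern).
    public static void onListOffsetsUnguarded(Subscriptions s, String partition, long offset) {
        s.seek(partition, offset);
    }

    // Hypothetical guarded handler: skips the reset if the partition was revoked meanwhile.
    // The check and the seek share the same lock, so a revocation cannot slip in between.
    public static boolean onListOffsetsGuarded(Subscriptions s, String partition, long offset) {
        synchronized (s) {
            if (!s.isAssigned(partition)) return false;
            s.seek(partition, offset);
            return true;
        }
    }

    // Replays the problematic order deterministically:
    // 1. partition X is assigned and a ListOffsetsRequest is in flight,
    // 2. a rebalance starts and X is revoked,
    // 3. the late ListOffsets response for X is handled.
    public static String replay(boolean guarded) {
        Subscriptions s = new Subscriptions();
        s.assign("X");
        s.revokeAll();
        try {
            if (guarded) return onListOffsetsGuarded(s, "X", 32110985L) ? "reset applied" : "reset skipped";
            onListOffsetsUnguarded(s, "X", 32110985L);
            return "reset applied";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println("unguarded: " + replay(false)); // No current assignment for partition X
        System.out.println("guarded:   " + replay(true));  // reset skipped
    }
}
{code}

The design point the sketch illustrates is that checking "is this partition still assigned?" only helps if the check and the seek are atomic with respect to revocation; checking without holding the same lock would leave the original window open.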