[ 
https://issues.apache.org/jira/browse/KAFKA-16165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16165:
-----------------------------------
    Description: 
Running system tests with the new async consumer revealed an invalid transition 
related to the consumer not being polled on the interval in some kind of 
scenario (maybe relates to consumer close, as the transition is leaving->stale)

Log trace:

[2024-01-17 19:45:07,379] WARN [Consumer 
clientId=consumer.6aa7cd1c-c83f-47e1-8f8f-b38a459a05d8-0, 
groupId=consumer-groups-test-2] consumer poll timeout has expired. This means 
the time between subsequent calls to poll() was longer than the configured 
max.poll.interval.ms, which typically implies that the poll loop is spending 
too much time processing messages. You can address this either by increasing 
max.poll.interval.ms or by reducing the maximum size of batches returned in 
poll() with max.poll.records. 
(org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
[2024-01-17 19:45:07,379] ERROR [Consumer 
clientId=consumer.6aa7cd1c-c83f-47e1-8f8f-b38a459a05d8-0, 
groupId=consumer-groups-test-2] Unexpected error caught in consumer network 
thread (org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread:91)
java.lang.IllegalStateException: Invalid state transition from LEAVING to STALE
        at 
org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.transitionTo(MembershipManagerImpl.java:303)
        at 
org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.transitionToStale(MembershipManagerImpl.java:739)
        at 
org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager.poll(HeartbeatRequestManager.java:194)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.lambda$runOnce$0(ConsumerNetworkThread.java:137)
        at 
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
        at 
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
        at 
java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
        at 
java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1708)
        at 
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
        at 
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
        at 
java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
        at 
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at 
java.base/java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:657)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.runOnce(ConsumerNetworkThread.java:139)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.run(ConsumerNetworkThread.java:88)


We should review the poll expiration logic that triggers a leave group 
operation. That is currently applied in the HB Manager poll, without any 
validation, and given it depends on the consumer poll timer, it could happen at 
any time, regardless of the state of the member. Ex. poll timer could expire 
when the member is leaving, leading to this leaving->stale invalid transition. 
We should probably consider that this pro-active leave should only apply when 
the consumer is not leaving (prepare leaving or leaving)

  was:
Running system tests with the new async consumer revealed an invalid transition 
related to the consumer not being polled on the interval in some kind of 
scenario (maybe relates to consumer close, as the transition is leaving->stale)

Log trace:

[2024-01-17 19:45:07,379] WARN [Consumer 
clientId=consumer.6aa7cd1c-c83f-47e1-8f8f-b38a459a05d8-0, 
groupId=consumer-groups-test-2] consumer poll timeout has expired. This means 
the time between subsequent calls to poll() was longer than the configured 
max.poll.interval.ms, which typically implies that the poll loop is spending 
too much time processing messages. You can address this either by increasing 
max.poll.interval.ms or by reducing the maximum size of batches returned in 
poll() with max.poll.records. 
(org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
[2024-01-17 19:45:07,379] ERROR [Consumer 
clientId=consumer.6aa7cd1c-c83f-47e1-8f8f-b38a459a05d8-0, 
groupId=consumer-groups-test-2] Unexpected error caught in consumer network 
thread (org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread:91)
java.lang.IllegalStateException: Invalid state transition from LEAVING to STALE
        at 
org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.transitionTo(MembershipManagerImpl.java:303)
        at 
org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.transitionToStale(MembershipManagerImpl.java:739)
        at 
org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager.poll(HeartbeatRequestManager.java:194)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.lambda$runOnce$0(ConsumerNetworkThread.java:137)
        at 
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
        at 
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
        at 
java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
        at 
java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1708)
        at 
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
        at 
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
        at 
java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
        at 
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at 
java.base/java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:657)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.runOnce(ConsumerNetworkThread.java:139)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.run(ConsumerNetworkThread.java:88)


We should review the poll expiration logic that triggers a leave group 
operation. That is currently applied in the HB Manager poll, without any 
validation, and given it depends on the consumer poll timer, it could happen at 
any time, regardless of the state of the member. Ex. poll timer could expire 
when the member is leaving, leading to this leaving->stale invalid transition. 
We should probably consider that this pro-active leave should only apply when 
the consumer is in the group (not leaving, unsubscribed or fatal)


> Consumer invalid transition on expired poll interval
> ----------------------------------------------------
>
>                 Key: KAFKA-16165
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16165
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: clients, consumer
>    Affects Versions: 3.7.0
>            Reporter: Lianet Magrans
>            Assignee: Lianet Magrans
>            Priority: Blocker
>              Labels: client-transitions-issues, kip-848-client-support
>             Fix For: 3.8.0
>
>
> Running system tests with the new async consumer revealed an invalid 
> transition related to the consumer not being polled on the interval in some 
> kind of scenario (maybe relates to consumer close, as the transition is 
> leaving->stale)
> Log trace:
> [2024-01-17 19:45:07,379] WARN [Consumer 
> clientId=consumer.6aa7cd1c-c83f-47e1-8f8f-b38a459a05d8-0, 
> groupId=consumer-groups-test-2] consumer poll timeout has expired. This means 
> the time between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time processing messages. You can address this either by increasing 
> max.poll.interval.ms or by reducing the maximum size of batches returned in 
> poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> [2024-01-17 19:45:07,379] ERROR [Consumer 
> clientId=consumer.6aa7cd1c-c83f-47e1-8f8f-b38a459a05d8-0, 
> groupId=consumer-groups-test-2] Unexpected error caught in consumer network 
> thread (org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread:91)
> java.lang.IllegalStateException: Invalid state transition from LEAVING to 
> STALE
>       at 
> org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.transitionTo(MembershipManagerImpl.java:303)
>       at 
> org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.transitionToStale(MembershipManagerImpl.java:739)
>       at 
> org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager.poll(HeartbeatRequestManager.java:194)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.lambda$runOnce$0(ConsumerNetworkThread.java:137)
>       at 
> java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
>       at 
> java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
>       at 
> java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
>       at 
> java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1708)
>       at 
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>       at 
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>       at 
> java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
>       at 
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>       at 
> java.base/java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:657)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.runOnce(ConsumerNetworkThread.java:139)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.run(ConsumerNetworkThread.java:88)
> We should review the poll expiration logic that triggers a leave group 
> operation. That is currently applied in the HB Manager poll, without any 
> validation, and given it depends on the consumer poll timer, it could happen 
> at any time, regardless of the state of the member. Ex. poll timer could 
> expire when the member is leaving, leading to this leaving->stale invalid 
> transition. We should probably consider that this pro-active leave should 
> only apply when the consumer is not leaving (prepare leaving or leaving)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to