Hey Tao,

We recently discovered a bug in the way that the consumer tracks partition
metadata which may cause the cooperative-sticky assignor to throw this
exception in the case of a consumer that dropped out of the group at some
point. I'm just about to file a ticket for it, and it should be fixed in
the upcoming releases.

The problem is that some consumers are claiming to own partitions that they
no longer actually own after having dropped out. If you can narrow down the
problematic consumers and restart them, it should resolve the issue. I
believe you should be able to tell which consumers are claiming partitions
they no longer own based on the logs, but another option is just to restart
all the consumers (or do a rolling restart until the problem goes away).
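
As a rough sketch of that log-based check: if you copy each member's claimed
partitions out of the logs by hand, any partition claimed by more than one
member points at a stale claim, and the members claiming it are the restart
candidates. The class name, method name, member ids, and partition names below
are all made up for illustration. This is plain Java and does not call any
Kafka API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

public class StaleOwnershipCheck {

    // Hypothetical helper (not part of Kafka): returns every partition that is
    // claimed by two or more members, mapped to the members claiming it.
    public static Map<String, Set<String>> conflictingClaims(
            Map<String, List<String>> claimedByMember) {
        Map<String, Set<String>> claimants = new TreeMap<>();
        for (Map.Entry<String, List<String>> entry : claimedByMember.entrySet()) {
            for (String partition : entry.getValue()) {
                claimants.computeIfAbsent(partition, p -> new TreeSet<>())
                         .add(entry.getKey());
            }
        }
        // Keep only partitions with more than one claimant.
        claimants.values().removeIf(members -> members.size() < 2);
        return claimants;
    }

    public static void main(String[] args) {
        // Assumed input: member id -> partitions it claims, transcribed from logs.
        Map<String, List<String>> claimed = new TreeMap<>();
        claimed.put("consumer-a", Arrays.asList("orders-0", "orders-1"));
        claimed.put("consumer-b", Arrays.asList("orders-2"));
        claimed.put("consumer-c", Arrays.asList("orders-0"));

        // prints {orders-0=[consumer-a, consumer-c]}
        System.out.println(conflictingClaims(claimed));
    }
}
```

The check cannot say which of the conflicting members holds the stale claim, so
restarting every member it lists is the conservative choice.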

I'll follow up here with the ticket link once I've filed it.

-Sophie

On Tue, Jun 22, 2021 at 12:07 PM Tao Huang <sandy.huang...@gmail.com> wrote:

> Thanks for the feedback.
>
> It seems the referenced bug is on the server (broker) side? I just checked
> my Kafka broker version, and it is actually 2.4.1, so the bug does not seem
> to apply to my case.
>
> Should I downgrade my client (Java library) version to 2.4.1?
>
> Thanks!
>
> On 2021/06/21 20:04:31, Ran Lupovich <ranlupov...@gmail.com> wrote:
> > https://issues.apache.org/jira/plugins/servlet/mobile#issue/KAFKA-12890
> >
> > Check out this jira ticket
> >
> > On Mon, Jun 21, 2021 at 22:15, Tao Huang <sandy.huang...@gmail.com> wrote:
> >
> > > Hi There,
> > >
> > > I am experiencing an intermittent issue where the consumer group gets
> > > stuck in the "Completing-Rebalance" state. When this happens, the client
> > > throws the error below:
> > >
> > > 2021-06-18 13:55:41,086 ERROR io.mylab.adapter.KafkaListener [edfKafkaListener:CIO.PandC.CIPG.InternalLoggingMetadataInfo] Exception on Kafka listener (InternalLoggingMetadataInfo) - Some partitions are unassigned but all consumers are at maximum capacity
> > > java.lang.IllegalStateException: Some partitions are unassigned but all consumers are at maximum capacity
> > >     at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.constrainedAssign(AbstractStickyAssignor.java:248)
> > >     at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.assign(AbstractStickyAssignor.java:81)
> > >     at org.apache.kafka.clients.consumer.CooperativeStickyAssignor.assign(CooperativeStickyAssignor.java:64)
> > >     at org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor.assign(AbstractPartitionAssignor.java:66)
> > >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:589)
> > >     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:686)
> > >     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:111)
> > >     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:599)
> > >     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:562)
> > >     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1151)
> > >     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1126)
> > >     at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
> > >     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
> > >     at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
> > >     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602)
> > >     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412)
> > >     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
> > >     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
> > >     at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1296)
> > >     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237)
> > >     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
> > >     at io.mylab.adapter.KafkaListener.run(EdfKafkaListener.java:93)
> > >     at java.lang.Thread.run(Thread.java:748)
> > >
> > > The option to exit the state is to stop some members of the consumer
> > > group.
> > >
> > > Version: 2.6.1
> > > PARTITION_ASSIGNMENT_STRATEGY: CooperativeStickyAssignor
> > >
> > > Would you please advise what conditions would trigger such an issue?
> > >
> > > Thanks!
> > >
> >
>
