Hello Murilo,

I think what you've discovered is a processing spike: each poll call can
return up to 1000 records, and processing them may then take longer than
usual (if, say, they hit the p99 latency of the processing logic). After
your config change, the application is less sensitive to such processing
latency spikes, so it is less likely to drop out of the group and trigger
another rebalance.
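
As a rough back-of-the-envelope illustration of why the smaller batch helps,
here is a minimal sketch. The 400 ms per-record latency and the 300000 ms
poll interval are made-up numbers for illustration, not values from your
application, and the `PollBudget` class is hypothetical. It also assumes the
simplest possible model, in which everything returned by one poll is
processed before the next poll; the real StreamThread loop is more nuanced.

```java
public class PollBudget {
    // Worst-case time (ms) to process one poll's batch if every record is slow.
    public static long worstCaseBatchMillis(int maxPollRecords, long perRecordMillis) {
        return (long) maxPollRecords * perRecordMillis;
    }

    // True if that worst-case batch time stays below max.poll.interval.ms.
    public static boolean fitsInInterval(int maxPollRecords, long perRecordMillis,
                                         long maxPollIntervalMs) {
        return worstCaseBatchMillis(maxPollRecords, perRecordMillis) < maxPollIntervalMs;
    }

    public static void main(String[] args) {
        long maxPollIntervalMs = 300_000L; // assumed poll interval, for illustration
        long slowRecordMs = 400L;          // hypothetical latency during a spike

        // 1000 records * 400 ms = 400000 ms, which exceeds 300000 ms
        System.out.println(fitsInInterval(1000, slowRecordMs, maxPollIntervalMs));
        // 500 records * 400 ms = 200000 ms, which fits
        System.out.println(fitsInInterval(500, slowRecordMs, maxPollIntervalMs));
    }
}
```

Under this model, either lowering `max.poll.records` or raising
`max.poll.interval.ms` widens the margin, which matches what you observed.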

As for the "unstable assignment" message, it comes from a special rebalance
protocol: when a task needs to be migrated to a host that has no state for
it yet, the task is first started there as a "warmup" to bootstrap the
state, and only then migrated. The cost is that more rebalances are
triggered. This is a normal scenario, especially if you have just escaped
from a continuous rebalance storm, since those rebalances leave a lot of
state that needs to be bootstrapped (and changing the configs may actually
exacerbate the situation). You can find more details about its technical
design at
https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol
and
https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams.
For this, I'd suggest keeping a relatively high value for
`restore.consumer.max.poll.records` while keeping your current setting for
`main.consumer.max.poll.records`, and seeing if that separation helps.
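
A minimal sketch of what that separation could look like is below. All the
numeric values, the application id, and the bootstrap address are
placeholders rather than recommendations, and the `StreamsPollConfig` class
is hypothetical; the `main.consumer.` and `restore.consumer.` prefixes are
the ones Kafka Streams uses to route settings to each consumer.

```java
import java.util.Properties;

public class StreamsPollConfig {
    // Builds Streams properties with separate poll sizes for the main and
    // restore consumers. All values here are placeholders.
    public static Properties build() {
        Properties props = new Properties();
        props.put("application.id", "example-app");        // hypothetical
        props.put("bootstrap.servers", "localhost:9092");  // hypothetical

        // Small batches for normal processing, so a latency spike is less
        // likely to exceed the poll interval.
        props.put("main.consumer.max.poll.records", "200");

        // Larger batches while restoring state, where records are only
        // written into the state store.
        props.put("restore.consumer.max.poll.records", "2000");

        // More headroom between polls before the thread is fenced.
        props.put("max.poll.interval.ms", "600000");
        return props;
    }

    public static void main(String[] args) {
        build().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The resulting `Properties` would then be passed to the `KafkaStreams`
constructor together with your topology.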


Guozhang

On Thu, Oct 14, 2021 at 9:05 AM Murilo Tavares <murilo...@gmail.com> wrote:

> Hi Guozhang
> Thanks for your response. I'm on KafkaStreams 2.8.1.
> Since you asked, is KafkaStreams 3.0.0 compatible with a 2.4.1 broker?
>
> But I found the issue.
> TL;DR: I increased max.poll.interval.ms and decreased max.poll.records,
> which fixed the problem.
>
> I noticed the StreamThread summary logs
> Processed X total records, ran X punctuators, and committed X total tasks
> since the last update
> every 3 minutes or so. Looking at the code, I noticed they should be
> printed after a polling loop is completed, as long as at least 2 min have
> passed since the last summary.
>
> Then I noticed that sometimes these messages would have a larger interval,
> of around 10 min or so. So I came to the conclusion that polling was taking
> too long.
> So I increased max.poll.interval.ms and decreased max.poll.records (which
> we btw had on a non-default value of 1000, rather than 500).
> That improved the problem, as the polling loops would not time out very
> often until it caught up.
> The weird thing I noticed is that, even after it caught up and there were
> no more polling timeouts, we still had rebalances for an extra 2h, with
> messages saying:
> Finished unstable assignment of tasks
> For some reason, tasks seemed to keep migrating for 2h even after the
> polling stopped timing out and the message backlog had been processed.
> But now the service is stable again.
>
> Thanks
> Murilo
>
>
>
> On Wed, 13 Oct 2021 at 14:27, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hi Murilo, which version of Kafka Streams are you using? Could you try
> > the latest 3.0.0 release if it is not yet on that version?
> >
> > On Wed, Oct 13, 2021 at 8:02 AM Murilo Tavares <murilo...@gmail.com> wrote:
> >
> > > Hi
> > > I have a large, stateful KafkaStreams application that is stuck in a
> > > never-ending rebalance loop.
> > > I can see that task restorations take a loooong time (circa 30-45 min).
> > > And after that I see this error.
> > > This is followed by tasks being suspended, the instance re-joining the
> > > group, and a new rebalance being triggered.
> > > Any ideas on how to fix this?
> > >
> > > WARN org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [inventory-streams-green-0-StreamThread-1] Detected that the thread is being fenced. This implies that this thread missed a rebalance and dropped out of the consumer group. Will close out all assigned tasks and rejoin the consumer group.
> > > org.apache.kafka.streams.errors.TaskMigratedException: Consumer committing offsets failed, indicating the corresponding thread is no longer part of the group; it means all tasks belonging to this thread should be migrated.
> > >   at org.apache.kafka.streams.processor.internals.TaskManager.commitOffsetsOrTransaction(TaskManager.java:1141) ~[app.jar:?]
> > >   at org.apache.kafka.streams.processor.internals.TaskManager.handleRevocation(TaskManager.java:541) ~[app.jar:?]
> > >   at org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsRevoked(StreamsRebalanceListener.java:95) ~[app.jar:?]
> > >   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsRevoked(ConsumerCoordinator.java:312) ~[app.jar:?]
> > >   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:408) ~[app.jar:?]
> > >   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:449) ~[app.jar:?]
> > >   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:365) ~[app.jar:?]
> > >   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:508) ~[app.jar:?]
> > >   at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1261) ~[app.jar:?]
> > >   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230) ~[app.jar:?]
> > >   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) ~[app.jar:?]
> > >   at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:925) ~[app.jar:?]
> > >   at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:885) ~[app.jar:?]
> > >   at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:720) [app.jar:?]
> > >   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583) [app.jar:?]
> > >   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556) [app.jar:?]
> > > Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
> > >   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1139) ~[app.jar:?]
> > >   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:1004) ~[app.jar:?]
> > >   at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1490) ~[app.jar:?]
> > >   at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1438) ~[app.jar:?]
> > >   at org.apache.kafka.streams.processor.internals.TaskManager.commitOffsetsOrTransaction(TaskManager.java:1139) ~[app.jar:?]
> > >   ... 15 more
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang
