Re: Kafka streams state store return hostname as unavailable when calling queryMetadataForKey method

2024-05-13 Thread Sophie Blee-Goldman
Ah, well this isn't anything new then, since it's been the case since 2.6:
the default task assignor in Kafka Streams will sometimes assign partitions
unevenly for a while if it's trying to move stateful tasks around and there's
no copy of a task's state on the local disk attached to the KafkaStreams
instance it wants to move that task to. This imbalance should only be
temporary, however, and it should converge on an even distribution of
partitions over time as it finishes "warming up" the task state in the
background and can then move those stateful tasks to their final destination.

An upgrade can sometimes trigger a large-scale redistribution of tasks,
which in turn can lead to a lot of these "warmup tasks" and a longer
duration of uneven task assignment. But it should always level out
eventually if the group is stable. So when you say "we've observed that the
state store of Kafka Streams instances is not evenly distributed as it was
before the upgrade", was this referring to just immediately after the
upgrade? If so, give it some time and it should trend towards an even
distribution. If it seems to be stuck in an uneven state, then that's either
because (a) there's a bug in the assignor, or, more likely, (b) the group
itself is unstable and the assignment can't converge.

Given this issue is accompanied by the "hostname unavailable", it sounds
like the group is stuck rebalancing. Do you monitor rebalances in any way?
If you're seeing them every 10 minutes almost exactly, then it's most likely
just the "probing rebalances" that happen while tasks are being warmed up
via the process described above. But if the application is rebalancing
repeatedly, nonstop, or over a very long period of time (many hours/days),
then that would be a problem.
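
As a side note, on the query side it's worth checking explicitly for the
"not available" sentinel that queryMetadataForKey returns during a rebalance,
rather than treating it as a real hostname. A rough sketch, with keySerializer
standing in for your DataKeySerilizer and the usual imports assumed:

    KeyQueryMetadata metadata =
        streams.queryMetadataForKey(storeName, recordKey, keySerializer);
    if (KeyQueryMetadata.NOT_AVAILABLE.equals(metadata)) {
        // The group is mid-rebalance (or the key's task hasn't been assigned
        // yet), so back off and retry rather than routing to "unavailable"
    } else {
        HostInfo host = metadata.activeHost();
        // route the remote query to host.host() + ":" + host.port()
    }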

So I guess my first question for you would be, has it settled down any
since the upgrade? If you have very large state stores then the "warming
up" can take a long time, even on the order of an hour or two. But
definitely not days. There are some configs you can tweak if this is the
case.
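
For reference, here's a rough sketch of the assignment-related configs I'm
referring to, with the usual java.util.Properties and StreamsConfig imports
assumed (the values shown are just the current defaults, not recommendations):

    Properties props = new Properties();
    // How many extra "warmup" replicas may be restoring state at once
    // (default 2); raising this lets more tasks warm up in parallel
    props.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, 2);
    // How often the assignor triggers a probing rebalance to check whether
    // warmup tasks have caught up (default 10 minutes)
    props.put(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, 10 * 60 * 1000L);
    // How far behind (in offsets) a local copy of task state can be and still
    // be considered "caught up" for assignment purposes (default 10000)
    props.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 10_000L);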

Second question would be whether it's been rebalancing the whole time, or
only every 10 minutes. If you don't monitor this already, there are a few
ways to tell. One would be setting up a state listener via the
KafkaStreams#setStateListener API, which has a REBALANCING state.
Unfortunately this isn't always enough to go on since the REBALANCING state
actually includes both literal rebalancing and also task restoration. It's
still useful to know, especially when paired with a metric that helps
differentiate between actual rebalancing and task restoration. One such
metric I personally always look at is the consumer's
last-rebalance-seconds-ago, which basically represents how long it's been
since the last rebalance occurred. With this metric you can instantly spot
probing rebalances/warmup tasks from its sawtooth pattern with a 10 minute
amplitude, corresponding to the regular probing rebalances.
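
For example, here's a minimal sketch of wiring up both of those, assuming
streams is your KafkaStreams instance (register the listener before calling
streams.start(); the embedded consumers' metrics are exposed through
KafkaStreams#metrics()):

    // Log every state transition; note that REBALANCING covers both actual
    // rebalances and task restoration
    streams.setStateListener((newState, oldState) ->
        System.out.println("State change: " + oldState + " -> " + newState));

    // Later, poll the consumer's last-rebalance-seconds-ago metric; plotted
    // over time, probing rebalances show up as a 10 minute sawtooth
    streams.metrics().entrySet().stream()
        .filter(e -> "last-rebalance-seconds-ago".equals(e.getKey().name()))
        .forEach(e -> System.out.println(
            e.getKey().tags() + " = " + e.getValue().metricValue()));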

Hope this helps,
Sophie

On Thu, May 9, 2024 at 9:20 PM Penumarthi Durga Prasad Chowdary <
prasad.penumar...@gmail.com> wrote:

> Kafka upgraded from 3.5.1 to 3.7.0 version
>
> On Fri, May 10, 2024 at 2:13 AM Sophie Blee-Goldman  >
> wrote:
>
> > What version did you upgrade from?
> >
> > On Wed, May 8, 2024 at 10:32 PM Penumarthi Durga Prasad Chowdary <
> > prasad.penumar...@gmail.com> wrote:
> >
> > > Hi Team,
> > >   I'm utilizing Kafka Streams to handle data from Kafka topics, running
> > > multiple instances with the same application ID. This enables
> distributed
> > > processing of Kafka data across these instances. Furthermore, I've
> > > implemented state stores with time windows and session windows. To
> > retrieve
> > > windows efficiently, I've established a remote query mechanism between
> > > Kafka Streams instances. By leveraging the queryMetadataForKey method
> on
> > > streams, I can retrieve the hostname where a specific key was processed
> > and
> > > where the corresponding window data resides in the state store.
> > >  *streams.queryMetadataForKey(storeName, recordKey, new
> > > DataKeySerilizer()).activeHost();*
> > > This functionality has been running smoothly for quite some time, up
> > until
> > > we upgraded our Kafka and Kafka Streams versions to 3.7.0. Since the
> > > upgrade, we've noticed some unexpected behavior that we didn't
> encounter
> > > with the previous versions.
> > >
> > >- The queryMetadataForKey method is returning "unavailable" as the
> > >hostname, despite having two Kafka Streams instances in a running
> > state.
> > >The issue seems to persist intermittently, and restarting the Kafka
> > > Streams
> > >instances temporarily resolves it. However, the problem resurfaces
> > after
> > >some time.
> > >- Additionally, we've observed that the state store of Kafka Streams
> > >instances is not evenly distributed as it was before the upgrade.
> > >Previously, if a Kafka topic had 10 

Query regarding groupbykey in streams

2024-05-13 Thread Dev Lover
Hi All,

I have a custom partitioner to distribute the data across partitions in my
cluster.

My setup looks like this:
Version - 3.7.0
Kafka - 3 broker setup
Partition count - 10
Stream server pods - 2
Stream threads in each pod - 10
Deployed in Kubernetes
Custom partitioner on producer end.

I am doing a groupByKey. Is it correct to use it when I have a custom
partitioner on the producer end?
I recently migrated to 3.7 from 3.5.1. I am observing that partitions are
not evenly distributed across my 2 stream pods. Also, my remote query is
failing with the host being unavailable. But if I restart Streams it works
fine for a while and then starts erroring out again. Am I doing something
wrong?

Regards


Re: Kafka Stream App Rolling Restarts - Too Many Rebalances Per Partition

2024-05-13 Thread Nagendra Mahesh (namahesh)
Thank you, Sophie, for your reply and for these recommendations - they are 
informative.
We are trying them out.

Thanks,
Nagendra U M

From: Sophie Blee-Goldman 
Sent: Tuesday, May 7, 2024 1:54 AM
To: users@kafka.apache.org 
Subject: Re: Kafka Stream App Rolling Restarts - Too Many Rebalances Per 
Partition

Hey,

Just skimming the config list, there are two things that immediately jumped
out at me:

1. The default session timeout was bumped up to 45 seconds a little while
ago. Not sure if you're overriding this or just using an older version, but
I definitely recommend bumping this up to 45s. Especially in combination
with...
2. The internal.leave.group.on.close config should always be set to "false" by
Kafka Streams. Are you overriding this? If so, that definitely explains a
lot of the rebalances. This config is basically like an internal backdoor
used by Kafka Streams to do exactly what it sounds like you want to do --
avoid triggering a rebalance when closing the consumer/KafkaStreams. It
also works in combination with the session timeout, and basically means
"don't kick off an extra rebalance if a bounced consumer rejoins within the
session timeout".

I'd start with that and see how it goes before fiddling with other things,
like the probing.rebalance.interval and max.warmup.replicas, since that'll
have implications/tradeoffs you may not want.

Lastly: I know this is somewhat contrary to common sense, but with consumer
groups/Kafka Streams it can often be much better to bounce as many nodes as
you can at once, rather than doing a true rolling bounce. If for any reason
you can't bounce multiple nodes at once, at the very least you should make
sure they are bounced as quickly as possible, i.e., minimize the time between
when one node comes back up and the next one is bounced. Often people will
wait for each node to come online, rejoin the consumer group, and fully
stabilize before bouncing the next node. But that means every single bounce
will not just necessitate a rebalance, but also guarantee that partitions
will be shuffled around the entire time. So my main piece of advice
(besides fixing the two configs above) is: do the rolling restart as fast
as you can!

On Mon, May 6, 2024 at 7:02 AM Nagendra Mahesh (namahesh)
 wrote:

> Hi,
>
>
> We have multiple replicas of an application running on a kubernetes
> cluster. Each application instance runs a stateful kafka stream application
> with an in-memory state-store (backed by a changelog topic). All instances
> of the stream apps are members of the same consumer group.
>
>
> Deployments happen using the “rolling restart” method i.e. new replica(s)
> come up successfully, and existing (old) replica(s) are killed. Due to
> members joining the consumer group (new app instances) and members leaving
> the consumer group (old app instances), there is rebalancing of topic
> partitions within the group.
>
>
> Ultimately, when all instances of the app have completed rolling restart,
> we see partitions have undergone rebalancing an excessive number of times.
> For example, the app has 48 instances and it is observed that each
> partition (say, partition #50) has undergone rebalancing a lot of times (50
> - 57 times) by moving across several app instances. Total count of
> partition movements during the entire rolling restart is greater than 3000.
>
>
> This excessive rebalancing incurs an overall lag on message processing
> SLAs, and is creating reliability issues.
>
>
> So, we are wondering:
>
>
> (1) is this expected, especially since cooperative rebalancing should
> ensure that not a lot of partitions get rebalanced
>
>
> (2) why would any partition undergo so many rebalances across several app
> instances?
>
>
> (3) is there some configuration (broker config or client config) that we
> can apply to reduce the total rebalances and partition movements during
> rolling restarts? We cannot consider static membership due to other
> technical constraints.
>
>
> The runtime and network is extremely stable — no heartbeat misses, session
> timeouts etc.
>
>
> DETAILS
>
> ---
>
>   *   Kafka Broker Version = 2.6
>
>   *   Kafka Streams Client Version = 2.7.0
>
>   *   No. of app instances = 48
>
>   *   No. of stream threads per stream app = 3
>
>   *   Total partition count = 60
>
>   *   Warmup Replicas (max.warmup.replicas) = 5
>
>   *   Standby Replicas (num.standby.replicas) = 2
>
>   *   probing.rebalance.interval.ms = 300000 (5 minutes)
>
>   *   session.timeout.ms = 10000 (10 seconds)
>
>   *   heartbeat.interval.ms = 3000 (3 seconds)
>
>   *   internal.leave.group.on.close = true
>
>   *   linger.ms = 5
>
>   *   processing.guarantee = at_least_once
>
>
> Any help or information would be greatly appreciated.
>
> Thanks,
> Nagendra U M
>