Re: Kafka streams state store return hostname as unavailable when calling queryMetadataForKey method
Ah. Well, this isn't anything new then, since it's been the case since 2.6: the default task assignor in Kafka Streams will sometimes assign partitions unevenly for a time if it's trying to move stateful tasks around and there's no copy of that task's state on the local disk of the KafkaStreams instance it's trying to move the task to. This imbalance should only be temporary, however. It should converge on an even distribution of partitions over time as it finishes "warming up" the task state in the background and can finish moving those stateful tasks to their final destination.

An upgrade can sometimes trigger a large-scale redistribution of tasks, which in turn can lead to a lot of these "warmup tasks" and a longer period of uneven task assignment. But it should always level out eventually if the group is stable. So when you say "we've observed that the state store of Kafka Streams instances is not evenly distributed as it was before the upgrade", was this just referring to immediately after the upgrade? If so, give it some time and it should trend towards an even distribution.

If it seems to be stuck in an uneven state, then that can be either because (a) there's a bug in the assignor, or more likely (b) the group itself is unstable and the assignment can't converge. Given that this issue is accompanied by the "hostname unavailable" symptom, it sounds like the group is stuck rebalancing. Do you monitor rebalances in any way? If you're seeing them about every 10 minutes exactly, then it's most likely just the "probing rebalances" that happen while tasks are being warmed up via the process described above. But if the application is rebalancing repeatedly, nonstop, or over a very long period of time (many hours/days), then that would be a problem.

So I guess my first question for you would be: has it settled down any since the upgrade? If you have very large state stores then the "warming up" can take a long time, even on the order of an hour or two. But definitely not days. There are some configs you can tweak if this is the case.

Second question would be whether it's been rebalancing the whole time, or only every 10 minutes. If you don't monitor this already, there are a few ways to tell. One would be setting up a state listener via the KafkaStreams#setStateListener API, which has a REBALANCING state. Unfortunately this isn't always enough to go on, since the REBALANCING state actually includes both literal rebalancing and task restoration. It's still useful to know, especially when paired with a metric that helps differentiate between actual rebalancing and task restoration. One such metric I personally always look at is the consumer's last-rebalance-seconds-ago, which basically represents how long it's been since a rebalance occurred. This metric can instantly identify probing rebalances/warmup tasks by its sawtooth pattern with an amplitude of 10 min, corresponding to the regular 10-minute probing rebalances.

Hope this helps,
Sophie

On Thu, May 9, 2024 at 9:20 PM Penumarthi Durga Prasad Chowdary <prasad.penumar...@gmail.com> wrote:
> Kafka upgraded from 3.5.1 to 3.7.0 version
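The reply above mentions configs that can be tweaked when warming up takes a long time, without naming them. As a rough sketch, and assuming the ones meant are the three task-assignment settings Kafka Streams exposes for warmup tasks (max.warmup.replicas, acceptable.recovery.lag, and probing.rebalance.interval.ms), overrides could be collected like this. The class name and the chosen values are hypothetical and only illustrate the shape. Suitable numbers depend on state store size and available resources.

```java
import java.util.Properties;

final class WarmupTuning {
    // Returns only the assignment-related overrides. They are meant to be merged
    // into the application's existing Streams properties before KafkaStreams is built.
    // All values below are illustrative assumptions, not recommendations.
    static Properties warmupOverrides() {
        Properties props = new Properties();
        // How many warmup replicas may be restoring at once (Kafka Streams default: 2).
        props.setProperty("max.warmup.replicas", "4");
        // Changelog lag, in records, below which an instance counts as caught up
        // for a task (Kafka Streams default: 10000).
        props.setProperty("acceptable.recovery.lag", "50000");
        // Interval between probing rebalances in milliseconds
        // (Kafka Streams default: 600000, i.e. the 10 minutes mentioned above).
        props.setProperty("probing.rebalance.interval.ms", "300000");
        return props;
    }

    public static void main(String[] args) {
        // Print the overrides so they can be compared with the running configuration.
        warmupOverrides().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Raising max.warmup.replicas lets more tasks warm up in parallel at the cost of extra restore traffic, while a shorter probing interval makes the group check more often whether warmups have caught up. Both trade faster convergence for more load, so they are worth changing only if the uneven assignment really is due to slow warmup.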
Re: Kafka streams state store return hostname as unavailable when calling queryMetadataForKey method
We upgraded Kafka from version 3.5.1 to 3.7.0.

On Fri, May 10, 2024 at 2:13 AM Sophie Blee-Goldman wrote:
> What version did you upgrade from?

--
Thanks & Regards,
Prasad,
91-9030546248.
Re: Kafka streams state store return hostname as unavailable when calling queryMetadataForKey method
What version did you upgrade from?

On Wed, May 8, 2024 at 10:32 PM Penumarthi Durga Prasad Chowdary <prasad.penumar...@gmail.com> wrote:
> Hi Team,
> I'm utilizing Kafka Streams to handle data from Kafka topics, running
> multiple instances with the same application ID. This enables distributed
> processing of Kafka data across these instances. Furthermore, I've
> implemented state stores with time windows and session windows. To retrieve
> windows efficiently, I've established a remote query mechanism between
> Kafka Streams instances. By leveraging the queryMetadataForKey method on
> streams, I can retrieve the hostname where a specific key was processed and
> where the corresponding window data resides in the state store.
> *streams.queryMetadataForKey(storeName, recordKey, new DataKeySerilizer()).activeHost();*
> This functionality has been running smoothly for quite some time, up until
> we upgraded our Kafka and Kafka Streams versions to 3.7.0. Since the
> upgrade, we've noticed some unexpected behavior that we didn't encounter
> with the previous versions.
>
>    - The queryMetadataForKey method is returning "unavailable" as the
>    hostname, despite having two Kafka Streams instances in a running state.
>    The issue seems to persist intermittently, and restarting the Kafka
>    Streams instances temporarily resolves it. However, the problem
>    resurfaces after some time.
>    - Additionally, we've observed that the state store of Kafka Streams
>    instances is not evenly distributed as it was before the upgrade.
>    Previously, if a Kafka topic had 10 partitions and two Kafka Streams
>    instances were running, these 10 partitions would be evenly shared
>    between the two instances. However, this behavior seems to have changed
>    after the upgrade.
>
> When can this issue happen?
> How can I fix the issue ?
> I would like to express my gratitude in advance for any assistance
> provided.
> --
> Thank's&Regard's,
> Prasad,
> 91-9030546248.
Kafka streams state store return hostname as unavailable when calling queryMetadataForKey method
Hi Team,

I'm using Kafka Streams to process data from Kafka topics, running multiple instances with the same application ID so that processing is distributed across those instances. I've also implemented state stores with time windows and session windows. To retrieve windows efficiently, I've set up a remote query mechanism between the Kafka Streams instances. Using the queryMetadataForKey method on streams, I can retrieve the hostname of the instance where a specific key was processed and where the corresponding window data resides in the state store:

    streams.queryMetadataForKey(storeName, recordKey, new DataKeySerilizer()).activeHost();

This had been running smoothly for quite some time, until we upgraded our Kafka and Kafka Streams versions to 3.7.0. Since the upgrade, we've noticed some unexpected behavior that we didn't encounter with the previous versions.

- The queryMetadataForKey method returns "unavailable" as the hostname, despite two Kafka Streams instances being in a running state. The issue occurs intermittently, and restarting the Kafka Streams instances temporarily resolves it. However, the problem resurfaces after some time.
- Additionally, we've observed that the state stores are no longer evenly distributed across the Kafka Streams instances as they were before the upgrade. Previously, if a Kafka topic had 10 partitions and two Kafka Streams instances were running, these 10 partitions would be evenly shared between the two instances. This behavior seems to have changed after the upgrade.

When can this issue happen? How can I fix it?

Thank you in advance for any assistance.

--
Thanks & Regards,
Prasad,
91-9030546248.
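As an illustration of how the remote query path in the message above might tolerate the "unavailable" result, here is a minimal sketch that retries the lookup a few times before giving up. It assumes the placeholder host name is the literal string "unavailable", as reported in the message (Kafka Streams returns KeyQueryMetadata.NOT_AVAILABLE while metadata is not yet available, for example during a rebalance). The ActiveHostLookup class, its resolve method, and the fake lookup in main are hypothetical names. The Supplier stands in for the real queryMetadataForKey call so the example runs without a Kafka dependency.

```java
import java.util.Optional;
import java.util.function.Supplier;

final class ActiveHostLookup {
    // Placeholder host name reported when no metadata is available for the key.
    static final String UNAVAILABLE_HOST = "unavailable";

    // Calls the lookup up to maxAttempts times, sleeping backoffMillis between
    // attempts, and returns the first host that is not the placeholder.
    // Returns Optional.empty() if every attempt fails or the thread is interrupted.
    static Optional<String> resolve(Supplier<String> lookup, int maxAttempts, long backoffMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            String host = lookup.get();
            if (host != null && !UNAVAILABLE_HOST.equals(host)) {
                return Optional.of(host);
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMillis);
                } catch (InterruptedException e) {
                    // Preserve the interrupt flag and stop retrying.
                    Thread.currentThread().interrupt();
                    return Optional.empty();
                }
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for:
        //   () -> streams.queryMetadataForKey(storeName, recordKey, new DataKeySerilizer())
        //                .activeHost().host()
        // It reports "unavailable" twice, then a made-up host name.
        int[] calls = {0};
        Supplier<String> fakeLookup = () -> ++calls[0] < 3 ? UNAVAILABLE_HOST : "streams-host-2";
        System.out.println(resolve(fakeLookup, 5, 10L)); // prints Optional[streams-host-2]
    }
}
```

A retry like this only papers over short rebalances. If the lookup keeps returning the placeholder for long stretches, the group is probably not stabilizing, and the rebalance behavior itself needs to be investigated.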