Apologies for the delay, Bruno. Thank you so much for the excellent link and 
for your inputs! Also, I would like to thank Matthias and yourself for the 
guidance on the stalling issue in the Kafka Streams client. After restoring the 
default value for the  METADATA_MAX_AGE_CONFIG, I haven’t seen the issue 
happening. Heavy rebalancing (as mentioned before) continues to happen. I will 
refer to the link which mentions about certain metrics which can give insights.

Thank you very much.

Kind regards,
Venkatesh

From: Bruno Cadonna <cado...@apache.org>
Date: Friday, 22 March 2024 at 9:53 PM
To: users@kafka.apache.org <users@kafka.apache.org>
Subject: Re: [EXTERNAL] Re: Kafka Streams 3.5.1 based app seems to get stalled
Hi Venkatesh,

The 1 core 1 stream thread recommendation is just s starting point. You
need to set the number of stream thread as it fits you by monitoring the
app.

Maybe this blog post might be interesting for you:
https://www.responsive.dev/blog/a-size-for-every-stream<https://www.responsive.dev/blog/a-size-for-every-stream>

Best,
Bruno


On 3/19/24 4:14 AM, Venkatesh Nagarajan wrote:
> Thanks very much for sharing the links and for your important inputs, Bruno!
>
>> We recommend to use as many stream threads as cores on the compute node 
>> where the Kafka Streams client is run. How many Kafka Streams tasks do you 
>> have to distribute over the clients?
>
> We use 1vCPU (probably 1 core) per Kafka Streams Client (ECS Task). Each 
> client/ECS Task runs 10 streaming threads and the CPU utilisation is just 4% 
> on an average. It increases when transient errors occur as they require 
> retries and threads to be replaced.
>
> We run a maximum of 6 clients/ECS Tasks when the offset lags are high. The 
> input topics have 60 partitions each and this matches (total number of 
> clients/ECS Tasks i.e. 6) * ( Streaming threads per client/ECS task i.e.10).
>
> With the 1 streaming thread per core approach, we will need 60 vCPUs/cores. 
> As I mentioned above, we have observed 10 threads using just 4% of 1 
> vCPU/core on an average. It may be difficult to justify provisioning more 
> cores as it will be expensive and because Kafka Streams recovers from 
> failures in acquiring locks.
>
> Please feel free to correct me and/or share your thoughts.
>
> Thank you.
>
> Kind regards,
> Venkatesh
>
> From: Bruno Cadonna <cado...@apache.org>
> Date: Friday, 15 March 2024 at 8:47 PM
> To: users@kafka.apache.org <users@kafka.apache.org>
> Subject: Re: [EXTERNAL] Re: Kafka Streams 3.5.1 based app seems to get stalled
> Hi Venkatesh,
>
> As you discovered, in Kafka Streams 3.5.1 there is no stop-the-world
> rebalancing.
>
> Static group member is helpful when Kafka Streams clients are restarted
> as you pointed out.
>
>> ERROR org.apache.kafka.streams.processor.internals.StandbyTask -
> stream-thread [<member>-StreamThread-1] standby-task [1_32] Failed to
> acquire lock while closing the state store for STANDBY task
>
> This error (and some others about lock acquisition) happens when a
> stream thread wants to lock the state directory for a task but the
> stream thread on the same Kafka Streams client has not releases the lock
> yet. And yes, Kafka Streams handles them.
>
> 30 and 60 stream threads is a lot for one Kafka Streams client. We
> recommend to use as many stream threads as cores on the compute node
> where the Kafka Streams client is run. How many Kafka Streams tasks do
> you have to distribute over the clients?
>
>> Would you consider this level of rebalancing to be normal?
>
> The rate of rebalance events seems high indeed. However, the log
> messages you posted in one of your last e-mails are normal during a
> rebalance and they have nothing to do with METADATA_MAX_AGE_CONFIG.
>
> I do not know the metric SumOffsetLag. Judging from a quick search on
> the internet, I think it is a MSK specific metric.
> https://repost.aws/questions/QUthnU3gycT-qj3Mtb-ekmRA/msk-metric-sumoffsetlag-how-it-works<https://repost.aws/questions/QUthnU3gycT-qj3Mtb-ekmRA/msk-metric-sumoffsetlag-how-it-works><https://repost.aws/questions/QUthnU3gycT-qj3Mtb-ekmRA/msk-metric-sumoffsetlag-how-it-works<https://repost.aws/questions/QUthnU3gycT-qj3Mtb-ekmRA/msk-metric-sumoffsetlag-how-it-works>>
> Under the link you can also find some other metrics that you can use.
>
> The following talk might help you debugging your rebalance issues:
>
> https://www.confluent.io/events/kafka-summit-london-2023/kafka-streams-rebalances-and-assignments-the-whole-story/<https://www.confluent.io/events/kafka-summit-london-2023/kafka-streams-rebalances-and-assignments-the-whole-story><https://www.confluent.io/events/kafka-summit-london-2023/kafka-streams-rebalances-and-assignments-the-whole-story<https://www.confluent.io/events/kafka-summit-london-2023/kafka-streams-rebalances-and-assignments-the-whole-story>>
>
>
> Best,
> Bruno
>
> On 3/14/24 11:11 PM, Venkatesh Nagarajan wrote:
>> Just want to make a correction, Bruno - My understanding is that Kafka 
>> Streams 3.5.1 uses Incremental Cooperative Rebalancing which seems to help 
>> reduce the impact of rebalancing caused by autoscaling etc.:
>>
>> https://www.confluent.io/blog/incremental-cooperative-rebalancing-in-kafka/<https://www.confluent.io/blog/incremental-cooperative-rebalancing-in-kafka><https://www.confluent.io/blog/incremental-cooperative-rebalancing-in-kafka<https://www.confluent.io/blog/incremental-cooperative-rebalancing-in-kafka>>
>>
>> Static group membership may also have a role to play especially if the ECS 
>> tasks get restarted for some reason.
>>
>>
>> I also want to mention to you about this error which occurred 759 times 
>> during the 13 hour load test:
>>
>> ERROR org.apache.kafka.streams.processor.internals.StandbyTask - 
>> stream-thread [<member>-StreamThread-1] standby-task [1_32] Failed to 
>> acquire lock while closing the state store for STANDBY task
>>
>> I think Kafka Streams automatically recovers from this. Also, I have seen 
>> this error to increase when the number of streaming threads is high (30 or 
>> 60 threads). So I use just 10 threads per ECS task.
>>
>> Kind regards,
>> Venkatesh
>>
>> From: Venkatesh Nagarajan <venkatesh.nagara...@uts.edu.au>
>> Date: Friday, 15 March 2024 at 8:30 AM
>> To: users@kafka.apache.org <users@kafka.apache.org>
>> Subject: Re: [EXTERNAL] Re: Kafka Streams 3.5.1 based app seems to get 
>> stalled
>> Apologies for the delay in responding to you, Bruno. Thank you very much for 
>> your important inputs.
>>
>> Just searched for log messages in the MSK broker logs pertaining to 
>> rebalancing and updating of metadata for the consumer group and found 412 
>> occurrences in a 13 hour period. During this time, a load test was run and 
>> around 270k events were processed. Would you consider this level of 
>> rebalancing to be normal?
>>
>> Also, I need to mention that when offset lags increase, autoscaling creates 
>> additional ECS tasks to help with faster processing. A lot of rebalancing 
>> happens for a few hours before the consumer group becomes stable.
>>
>> By stop-the-world rebalancing, I meant a rebalancing that would cause the 
>> processing to completely stop when it happens. To avoid this, we use static 
>> group membership as explained by Matthias in this presentation:
>>
>> https://www.confluent.io/kafka-summit-lon19/everything-you-wanted-to-know-kafka-afraid/<https://www.confluent.io/kafka-summit-lon19/everything-you-wanted-to-know-kafka-afraid><https://www.confluent.io/kafka-summit-lon19/everything-you-wanted-to-know-kafka-afraid<https://www.confluent.io/kafka-summit-lon19/everything-you-wanted-to-know-kafka-afraid>>
>>
>> Static group membership seems to help reduce the impact of the rebalancing 
>> caused by scaling out of consumers.
>>
>> On a separate note, when rebalancing happens, we lose the SumOffsetLag 
>> metric emitted by MSK for the consumer group. The AWS Support team said that 
>> the metric will only be available when the consumer group is stable or 
>> empty. I am not sure if this metric is specific to MSK or if it is related 
>> to Apache Kafka. If there is another metric I can use which can make offset 
>> lags observable even during rebalancing, can you please let me know?
>>
>> Thank you very much.
>>
>> Kind regards,
>> Venkatesh
>>
>> From: Bruno Cadonna <cado...@apache.org>
>> Date: Wednesday, 13 March 2024 at 8:29 PM
>> To: users@kafka.apache.org <users@kafka.apache.org>
>> Subject: Re: [EXTERNAL] Re: Kafka Streams 3.5.1 based app seems to get 
>> stalled
>> Hi Venkatesh,
>>
>> Extending on what Matthias replied, a metadata refresh might trigger a
>> rebalance if the metadata changed. However, a metadata refresh that does
>> not show a change in the metadata will not trigger a rebalance. In this
>> context, i.e., config METADATA_MAX_AGE_CONFIG, the metadata is the
>> metadata about the cluster received by the client.
>>
>> The metadata mentioned in the log messages you posted is metadata of the
>> group to which the member (a.k.a. consumer, a.k.a. client) belongs. The
>> log message originates from the broker (in contrast
>> METADATA_MAX_AGE_CONFIG is a client config). If the rebalance were
>> triggered by a cluster metadata change the log message should contain
>> something like "cached metadata has changed" as client reason [1].
>>
>> Your log messages seem genuine log messages that are completely normal
>> during rebalance events.
>>
>> How often do they happen?
>> What do you mean with stop-the-world rebalances?
>>
>> Best,
>> Bruno
>>
>>
>> [1]
>> https://github.com/apache/kafka/blob/f0087ac6a8a7b1005e9588e42b3679146bd3eb13/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L882C39-L882C66<https://github.com/apache/kafka/blob/f0087ac6a8a7b1005e9588e42b3679146bd3eb13/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L882C39-L882C66><https://github.com/apache/kafka/blob/f0087ac6a8a7b1005e9588e42b3679146bd3eb13/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L882C39-L882C66<https://github.com/apache/kafka/blob/f0087ac6a8a7b1005e9588e42b3679146bd3eb13/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L882C39-L882C66>><https://github.com/apache/kafka/blob/f0087ac6a8a7b1005e9588e42b3679146bd3eb13/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L882C39-L882C66<https://github.com/apache/kafka/blob/f0087ac6a8a7b1005e9588e42b3679146bd3eb13/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L882C39-L882C66><https://github.com/apache/kafka/blob/f0087ac6a8a7b1005e9588e42b3679146bd3eb13/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L882C39-L882C66<https://github.com/apache/kafka/blob/f0087ac6a8a7b1005e9588e42b3679146bd3eb13/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L882C39-L882C66>>>
>>
>>
>> On 3/13/24 2:34 AM, Venkatesh Nagarajan wrote:
>>> Just want to share another variant of the log message which is also related 
>>> to metadata and rebalancing but has a different client reason:
>>>
>>> INFO [GroupCoordinator 3]: Preparing to rebalance group <group> in state 
>>> PreparingRebalance with old generation nnn (__consumer_offsets-nn) (reason: 
>>> Updating metadata for member <member> during Stable; client reason: 
>>> triggered followup rebalance scheduled for 0) 
>>> (kafka.coordinator.group.GroupCoordinator)
>>>
>>> Thank you.
>>>
>>> Kind regards,
>>> Venkatesh
>>>
>>> From: Venkatesh Nagarajan <venkatesh.nagara...@uts.edu.au>
>>> Date: Wednesday, 13 March 2024 at 12:06 pm
>>> To: users@kafka.apache.org <users@kafka.apache.org>
>>> Subject: Re: [EXTERNAL] Re: Kafka Streams 3.5.1 based app seems to get 
>>> stalled
>>> Thanks very much for your important inputs, Matthias.
>>>
>>> I will use the default METADATA_MAX_AGE_CONFIG. I set it to 5 hours when I 
>>> saw a lot of such rebalancing related messages in the MSK broker logs:
>>>
>>> INFO [GroupCoordinator 2]: Preparing to rebalance group <group> in state 
>>> PreparingRebalance with old generation nnnn (__consumer_offsets-nn) 
>>> (reason: Updating metadata for member <member> during Stable; client 
>>> reason: need to revoke partitions and re-join) 
>>> (kafka.coordinator.group.GroupCoordinator)
>>>
>>> I am guessing that the two are unrelated. If you have any suggestions on 
>>> how to reduce such rebalancing, that will be very helpful.
>>>
>>> Thank you very much.
>>>
>>> Kind regards,
>>> Venkatesh
>>>
>>> From: Matthias J. Sax <mj...@apache.org>
>>> Date: Tuesday, 12 March 2024 at 1:31 pm
>>> To: users@kafka.apache.org <users@kafka.apache.org>
>>> Subject: [EXTERNAL] Re: Kafka Streams 3.5.1 based app seems to get stalled
>>> Without detailed logs (maybe even DEBUG) hard to say.
>>>
>>> But from what you describe, it could be a metadata issue? Why are you
>>> setting
>>>
>>>> METADATA_MAX_AGE_CONFIG (consumer and producer): 5 hours in millis (to 
>>>> make rebalances rare)
>>>
>>> Refreshing metadata has nothing to do with rebalances, and a metadata
>>> refresh does not trigger a rebalance.
>>>
>>>
>>>
>>> -Matthias
>>>
>>>
>>> On 3/10/24 5:56 PM, Venkatesh Nagarajan wrote:
>>>> Hi all,
>>>>
>>>> A Kafka Streams application sometimes stops consuming events during load 
>>>> testing. Please find below the details:
>>>>
>>>> Details of the app:
>>>>
>>>>
>>>> * Kafka Streams Version: 3.5.1
>>>> * Kafka: AWS MSK v3.6.0
>>>> * Consumes events from 6 topics
>>>> * Calls APIs to enrich events
>>>> * Sometimes joins two streams
>>>> * Produces enriched events in output topics
>>>>
>>>> Runs on AWS ECS:
>>>>
>>>> * Each task has 10 streaming threads
>>>> * Autoscaling based on offset lags and a maximum of 6 ECS tasks
>>>> * Input topics have 60 partitions each to match 6 tasks * 10 threads
>>>> * Fairly good spread of events across all topic partitions using 
>>>> partitioning keys
>>>>
>>>> Settings and configuration:
>>>>
>>>>
>>>> * At least once semantics
>>>> * MAX_POLL_RECORDS_CONFIG: 10
>>>> * APPLICATION_ID_CONFIG
>>>>
>>>> // Make rebalances rare and prevent stop-the-world rebalances
>>>>
>>>> * Static membership (using GROUP_INSTANCE_ID_CONFIG)
>>>> * METADATA_MAX_AGE_CONFIG (consumer and producer): 5 hours in millis (to 
>>>> make rebalances rare)
>>>> * MAX_POLL_INTERVAL_MS_CONFIG: 20 minutes in millis
>>>> * SESSION_TIMEOUT_MS_CONFIG: 2 minutes in millis
>>>>
>>>> State store related settings:
>>>>
>>>> * TOPOLOGY_OPTIMIZATION_CONFIG: OPTIMIZE
>>>> * STATESTORE_CACHE_MAX_BYTES_CONFIG: 300 * 1024 * 1024L
>>>> * NUM_STANDBY_REPLICAS_CONFIG: 1
>>>>
>>>>
>>>> Symptoms:
>>>> The symptoms mentioned below occur during load tests:
>>>>
>>>> Scenario# 1:
>>>> Steady input event stream
>>>>
>>>> Observations:
>>>>
>>>> * Gradually increasing offset lags which shouldn't happen normally as the 
>>>> streaming app is quite fast
>>>> * Events get processed
>>>>
>>>> Scenario# 2:
>>>> No input events after the load test stops producing events
>>>>
>>>> Observations:
>>>>
>>>> * Offset lag stuck at ~5k
>>>> * Stable consumer group
>>>> * No events processed
>>>> * No errors or messages in the logs
>>>>
>>>>
>>>> Scenario# 3:
>>>> Restart the app when it stops processing events although offset lags are 
>>>> not zero
>>>>
>>>> Observations:
>>>>
>>>> * Offset lags start reducing and events start getting processed
>>>>
>>>> Scenario# 4:
>>>> Transient errors occur while processing events
>>>>
>>>>
>>>> * A custom exception handler that implements 
>>>> StreamsUncaughtExceptionHandler returns 
>>>> StreamThreadExceptionResponse.REPLACE_THREAD in the handle method
>>>> * If transient errors keep occurring occasionally and threads get 
>>>> replaced, the problem of the app stalling disappears.
>>>> * But if transient errors don't occur, the app tends to stall and I need 
>>>> to manually restart it
>>>>
>>>>
>>>> Summary:
>>>>
>>>> * It appears that some streaming threads stall after processing for a 
>>>> while.
>>>> * It is difficult to change log level for Kafka Streams from ERROR to INFO 
>>>> as it starts producing a lot of log messages especially during load tests.
>>>> * I haven't yet managed to push Kafka streams metrics into AWS OTEL 
>>>> collector to get more insights.
>>>>
>>>> Can you please let me know if any Kafka Streams config settings need 
>>>> changing? Should I reduce the values of any of these settings to help 
>>>> trigger rebalancing early and hence assign partitions to members that are 
>>>> active:
>>>>
>>>>
>>>> * METADATA_MAX_AGE_CONFIG: 5 hours in millis (to make rebalances rare)
>>>> * MAX_POLL_INTERVAL_MS_CONFIG: 20 minutes in millis
>>>> * SESSION_TIMEOUT_MS_CONFIG: 2 minutes in millis
>>>>
>>>> Should I get rid of static membership – this may increase rebalancing but 
>>>> may be okay if it can prevent stalled threads from appearing as active 
>>>> members
>>>>
>>>> Should I try upgrading Kafka Streams to v3.6.0 or v3.7.0? Hoping that 
>>>> v3.7.0 will be compatible with AWS MSK v3.6.0.
>>>>
>>>>
>>>> Thank you very much.
>>>>
>>>> Kind regards,
>>>> Venkatesh
>>>>
>>>> UTS CRICOS Provider Code: 00099F DISCLAIMER: This email message and any 
>>>> accompanying attachments may contain confidential information. If you are 
>>>> not the intended recipient, do not read, use, disseminate, distribute or 
>>>> copy this message or attachments. If you have received this message in 
>>>> error, please notify the sender immediately and delete this message. Any 
>>>> views expressed in this message are those of the individual sender, except 
>>>> where the sender expressly, and with authority, states them to be the 
>>>> views of the University of Technology Sydney. Before opening any 
>>>> attachments, please check them for viruses and defects. Think. Green. Do. 
>>>> Please consider the environment before printing this email.
>>>>

Reply via email to