Apologies for the delay, Bruno. Thank you so much for the excellent link and for your inputs! Also, I would like to thank Matthias and you for the guidance on the stalling issue in the Kafka Streams client. After restoring the default value for METADATA_MAX_AGE_CONFIG, I haven’t seen the issue recur. Heavy rebalancing (as mentioned before) continues to happen. I will refer to the link, which mentions certain metrics that can give insights.
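For reference, "restoring the default" here just means not overriding metadata.max.age.ms (the client's cluster-metadata refresh interval, which defaults to 300000 ms, i.e. 5 minutes). A minimal sketch using plain java.util.Properties with string keys in place of the StreamsConfig/ConsumerConfig constants; the application id and bootstrap endpoint are hypothetical:

```java
import java.util.Properties;

public class MetadataAgeConfigSketch {
    // Builds the client configuration. The earlier (problematic) setup
    // overrode metadata.max.age.ms to 5 hours; leaving the key unset
    // restores the client default of 300000 ms (5 minutes).
    public static Properties baseConfig() {
        Properties props = new Properties();
        props.put("application.id", "enrichment-app");   // hypothetical name
        props.put("bootstrap.servers", "broker:9092");   // hypothetical endpoint
        // Deliberately NOT setting "metadata.max.age.ms" -- the default applies.
        return props;
    }

    public static void main(String[] args) {
        Properties props = baseConfig();
        // prints false: no override present, so the 5-minute default is in effect
        System.out.println(props.containsKey("metadata.max.age.ms"));
    }
}
```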
Thank you very much. Kind regards, Venkatesh From: Bruno Cadonna <cado...@apache.org> Date: Friday, 22 March 2024 at 9:53 PM To: users@kafka.apache.org <users@kafka.apache.org> Subject: Re: [EXTERNAL] Re: Kafka Streams 3.5.1 based app seems to get stalled Hi Venkatesh, The 1 core 1 stream thread recommendation is just a starting point. You need to set the number of stream threads as fits your workload by monitoring the app. Maybe this blog post might be interesting for you: https://www.responsive.dev/blog/a-size-for-every-stream Best, Bruno On 3/19/24 4:14 AM, Venkatesh Nagarajan wrote: > Thanks very much for sharing the links and for your important inputs, Bruno! > >> We recommend to use as many stream threads as cores on the compute node >> where the Kafka Streams client is run. How many Kafka Streams tasks do you >> have to distribute over the clients? > > We use 1 vCPU (probably 1 core) per Kafka Streams client (ECS task). Each > client/ECS task runs 10 streaming threads and the CPU utilisation is just 4% > on average. It increases when transient errors occur as they require > retries and threads to be replaced. > > We run a maximum of 6 clients/ECS tasks when the offset lags are high. The > input topics have 60 partitions each and this matches (total number of > clients/ECS tasks, i.e. 6) * (streaming threads per client/ECS task, i.e. 10). > > With the 1 streaming thread per core approach, we would need 60 vCPUs/cores. > As I mentioned above, we have observed 10 threads using just 4% of 1 > vCPU/core on average. It may be difficult to justify provisioning more > cores as it will be expensive and because Kafka Streams recovers from > failures in acquiring locks. > > Please feel free to correct me and/or share your thoughts. > > Thank you. 
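The sizing arithmetic above (60 partitions covered by 6 clients running 10 stream threads each) can be sketched as a quick ceiling-division check; the helper name is mine:

```java
public class StreamSizingSketch {
    // Each input-topic partition maps to at most one active stream task,
    // so clients * threadsPerClient should cover the partition count.
    public static int clientsNeeded(int partitions, int threadsPerClient) {
        // Ceiling division: round up so every partition has a thread.
        return (partitions + threadsPerClient - 1) / threadsPerClient;
    }

    public static void main(String[] args) {
        int partitions = 60;       // per input topic, as in the thread
        int threadsPerClient = 10; // stream threads per ECS task
        System.out.println(clientsNeeded(partitions, threadsPerClient)); // prints 6
    }
}
```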
> > Kind regards, > Venkatesh > > From: Bruno Cadonna <cado...@apache.org> > Date: Friday, 15 March 2024 at 8:47 PM > To: users@kafka.apache.org <users@kafka.apache.org> > Subject: Re: [EXTERNAL] Re: Kafka Streams 3.5.1 based app seems to get stalled > Hi Venkatesh, > > As you discovered, in Kafka Streams 3.5.1 there is no stop-the-world > rebalancing. > > Static group membership is helpful when Kafka Streams clients are restarted > as you pointed out. > >> ERROR org.apache.kafka.streams.processor.internals.StandbyTask - > stream-thread [<member>-StreamThread-1] standby-task [1_32] Failed to > acquire lock while closing the state store for STANDBY task > > This error (and some others about lock acquisition) happens when a > stream thread wants to lock the state directory for a task but the > stream thread on the same Kafka Streams client has not released the lock > yet. And yes, Kafka Streams handles them. > > 30 or 60 stream threads is a lot for one Kafka Streams client. We > recommend to use as many stream threads as cores on the compute node > where the Kafka Streams client is run. How many Kafka Streams tasks do > you have to distribute over the clients? > >> Would you consider this level of rebalancing to be normal? > > The rate of rebalance events seems high indeed. However, the log > messages you posted in one of your last e-mails are normal during a > rebalance and they have nothing to do with METADATA_MAX_AGE_CONFIG. > > I do not know the metric SumOffsetLag. Judging from a quick search on > the internet, I think it is an MSK-specific metric. 
> https://repost.aws/questions/QUthnU3gycT-qj3Mtb-ekmRA/msk-metric-sumoffsetlag-how-it-works > Under the link you can also find some other metrics that you can use. > > The following talk might help you debug your rebalance issues: > > https://www.confluent.io/events/kafka-summit-london-2023/kafka-streams-rebalances-and-assignments-the-whole-story/ > > > Best, > Bruno > > On 3/14/24 11:11 PM, Venkatesh Nagarajan wrote: >> Just want to make a correction, Bruno - My understanding is that Kafka >> Streams 3.5.1 uses Incremental Cooperative Rebalancing, which seems to help >> reduce the impact of rebalancing caused by autoscaling etc.: >> >> https://www.confluent.io/blog/incremental-cooperative-rebalancing-in-kafka/ >> >> Static group membership may also have a role to play, especially if the ECS >> tasks get restarted for some reason. 
>> >> >> I also want to mention this error, which occurred 759 times >> during the 13-hour load test: >> >> ERROR org.apache.kafka.streams.processor.internals.StandbyTask - >> stream-thread [<member>-StreamThread-1] standby-task [1_32] Failed to >> acquire lock while closing the state store for STANDBY task >> >> I think Kafka Streams automatically recovers from this. Also, I have seen >> this error increase when the number of streaming threads is high (30 or >> 60 threads). So I use just 10 threads per ECS task. >> >> Kind regards, >> Venkatesh >> >> From: Venkatesh Nagarajan <venkatesh.nagara...@uts.edu.au> >> Date: Friday, 15 March 2024 at 8:30 AM >> To: users@kafka.apache.org <users@kafka.apache.org> >> Subject: Re: [EXTERNAL] Re: Kafka Streams 3.5.1 based app seems to get >> stalled >> Apologies for the delay in responding to you, Bruno. Thank you very much for >> your important inputs. >> >> I just searched the MSK broker logs for messages about rebalancing and >> updating of metadata for the consumer group and found 412 >> occurrences in a 13-hour period. During this time, a load test was run and >> around 270k events were processed. Would you consider this level of >> rebalancing to be normal? >> >> Also, I need to mention that when offset lags increase, autoscaling creates >> additional ECS tasks to help with faster processing. A lot of rebalancing >> happens for a few hours before the consumer group becomes stable. >> >> By stop-the-world rebalancing, I meant a rebalancing that would cause the >> processing to completely stop when it happens. 
To avoid this, we use static >> group membership, as explained by Matthias in this presentation: >> >> https://www.confluent.io/kafka-summit-lon19/everything-you-wanted-to-know-kafka-afraid/ >> >> Static group membership seems to help reduce the impact of the rebalancing >> caused by scaling out consumers. >> >> On a separate note, when rebalancing happens, we lose the SumOffsetLag >> metric emitted by MSK for the consumer group. The AWS Support team said that >> the metric will only be available when the consumer group is stable or >> empty. I am not sure if this metric is specific to MSK or if it is related >> to Apache Kafka. If there is another metric I can use which can make offset >> lags observable even during rebalancing, can you please let me know? >> >> Thank you very much. >> >> Kind regards, >> Venkatesh >> >> From: Bruno Cadonna <cado...@apache.org> >> Date: Wednesday, 13 March 2024 at 8:29 PM >> To: users@kafka.apache.org <users@kafka.apache.org> >> Subject: Re: [EXTERNAL] Re: Kafka Streams 3.5.1 based app seems to get >> stalled >> Hi Venkatesh, >> >> Extending on what Matthias replied, a metadata refresh might trigger a >> rebalance if the metadata changed. However, a metadata refresh that does >> not show a change in the metadata will not trigger a rebalance. In this >> context, i.e. the config METADATA_MAX_AGE_CONFIG, the metadata is the >> cluster metadata received by the client. >> >> The metadata mentioned in the log messages you posted is metadata of the >> group to which the member (a.k.a. consumer, a.k.a. client) belongs. The >> log message originates from the broker (in contrast, >> METADATA_MAX_AGE_CONFIG is a client config). 
If the rebalance were >> triggered by a cluster metadata change, the log message should contain >> something like "cached metadata has changed" as the client reason [1]. >> >> Your log messages look like genuine messages that are completely normal >> during rebalance events. >> >> How often do they happen? >> What do you mean by stop-the-world rebalances? >> >> Best, >> Bruno >> >> >> [1] >> https://github.com/apache/kafka/blob/f0087ac6a8a7b1005e9588e42b3679146bd3eb13/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L882C39-L882C66 >> >> >> On 3/13/24 2:34 AM, Venkatesh Nagarajan wrote: >>> Just want to share another variant of the log message which 
is also related >>> to metadata and rebalancing but has a different client reason: >>> >>> INFO [GroupCoordinator 3]: Preparing to rebalance group <group> in state >>> PreparingRebalance with old generation nnn (__consumer_offsets-nn) (reason: >>> Updating metadata for member <member> during Stable; client reason: >>> triggered followup rebalance scheduled for 0) >>> (kafka.coordinator.group.GroupCoordinator) >>> >>> Thank you. >>> >>> Kind regards, >>> Venkatesh >>> >>> From: Venkatesh Nagarajan <venkatesh.nagara...@uts.edu.au> >>> Date: Wednesday, 13 March 2024 at 12:06 pm >>> To: users@kafka.apache.org <users@kafka.apache.org> >>> Subject: Re: [EXTERNAL] Re: Kafka Streams 3.5.1 based app seems to get >>> stalled >>> Thanks very much for your important inputs, Matthias. >>> >>> I will use the default METADATA_MAX_AGE_CONFIG. I had set it to 5 hours when I >>> saw a lot of such rebalancing-related messages in the MSK broker logs: >>> >>> INFO [GroupCoordinator 2]: Preparing to rebalance group <group> in state >>> PreparingRebalance with old generation nnnn (__consumer_offsets-nn) >>> (reason: Updating metadata for member <member> during Stable; client >>> reason: need to revoke partitions and re-join) >>> (kafka.coordinator.group.GroupCoordinator) >>> >>> I am guessing that the two are unrelated. If you have any suggestions on >>> how to reduce such rebalancing, that would be very helpful. >>> >>> Thank you very much. >>> >>> Kind regards, >>> Venkatesh >>> >>> From: Matthias J. Sax <mj...@apache.org> >>> Date: Tuesday, 12 March 2024 at 1:31 pm >>> To: users@kafka.apache.org <users@kafka.apache.org> >>> Subject: [EXTERNAL] Re: Kafka Streams 3.5.1 based app seems to get stalled >>> Without detailed logs (maybe even DEBUG), it is hard to say. >>> >>> But from what you describe, it could be a metadata issue? 
Why are you >>> setting >>> >>>> METADATA_MAX_AGE_CONFIG (consumer and producer): 5 hours in millis (to >>>> make rebalances rare) >>> >>> Refreshing metadata has nothing to do with rebalances, and a metadata >>> refresh does not trigger a rebalance. >>> >>> >>> >>> -Matthias >>> >>> >>> On 3/10/24 5:56 PM, Venkatesh Nagarajan wrote: >>>> Hi all, >>>> >>>> A Kafka Streams application sometimes stops consuming events during load >>>> testing. Please find below the details: >>>> >>>> Details of the app: >>>> >>>> >>>> * Kafka Streams Version: 3.5.1 >>>> * Kafka: AWS MSK v3.6.0 >>>> * Consumes events from 6 topics >>>> * Calls APIs to enrich events >>>> * Sometimes joins two streams >>>> * Produces enriched events in output topics >>>> >>>> Runs on AWS ECS: >>>> >>>> * Each task has 10 streaming threads >>>> * Autoscaling based on offset lags and a maximum of 6 ECS tasks >>>> * Input topics have 60 partitions each to match 6 tasks * 10 threads >>>> * Fairly good spread of events across all topic partitions using >>>> partitioning keys >>>> >>>> Settings and configuration: >>>> >>>> >>>> * At least once semantics >>>> * MAX_POLL_RECORDS_CONFIG: 10 >>>> * APPLICATION_ID_CONFIG >>>> >>>> // Make rebalances rare and prevent stop-the-world rebalances >>>> >>>> * Static membership (using GROUP_INSTANCE_ID_CONFIG) >>>> * METADATA_MAX_AGE_CONFIG (consumer and producer): 5 hours in millis (to >>>> make rebalances rare) >>>> * MAX_POLL_INTERVAL_MS_CONFIG: 20 minutes in millis >>>> * SESSION_TIMEOUT_MS_CONFIG: 2 minutes in millis >>>> >>>> State store related settings: >>>> >>>> * TOPOLOGY_OPTIMIZATION_CONFIG: OPTIMIZE >>>> * STATESTORE_CACHE_MAX_BYTES_CONFIG: 300 * 1024 * 1024L >>>> * NUM_STANDBY_REPLICAS_CONFIG: 1 >>>> >>>> >>>> Symptoms: >>>> The symptoms mentioned below occur during load tests: >>>> >>>> Scenario# 1: >>>> Steady input event stream >>>> >>>> Observations: >>>> >>>> * Gradually increasing offset lags which shouldn't happen normally as the >>>> streaming app is 
quite fast >>>> * Events get processed >>>> >>>> Scenario# 2: >>>> No input events after the load test stops producing events >>>> >>>> Observations: >>>> >>>> * Offset lag stuck at ~5k >>>> * Stable consumer group >>>> * No events processed >>>> * No errors or messages in the logs >>>> >>>> >>>> Scenario# 3: >>>> Restart the app when it stops processing events although offset lags are >>>> not zero >>>> >>>> Observations: >>>> >>>> * Offset lags start reducing and events start getting processed >>>> >>>> Scenario# 4: >>>> Transient errors occur while processing events >>>> >>>> >>>> * A custom exception handler that implements >>>> StreamsUncaughtExceptionHandler returns >>>> StreamThreadExceptionResponse.REPLACE_THREAD in the handle method >>>> * If transient errors keep occurring occasionally and threads get >>>> replaced, the problem of the app stalling disappears. >>>> * But if transient errors don't occur, the app tends to stall and I need >>>> to manually restart it >>>> >>>> >>>> Summary: >>>> >>>> * It appears that some streaming threads stall after processing for a >>>> while. >>>> * It is difficult to change log level for Kafka Streams from ERROR to INFO >>>> as it starts producing a lot of log messages especially during load tests. >>>> * I haven't yet managed to push Kafka streams metrics into AWS OTEL >>>> collector to get more insights. >>>> >>>> Can you please let me know if any Kafka Streams config settings need >>>> changing? 
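Scenario 4 above hinges on the decision made by the uncaught-exception handler. The real interface is org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler from the kafka-streams artifact; the plain-Java stand-in below only illustrates the replace-vs-shutdown decision described in the thread, and the class name and the choice of which exceptions count as transient are my own assumptions:

```java
import java.util.concurrent.TimeoutException;

public class TransientErrorPolicySketch {
    // Simplified stand-in for StreamThreadExceptionResponse from
    // kafka-streams, reduced to the two responses discussed above.
    public enum Response { REPLACE_THREAD, SHUTDOWN_CLIENT }

    // Classify the failure: replace the thread for errors treated as
    // transient (here, timeouts -- an assumed classification), otherwise
    // shut the client down so the failure is visible rather than silent.
    public static Response handle(Throwable t) {
        return (t instanceof TimeoutException)
                ? Response.REPLACE_THREAD
                : Response.SHUTDOWN_CLIENT;
    }

    public static void main(String[] args) {
        System.out.println(handle(new TimeoutException("broker timeout")));
        System.out.println(handle(new IllegalStateException("bug")));
    }
}
```

As the thread notes, REPLACE_THREAD keeps the application running through transient failures, which may also explain why occasional thread replacement masked the stalling.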
Should I reduce the values of any of these settings to help >>>> trigger rebalancing early and hence assign partitions to members that are >>>> active: >>>> >>>> >>>> * METADATA_MAX_AGE_CONFIG: 5 hours in millis (to make rebalances rare) >>>> * MAX_POLL_INTERVAL_MS_CONFIG: 20 minutes in millis >>>> * SESSION_TIMEOUT_MS_CONFIG: 2 minutes in millis >>>> >>>> Should I get rid of static membership? This may increase rebalancing but >>>> may be okay if it can prevent stalled threads from appearing as active >>>> members. >>>> >>>> Should I try upgrading Kafka Streams to v3.6.0 or v3.7.0? I am hoping that >>>> v3.7.0 will be compatible with AWS MSK v3.6.0. >>>> >>>> >>>> Thank you very much. >>>> >>>> Kind regards, >>>> Venkatesh >>>> >>>> UTS CRICOS Provider Code: 00099F DISCLAIMER: This email message and any >>>> accompanying attachments may contain confidential information. If you are >>>> not the intended recipient, do not read, use, disseminate, distribute or >>>> copy this message or attachments. If you have received this message in >>>> error, please notify the sender immediately and delete this message. Any >>>> views expressed in this message are those of the individual sender, except >>>> where the sender expressly, and with authority, states them to be the >>>> views of the University of Technology Sydney. Before opening any >>>> attachments, please check them for viruses and defects. Think. Green. Do. >>>> Please consider the environment before printing this email. >>>>