Re: Kafka Streams reaching ERROR state during rolling upgrade / restart of brokers
Matthias,

It happened again yesterday during another rolling update. The first error log I can see on the client side is below. The application stayed in PENDING_ERROR state for some time and then went into ERROR state.

Caused by: java.lang.IllegalStateException: KafkaStreams is not running. State is PENDING_ERROR.
    at org.apache.kafka.streams.KafkaStreams.validateIsRunningOrRebalancing(KafkaStreams.java:381)
    at org.apache.kafka.streams.KafkaStreams.queryMetadataForKey(KafkaStreams.java:1663)
    at org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService.lambda$getHostInfo$2(InteractiveQueryService.java:227)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
    at java.base/java.util.HashMap$KeySpliterator.tryAdvance(HashMap.java:1728)
    at java.base/java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:129)
    at java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:527)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:513)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
    at java.base/java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:150)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:647)
    at org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService.lambda$getHostInfo$3(InteractiveQueryService.ja

On Tue, Oct 3, 2023 at 8:50 AM Matthias J. Sax wrote:

> I did mean client side... If KS goes into ERROR state, it should log
> the reason.
>
> If the logs are indeed empty, try to register an
> uncaught-exception-handler via
>
> KafkaStreams#setUncaughtExceptionHandler(...)
>
> -Matthias
>
> On 10/2/23 12:11 PM, Debraj Manna wrote:
> > Are you suggesting to check the Kafka broker logs? I do not see any other
> > error logs on the client / application side.
> >
> > On Fri, 29 Sep, 2023, 22:01 Matthias J. Sax, wrote:
> >
> >> In general, Kafka Streams should keep running.
> >>
> >> Can you inspect the logs to figure out why it's going into ERROR state
> >> to begin with? Maybe you need to increase/change some timeout/retry
> >> configs.
> >>
> >> The stack trace you shared is a symptom, but not the root cause.
> >>
> >> -Matthias
> >>
> >> On 9/21/23 12:56 AM, Debraj Manna wrote:
> >>> I am using Kafka broker 2.8.1 (from AWS MSK) with Kafka clients and
> >>> Kafka Streams 3.5.1.
> >>>
> >>> I am observing that whenever a rolling upgrade is done on AWS MSK, our
> >>> stream application reaches an ERROR state. I get the below exception
> >>> when trying to query the state store:
> >>>
> >>> Caused by: java.lang.IllegalStateException: KafkaStreams is not running.
> >>> State is ERROR.
> >>> at org.apache.kafka.streams.KafkaStreams.validateIsRunningOrRebalancing(KafkaStreams.java:381)
> >>> at org.apache.kafka.streams.KafkaStreams.queryMetadataForKey(KafkaStreams.java:1663)
> >>> at org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService.lambda$getHostInfo$2(InteractiveQueryService.java:227)
> >>>
> >>> Can someone let me know the recommended way to keep the stream
> >>> application running whenever a rolling upgrade/restart of brokers is
> >>> done in the background?
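A minimal, self-contained sketch of the handler Matthias suggests. The real return type is the enum org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse (available since kafka-streams 2.8); it is redeclared here only so the sketch compiles without the kafka-streams dependency, and the error-classification logic is an assumption you would adapt to your own failure modes:

```java
public class StreamsErrorHandlingSketch {

    // Stand-in for StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse,
    // redeclared here so this sketch has no external dependencies.
    enum StreamThreadExceptionResponse { REPLACE_THREAD, SHUTDOWN_CLIENT, SHUTDOWN_APPLICATION }

    // Decide how to react to an uncaught stream-thread exception.
    // REPLACE_THREAD keeps the application out of the terminal ERROR
    // state when the failure is transient (e.g. a broker restart),
    // and logging here captures the reason the thread died.
    static StreamThreadExceptionResponse onUncaughtException(Throwable t) {
        if (t instanceof Error) {
            // JVM-level errors (OutOfMemoryError etc.) are not recoverable.
            return StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
        }
        System.err.println("Stream thread died, replacing it: " + t);
        return StreamThreadExceptionResponse.REPLACE_THREAD;
    }
}
```

With the real library, this would be wired up before start(), roughly as `kafkaStreams.setUncaughtExceptionHandler(StreamsErrorHandlingSketch::onUncaughtException);` — note the handler only fires for exceptions thrown by stream threads, so an empty application log plus a silent transition to ERROR is exactly the case it helps diagnose.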
How does kafka consumer behave when consumer poll timeout has expired?
Hi,

Can someone let me know how a consumer is expected to behave after the log below? Will the consumer be considered dead, and will a new instance be spawned due to consumer group rebalancing? How does this behaviour differ between RangeAssignor and CooperativeStickyAssignor?

consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

For example, let's say I have two instances of a consumer running on two different machines. Both instances belong to the same consumer group and consume from the same topic with 10 partitions. In this case, what is expected when I see the above log in both consumers, for RangeAssignor and for CooperativeStickyAssignor?

I know what the above log means, but I want to understand how the consumer behaves after this log.

Thanks
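For what it's worth, my understanding of the mechanics: when max.poll.interval.ms is exceeded, the consumer proactively leaves the group, its partitions are reassigned to the remaining members, and the process itself stays alive and rejoins on its next poll(). With the eager RangeAssignor, a rebalance revokes all partitions from all members before reassigning; with CooperativeStickyAssignor, only the partitions that actually move are revoked. The two knobs the warning names can be sketched as plain consumer properties (the values below are hypothetical and must be sized to your real per-batch processing time):

```java
import java.util.Properties;

public class ConsumerPollTuning {

    // Sketch of the two knobs named by the "poll timeout has expired"
    // warning; values here are illustrative, not recommendations.
    public static Properties pollTuning() {
        Properties p = new Properties();
        // Max allowed gap between poll() calls before the consumer is
        // considered stuck and leaves the group (default 300000 = 5 min).
        p.setProperty("max.poll.interval.ms", "600000");
        // Fewer records per poll() means less work per loop iteration,
        // so poll() is called again sooner (default 500).
        p.setProperty("max.poll.records", "100");
        // Cooperative protocol: only moved partitions are revoked on rebalance.
        p.setProperty("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
        return p;
    }
}
```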
A Least Effort Kafka cluster rebalancer
Hi all,

I created a Kafka rebalancer (in bash) which minimizes the number of partitions that need to be moved when one or more brokers are added to or removed from the cluster.

Unlike the 'kafka-reassign-partitions' tool that ships with the Kafka package, this new tool works in a least-effort way, and that makes a big difference on large clusters. It does not require extra packages to be installed because it is written in bash. That also means it is K8s ready!

It can run unattended to downscale a cluster in an automagic way, and with very little scripting around it, it can also run unattended when upscaling.

You can find the newborn tool here: https://github.com/dba-git/kafka_smart_rebalancer/

Feedback and contributions are welcome!

Regards,

Fabio Pardi
Agileos Consulting LTD
https://www.agileosconsulting.com/
Kafka Behavior During Partition Leader Shutdowns
Hello,

I've been testing Kafka availability (in ZooKeeper mode) during single-broker shutdowns within a Kubernetes setup, and I've come across something interesting that I wanted to run by you.

We've noticed that when a partition leader goes down, messages are not delivered until a new leader is elected. While we expect this to happen, there's a part of it that's still not adding up. The downtime, or the time it takes for the new leader to step up, is about a minute. But what's interesting is that when we increase the producer-side retries to just 1, all of our messages get delivered successfully. This seems a bit odd because, theoretically, increasing the retries should only resend the message, giving it an extra 10 seconds before it times out, while the first few messages should still have around 40 seconds to wait for the new leader. So this behavior is a bit of a head-scratcher.

I was wondering if you might have any insights or could point me in the right direction to understand why this is happening. Any help or guidance would be greatly appreciated.

Below is a log snippet from one of the test runs: partition leader shutdown, and observation of the new partition leader being automatically elected, in a setup with 1 partition and a replication factor of 3.

Thu Oct 26 21:59:51 CEST 2023 - Partition leader has been shutdown
Thu Oct 26 22:01:06 CEST 2023 - Change in partition leader detected

Error messages from the producer client during the window when the partition leader is unelected:

Failed to send message: KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}. Message content: Message #39 from 2023-10-26 19:59:52
Failed to send message: KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}. Message content: Message #40 from 2023-10-26 19:59:53
.
Failed to send message: KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}. Message content: Message #97 from 2023-10-26 20:00:50
Failed to send message: KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}. Message content: Message #98 from 2023-10-26 20:00:51

The container clocks are a little out of sync, but both unavailability windows match to around one minute.

Thanks a lot for your time, and looking forward to hearing from you.
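I can't diagnose the one-minute window from the snippet alone, but one thing worth checking is the relationship between the per-attempt timeout and the total delivery budget. The errors above look like librdkafka, where message.timeout.ms bounds the total time a record may wait (including retries); the Java client's equivalent knob is delivery.timeout.ms, which it validates as at least linger.ms + request.timeout.ms (KIP-91). A sketch in Java-client config terms, with values that are assumptions sized to survive a ~60 s leader election:

```java
import java.util.Properties;

public class ProducerTimeoutBudget {

    // Hypothetical values; the point is that the total budget, not the
    // retry count, must cover the leader-election window.
    public static Properties produceBudget() {
        Properties p = new Properties();
        p.setProperty("request.timeout.ms", "10000");  // per-attempt timeout
        p.setProperty("retry.backoff.ms", "100");      // pause between attempts
        p.setProperty("retries", "2147483647");        // effectively bounded by budget below
        // Total end-to-end budget per record, including all retries; must
        // comfortably exceed the observed ~60 s unavailability window.
        p.setProperty("delivery.timeout.ms", "120000");
        return p;
    }

    // Mirrors the Java client's own sanity check (KIP-91):
    // delivery.timeout.ms >= linger.ms + request.timeout.ms
    static boolean budgetIsConsistent(long deliveryMs, long lingerMs, long requestMs) {
        return deliveryMs >= lingerMs + requestMs;
    }
}
```

If the producer in the test is librdkafka-based, the analogous check would be whether message.timeout.ms exceeds the election window; with retries effectively disabled, a single failed attempt can expire the record long before a new leader appears.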