Re: Kafka Streams reaching ERROR state during rolling upgrade / restart of brokers

2023-11-02 Thread Debraj Manna
Matthias,

It happened again yesterday during another rolling update. The first error
log I can see on the client side is below. The application sat in the
PENDING_ERROR state for some time and then went into the ERROR state.

Caused by: java.lang.IllegalStateException: KafkaStreams is not running. State is PENDING_ERROR.
    at org.apache.kafka.streams.KafkaStreams.validateIsRunningOrRebalancing(KafkaStreams.java:381)
    at org.apache.kafka.streams.KafkaStreams.queryMetadataForKey(KafkaStreams.java:1663)
    at org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService.lambda$getHostInfo$2(InteractiveQueryService.java:227)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
    at java.base/java.util.HashMap$KeySpliterator.tryAdvance(HashMap.java:1728)
    at java.base/java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:129)
    at java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:527)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:513)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
    at java.base/java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:150)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:647)
    at org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService.lambda$getHostInfo$3(InteractiveQueryService.ja


On Tue, Oct 3, 2023 at 8:50 AM Matthias J. Sax  wrote:

> I did mean client side...  If KS goes into ERROR state, it should log
> the reason.
>
> If the logs are indeed empty, try to register an
> uncaught-exception-handler via
>
> KafkaStreams#setUncaughtExceptionHandler(...)
>
>
> -Matthias
>
> On 10/2/23 12:11 PM, Debraj Manna wrote:
> > Are you suggesting that I check the Kafka broker logs? I do not see any other
> > error logs on the client / application side.
> >
> > On Fri, 29 Sep, 2023, 22:01 Matthias J. Sax,  wrote:
> >
> >> In general, Kafka Streams should keep running.
> >>
> >> Can you inspect the logs to figure out why it's going into ERROR state
> >> to begin with? Maybe you need to increase or change some timeout/retry
> >> configs.
> >>
> >> The stack trace you shared is a symptom, but not the root cause.
> >>
> >> -Matthias
> >>
> >> On 9/21/23 12:56 AM, Debraj Manna wrote:
> >>> I am using Kafka broker 2.8.1 (from AWS MSK) with Kafka clients and
> >>> Kafka Streams 3.5.1.
> >>>
> >>> I am observing that whenever some rolling upgrade is done on AWS MSK,
> >>> our stream application reaches an error state. I get the below exception
> >>> when trying to query the state store:
> >>>
> >>> caused by: java.lang.IllegalStateException: KafkaStreams is not running.
> >>> State is ERROR.
> >>>   at org.apache.kafka.streams.KafkaStreams.validateIsRunningOrRebalancing(KafkaStreams.java:381)
> >>>   at org.apache.kafka.streams.KafkaStreams.queryMetadataForKey(KafkaStreams.java:1663)
> >>>   at org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService.lambda$getHostInfo$2(InteractiveQueryService.java:227)
> >>>
> >>> Can someone let me know the recommended way to keep the stream
> >>> application running whenever some rolling upgrade/restart of brokers is
> >>> done in the background?
> >>>
> >>
> >
>
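
For reference, a minimal, self-contained sketch of the uncaught-exception-handler
registration suggested above. The application id, bootstrap address, topics, serdes,
and the REPLACE_THREAD choice are placeholders assumed for illustration, not
recommendations from this thread.

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

public class StreamsErrorHandlerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");        // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");     // placeholder
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic");                        // placeholder topology

        KafkaStreams streams = new KafkaStreams(builder.build(), props);

        // Register before start(): this runs when a stream thread dies and logs the
        // root cause that otherwise only surfaces later as "KafkaStreams is not
        // running. State is PENDING_ERROR/ERROR" when the state store is queried.
        streams.setUncaughtExceptionHandler(exception -> {
            System.err.println("Stream thread died: " + exception);
            exception.printStackTrace();
            // REPLACE_THREAD restarts the failed thread; SHUTDOWN_CLIENT and
            // SHUTDOWN_APPLICATION are the other possible responses.
            return StreamThreadExceptionResponse.REPLACE_THREAD;
        });

        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}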


How does kafka consumer behave when consumer poll timeout has expired?

2023-11-02 Thread Debraj Manna
Hi

Can someone let me know how a consumer is expected to behave after the
below log? Will the consumer be considered dead, and will a new instance be
spawned due to consumer-group rebalancing? How does this behaviour differ
between RangeAssignor and CooperativeStickyAssignor?

consumer poll timeout has expired. This means the time between subsequent
calls to poll() was longer than the configured max.poll.interval.ms, which
typically implies that the poll loop is spending too much time processing
messages. You can address this either by increasing max.poll.interval.ms or
by reducing the maximum size of batches returned in poll() with
max.poll.records.

For example, let's say I have two consumer instances running on two
different machines. Both instances belong to the same consumer group and
consume from the same topic with 10 partitions. In this case, what is the
expected behaviour when I see the above log in both consumers, for
RangeAssignor and for CooperativeStickyAssignor?

I know what the above log means, but I want to understand how the consumer
behaves after it.
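
For reference: once max.poll.interval.ms elapses without a call to poll(), the
consumer proactively leaves the group and a rebalance hands its partitions to the
remaining members; Kafka itself does not start a new instance. With RangeAssignor
(eager protocol) the surviving member revokes all of its partitions and receives a
fresh assignment; with CooperativeStickyAssignor only the partitions that actually
move are revoked. A minimal sketch of the settings in play follows; broker address,
group id, topic, and the concrete values are assumptions for illustration.

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PollTimeoutSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");                 // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");
        // If processing a batch takes longer than this, the consumer leaves the
        // group and a rebalance moves its partitions to the other instance.
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
        // Smaller batches keep the gap between poll() calls short.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
        // RangeAssignor rebalances eagerly (all partitions revoked);
        // CooperativeStickyAssignor revokes only the partitions that move.
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                  "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("my-topic"));                           // placeholder topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                records.forEach(r -> {
                    // Keep per-batch processing well under max.poll.interval.ms.
                });
            }
        }
    }
}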

Thanks


A Least Effort Kafka cluster rebalancer

2023-11-02 Thread Fabio Pardi
Hi all,

I created a Kafka rebalancer (in bash) which minimizes the number of partitions 
that need to be moved around when one or more brokers are added to or removed 
from the cluster.

Unlike the already available 'kafka-reassign-partitions' (which ships with the 
Kafka package), this new tool works in a least-effort way, and that makes a big 
difference on large clusters.

Because it is written in bash, it does not require any extra packages to be 
installed. That also makes it K8s ready!

It can run unattended to downscale a cluster automagically, and with very little 
scripting around it, it can also run unattended when upscaling.
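
For context, the idea of a least-effort move (touching only the replicas that live
on the departing broker and leaving everything else in place) can also be expressed
against the Admin API. The sketch below is only an illustration of that concept,
not how the tool itself works; the topic, partition, and broker ids are made up.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.common.TopicPartition;

public class LeastEffortMoveSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        try (Admin admin = Admin.create(props)) {
            // Only the partition whose replica set includes the departing broker (id 3)
            // gets a new replica list; its other replicas stay where they are.
            Map<TopicPartition, Optional<NewPartitionReassignment>> moves = new HashMap<>();
            moves.put(new TopicPartition("orders", 7),                               // hypothetical
                      Optional.of(new NewPartitionReassignment(List.of(1, 2, 4))));  // 3 replaced by 4
            admin.alterPartitionReassignments(moves).all().get();
        }
    }
}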


You can find the newborn tool here:

https://github.com/dba-git/kafka_smart_rebalancer/


Feedback and contributions are welcome!


Regards,

Fabio Pardi 
Agileos Consulting LTD
https://www.agileosconsulting.com/


Kafka Behavior During Partition Leader Shutdowns

2023-11-02 Thread Slavo Valko
Hello,


I've been working on testing Kafka availability in Zookeeper mode during
single broker shutdowns within a Kubernetes setup, and I've come across
something interesting that I wanted to run by you.


We've noticed that when a partition leader goes down, messages are not
delivered until a new leader is elected. While we expect this to happen,
there's a part of it that's still not adding up. The downtime, or the time
it takes for the new leader to step up, is about a minute. But what's
interesting is that when we increase the producer-side retries to just 1,
all of our messages get delivered successfully.


This seems a bit odd to me because, theoretically, increasing the retries
should only resend the message, giving it an extra 10 seconds before it
times out, while the first few messages should still have around 40 seconds
to wait for the new leader. So, this behavior is a bit of a head-scratcher.


I was wondering if you might have any insights or could point me in the
right direction to understand why this is happening. Any help or guidance
would be greatly appreciated.
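
For reference, on the Java producer the settings that decide whether a record
survives a leader failover are delivery.timeout.ms (the total time a record may sit
in the producer, covering retries and waits for a new leader), request.timeout.ms
(per in-flight request), and retries. The error format above looks librdkafka-based,
where the closest analogue to delivery.timeout.ms is message.timeout.ms, so the knob
names differ; the sketch below uses the Java client with made-up values.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class FailoverProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer");
        // Total time a record may spend in the producer, including retries and the
        // wait for a new leader; it needs to exceed the observed ~1 minute window.
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000");
        // Per-request timeout; a retry after this refreshes metadata and can reach
        // the newly elected leader.
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my-topic", "key", "value"),    // placeholder topic
                    (metadata, exception) -> {
                        if (exception != null) {
                            System.err.println("Delivery failed: " + exception);
                        }
                    });
            producer.flush();
        }
    }
}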


Below is a log snippet from one of the test runs:

Partition leader shutdown and observation of a new partition leader being
automatically elected, in a setup with 1 partition and a replication factor of 3.
Thu Oct 26 21:59:51 CEST 2023 - Partition leader has been shutdown
Thu Oct 26 22:01:06 CEST 2023 - Change in partition leader detected

Error messages from the producer client during the window when the partition
leader is unelected.
Failed to send message: KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local:
Message timed out"}. Message content: Message #39 from 2023-10-26 19:59:52

Failed to send message: KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local:
Message timed out"}. Message content: Message #40 from 2023-10-26 19:59:53
.
Failed to send message: KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local:
Message timed out"}. Message content: Message #97 from 2023-10-26 20:00:50

Failed to send message: KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local:
Message timed out"}. Message content: Message #98 from 2023-10-26 20:00:51


The container clocks are a little out of sync, but both unavailability
windows match to around one minute.


Thanks a lot for your time; I'm looking forward to hearing from you.