Does Kafka wait for an fsync to send back an ACK for a published message?

2024-03-14 Thread Sreyan Chakravarty
I am trying to understand when Kafka signals to the producer that a
message was successfully accepted into Kafka.

Does Kafka:

1) Write to the page cache of the node's OS and then return an ACK?
If so, there is potential for message loss if the node crashes before an
fsync to disk. But the upside is reduced latency, as writes to the page
cache are very fast compared to an fsync to disk.

2) Wait for an fsync to happen for each message?
If so, latency increases, but each message is guaranteed to be written to
disk.

3) Or is this purely a configurable choice between the two?

-- 
Regards,
Sreyan Chakravarty


Re: Does Kafka wait for an fsync to send back an ACK for a published message?

2024-03-14 Thread Haruki Okada
Hi.

By default, Kafka returns the ack without waiting for an fsync to disk. But
you can change this behavior via the log.flush.interval.messages config.
For data durability, Kafka mainly relies on replication instead.

> then there is potential for message loss if the node crashes before

On the crashed node, that's true. However, as long as you configure
replicas to span multiple AZs, data loss is very unlikely, because a
simultaneous multi-AZ power failure is rare.
FYI, Jack Vanlightly wrote a nice article about this topic:
https://jack-vanlightly.com/blog/2023/4/24/why-apache-kafka-doesnt-need-fsync-to-be-safe

> But the upside is reduced latency as writes to the pagecache

True. That's why Kafka is performant even on HDDs.
Also, relying on the page cache is a good compromise between latency and
durability, because it is still robust against an application crash (e.g. a
JVM crash).
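
For illustration, here is a minimal sketch of how these settings fit together
(Java client; the broker address and topic name are placeholders, and the
broker-side flush setting appears only as a comment):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class AckDurabilitySketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            // acks=all: the leader acks only after all in-sync replicas have the
            // record (in their page cache); no fsync is awaited on any broker.
            props.put(ProducerConfig.ACKS_CONFIG, "all");

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("demo-topic", "key", "value"),
                        (metadata, exception) -> {
                            if (exception != null) {
                                exception.printStackTrace();
                            }
                        });
            }
            // Broker side (server.properties), to force an fsync per message instead:
            // log.flush.interval.messages=1
        }
    }

With this setup, durability comes from acks=all plus the topic's
min.insync.replicas rather than from fsync.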



-- 

Okada Haruki
ocadar...@gmail.com



Re: Kafka Producer: avoid sending more records when an exception is met, while also keeping performance

2024-03-14 Thread Haruki Okada
Hi.

> By setting max.in.flight.requests.per.connection to 1, I'm concerned that
this could become a performance bottleneck

As Greg pointed out, this is a trade-off between the ordering guarantee and
throughput.
So you should first measure the throughput with
max.in.flight.requests.per.connection=1 while tuning the batching configs
(batch.size, linger.ms) in your environment, and see whether it actually
becomes a bottleneck.
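
For reference, a minimal sketch of the producer configuration being suggested
for that measurement (the broker address and batching values are placeholders
to tune for your environment):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class OrderedProducerConfig {
        static Properties props() {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            // At most one in-flight request per connection, so a failed batch
            // cannot be overtaken by a later one.
            props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
            props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
            // Compensate for the lost pipelining with larger, slightly delayed batches.
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, 64 * 1024); // example value
            props.put(ProducerConfig.LINGER_MS_CONFIG, 10);         // example value
            return props;
        }
    }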


On Wed, Mar 13, 2024 at 18:31, William Lee wrote:

> Hi Richard,
> Thanks for replying.
>
> > but I close the KafkaProducer inside the send
> > callback.
> > ...
> >  Combined with idempotence enabled
> > and max inflight set to 5 (the maximum for idempotence tracking) it gave
> me
> > relatively good performance.
>
> Yes, I also find that closing the KafkaProducer inside the send callback
> can prevent extra records from being sent. But after some investigation
> into the source code of KafkaProducer and Sender, I think closing the
> producer in the callback is not 100% reliable in such cases. For example,
> if you set max.in.flight.requests.per.connection to 5 and send 5 batches
> 1, 2, 3, 4, 5, and batch No.2 fails with an exception in the callback,
> where you then initiate the producer close, batch No.3 might already be in
> flight and could still be sent to the broker. Even though I haven't
> observed this during my experiments, I am still not sure it is reliable,
> since Kafka's official documentation gives no guarantee about this
> behaviour.
>
> In the source code of KafkaProducer and Sender, only when
> max.in.flight.requests.per.connection is set to 1 is the
> "guaranteeMessageOrder" property set to true, ensuring that only one
> request is in flight per partition.
>
> kafka/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
> at master · a0x8o/kafka
> <
> https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L128
> >
>
> kafka/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
> at master · a0x8o/kafka
> <
> https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L538
> >
>
> Do you have any thoughts?
>
> Thanks and regards,
> William Lee
>
> On Wed, Mar 13, 2024 at 16:38, Richard Bosch wrote:
>
> > Hi WIlliam,
> >
> > I see from your example that you close the Kafka producer in the send
> > loop, based on the content of the sendException that is set in the
> > callback of the KafkaProducer send.
> > Since your send loop runs on a different thread than the one the
> > KafkaProducer uses to send, you will encounter race conditions in this
> > close logic.
> >
> > I actually had a similar requirement to yours and solved it by using a
> > sendException like you do, but I close the KafkaProducer inside the send
> > callback. The send callback is executed on the producer's sender thread,
> > and closing the producer there will stop all subsequent batches of
> > processing, as the current batch isn't finished yet. Combined with
> > idempotence enabled and max inflight set to 5 (the maximum for
> > idempotence tracking) it gave me relatively good performance.
> >
> > Kind regards,
> >
> >
> > Richard Bosch
> >
> > Developer Advocate
> >
> > Axual BV
> >
> > https://axual.com/
> >
>
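
For readers following the archive, here is a rough, illustrative sketch of the
callback-close pattern discussed in the quoted thread above. The names are
hypothetical, and as noted, batches already in flight may still reach the
broker:

    import java.time.Duration;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicReference;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    class CallbackCloseSketch {
        // Sends records until the first failure reported via the callback, then
        // closes the producer from inside that callback.
        static void sendAll(KafkaProducer<String, String> producer,
                            List<ProducerRecord<String, String>> records) {
            AtomicReference<Exception> sendException = new AtomicReference<>();
            for (ProducerRecord<String, String> record : records) {
                if (sendException.get() != null) {
                    break; // stop handing new records to the producer
                }
                producer.send(record, (metadata, exception) -> {
                    if (exception != null && sendException.compareAndSet(null, exception)) {
                        // Runs on the producer's sender/callback thread; closing here
                        // stops further batches from being dispatched, but batches
                        // already in flight may still reach the broker.
                        producer.close(Duration.ZERO);
                    }
                });
            }
        }
    }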


-- 

Okada Haruki
ocadar...@gmail.com



Re: [EXTERNAL] Re: Kafka Streams 3.5.1 based app seems to get stalled

2024-03-14 Thread Venkatesh Nagarajan
Apologies for the delay in responding to you, Bruno. Thank you very much for 
your important inputs.

I just searched the MSK broker logs for messages about rebalancing and metadata
updates for the consumer group and found 412 occurrences in a 13-hour period.
During this time, a load test was run and around 270k events were processed.
Would you consider this level of rebalancing normal?

Also, I need to mention that when offset lags increase, autoscaling creates 
additional ECS tasks to help with faster processing. A lot of rebalancing 
happens for a few hours before the consumer group becomes stable.

By stop-the-world rebalancing, I meant a rebalance that causes processing to
stop completely while it happens. To avoid this, we use static group
membership, as explained by Matthias in this presentation:

https://www.confluent.io/kafka-summit-lon19/everything-you-wanted-to-know-kafka-afraid/

Static group membership seems to help reduce the impact of the rebalancing 
caused by scaling out of consumers.
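
For reference, a minimal sketch (placeholder names and values) of how static
membership is typically enabled for a Streams instance, by giving it a stable
group.instance.id:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    public class StaticMembershipSketch {
        static Properties streamsProps(String stableInstanceId) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");   // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092"); // placeholder
            // A stable per-instance id (e.g. derived from the ECS task identity)
            // enables static group membership, so a restarted task can rejoin
            // without triggering a rebalance if it returns within session.timeout.ms.
            props.put(StreamsConfig.consumerPrefix(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG),
                      stableInstanceId);
            props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
                      120_000); // example value
            return props;
        }
    }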

On a separate note, when rebalancing happens, we lose the SumOffsetLag metric
emitted by MSK for the consumer group. The AWS Support team said that the
metric is only available when the consumer group is stable or empty. I am not
sure whether this metric is specific to MSK or comes from Apache Kafka itself.
If there is another metric that keeps offset lags observable even during
rebalancing, could you please let me know?

Thank you very much.

Kind regards,
Venkatesh

From: Bruno Cadonna 
Date: Wednesday, 13 March 2024 at 8:29 PM
To: users@kafka.apache.org 
Subject: Re: [EXTERNAL] Re: Kafka Streams 3.5.1 based app seems to get stalled
Hi Venkatesh,

To extend on what Matthias replied: a metadata refresh might trigger a
rebalance if the metadata has changed. However, a metadata refresh that does
not show a change in the metadata will not trigger a rebalance. In this
context, i.e. the METADATA_MAX_AGE_CONFIG config, the metadata is the
metadata about the cluster received by the client.

The metadata mentioned in the log messages you posted is metadata of the
group to which the member (a.k.a. consumer, a.k.a. client) belongs. That
log message originates from the broker (in contrast,
METADATA_MAX_AGE_CONFIG is a client config). If the rebalance were
triggered by a cluster metadata change, the log message would contain
something like "cached metadata has changed" as the client reason [1].

Your log messages look like genuine messages that are completely normal
during rebalance events.

How often do they happen?
What do you mean by stop-the-world rebalances?

Best,
Bruno


[1]
https://github.com/apache/kafka/blob/f0087ac6a8a7b1005e9588e42b3679146bd3eb13/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L882C39-L882C66


On 3/13/24 2:34 AM, Venkatesh Nagarajan wrote:
> Just want to share another variant of the log message which is also related 
> to metadata and rebalancing but has a different client reason:
>
> INFO [GroupCoordinator 3]: Preparing to rebalance group  in state 
> PreparingRebalance with old generation nnn (__consumer_offsets-nn) (reason: 
> Updating metadata for member  during Stable; client reason: triggered 
> followup rebalance scheduled for 0) (kafka.coordinator.group.GroupCoordinator)
>
> Thank you.
>
> Kind regards,
> Venkatesh
>
> From: Venkatesh Nagarajan 
> Date: Wednesday, 13 March 2024 at 12:06 pm
> To: users@kafka.apache.org 
> Subject: Re: [EXTERNAL] Re: Kafka Streams 3.5.1 based app seems to get stalled
> Thanks very much for your important inputs, Matthias.
>
> I will use the default METADATA_MAX_AGE_CONFIG. I set it to 5 hours when I 
> saw a lot of such rebalancing related messages in the MSK broker logs:
>
> INFO [GroupCoordinator 2]: Preparing to rebalance group  in state 
> PreparingRebalance with old generation  (__consumer_offsets-nn) (reason: 
> Updating metadata for member  during Stable; client reason: need to 
> revoke partitions and re-join) (kafka.coordinator.group.GroupCoordinator)
>
> I am guessing that the two are unrelated. If you have any suggestions on how 
> to reduce such rebalancing, that will be very helpful.
>
> Thank you very much.
>
> Kind regards,
> Venkatesh
>
> From: Matthias J. Sax 
> Date: Tuesday, 12 March 2024 at 1:31 pm
> To: users@kafka.apache.org 
> Subject: [EXTERNAL] Re: Kafka Streams 3.5.1 based app seems to get stalled
> Without detailed logs (maybe even DEBUG) it is hard to say.
>
> But from what you describe, it could be a metadata issue? Why are you
> setting
>
>> METADATA_MAX_AGE_CONFIG (consumer and producer): 5 hours in millis (to make 
>> rebalances rare)
>
> Refreshing metadata has nothing to do with rebalances, and a metadata
> refresh does not trigger a rebalance.
>
>
>

Re: [EXTERNAL] Re: Kafka Streams 3.5.1 based app seems to get stalled

2024-03-14 Thread Venkatesh Nagarajan
Just want to make a correction, Bruno: my understanding is that Kafka Streams
3.5.1 uses incremental cooperative rebalancing, which seems to help reduce the
impact of the rebalancing caused by autoscaling etc.:

https://www.confluent.io/blog/incremental-cooperative-rebalancing-in-kafka/

Static group membership may also have a role to play especially if the ECS 
tasks get restarted for some reason.


I also want to mention this error, which occurred 759 times during the 13-hour
load test:

ERROR org.apache.kafka.streams.processor.internals.StandbyTask - stream-thread 
[-StreamThread-1] standby-task [1_32] Failed to acquire lock while 
closing the state store for STANDBY task

I think Kafka Streams automatically recovers from this. Also, I have seen this
error increase when the number of stream threads is high (30 or 60 threads),
so I use just 10 threads per ECS task.
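
For context, a small sketch of the settings involved (the standby-replica
value is an assumption for illustration, not something stated in this thread):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class ThreadCountSketch {
        static void tune(Properties props) {
            // 10 stream threads per ECS task, as described above; higher counts
            // (30 or 60) correlated with more standby-task lock errors here.
            props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 10);
            // The STANDBY task in the error implies standby replicas are enabled,
            // e.g. (assumption, not confirmed in the thread):
            props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
        }
    }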

Kind regards,
Venkatesh
