RE: Kafka Streams - Producer attempted to produce with an old epoch.

2022-10-27 Thread Andrew Muraco
A detail I forgot to mention is that I am using EOS, so the streams application 
was consistently getting this error, causing it to restart that task, which 
obviously would bottleneck everything.
Once I increased the threads on the brokers, the epoch error subsided until the 
volume increased.

Right now the streams application has 36 threads per box (5 boxes w/ 48 
threads), and when running normally it keeps up, but once this epoch error 
starts it causes a cascade of slowness. The CPU is not even fully utilized 
because this error happens every minute or so.

Perhaps some other tuning is needed too, but I'm at a loss as to what to look at.
I do have JMX connections to the broker and streams applications if there's any 
useful information I should be looking at.
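Since this is EOS-specific, it may help to note which client-side settings govern how easily a transactional producer gets fenced. A hedged sketch of the relevant Streams properties — the names are standard, but the values shown are just the usual defaults rather than a tuning recommendation, and `exactly_once_v2` assumes Streams 3.0+ (older releases use `exactly_once`):

```properties
# Kafka Streams client config (illustrative values, not a recommendation)
processing.guarantee=exactly_once_v2
# If a StreamThread's poll loop stalls longer than this, it is kicked
# from the consumer group and its transactional producer gets fenced:
max.poll.interval.ms=300000
# Transactions left open longer than this are aborted broker-side:
producer.transaction.timeout.ms=60000
```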

-Original Message-
From: Sophie Blee-Goldman  
Sent: Thursday, October 27, 2022 11:22 PM
To: users@kafka.apache.org
Subject: Re: Kafka Streams - Producer attempted to produce with an old epoch.





Re: Kafka Streams - Producer attempted to produce with an old epoch.

2022-10-27 Thread Sophie Blee-Goldman
I'm not one of the real experts on the Producer, and even further from one on
broker performance, so someone else may need to chime in for that, but I did
have a few questions:

What specifically are you unsatisfied with w.r.t. the performance? Are you
hoping for higher throughput of your Streams app's output, or is there
something about the brokers? I'm curious why you started with increasing the
broker threads, especially if the perf issue/bottleneck is with the Streams
app's processing (but maybe it is not). I would imagine that throwing more
and more threads at the machine could even make things worse; it definitely
will if the thread count gets high enough, though it's hard to say where/when
it might start to decline. The point is, if the brokers are eating up all the
CPU time with their own threads, then the clients embedded in Streams may be
getting starved out at times, causing that StreamThread/consumer to drop out
of the group and resulting in the producer getting fenced. Or it could be
blocking I/O for RocksDB, leading to write stalls, which could similarly get
that StreamThread kicked out of the consumer group (if the application has
state, especially quite a lot of it).

How many StreamThreads did you give the app btw?

On Thu, Oct 27, 2022 at 8:01 PM Andrew Muraco  wrote:



Kafka Streams - Producer attempted to produce with an old epoch.

2022-10-27 Thread Andrew Muraco
Hi,
I have a Kafka Streams application deployed on 5 nodes, and with full traffic 
I am getting the error message:

org.apache.kafka.common.errors.InvalidProducerEpochException: Producer 
attempted to produce with an old epoch.
Written offsets would not be recorded and no more records would be sent since 
the producer is fenced, indicating the task may be migrated out

I have 5 x 24 CPU/48 core machines with 128 GB of RAM each. These machines are 
the Kafka brokers, with 2x1TB disks for the Kafka logs, and they also run the 
Kafka Streams application.
The topic has a replication factor of 2 and receives about 250k records per 
second. I have 2 aggregations in the topology writing to 2 output topics; the 
final output topics are in the 10s of thousands of records per second.

I'm assuming I have a bottleneck somewhere. I increased the broker thread 
counts and observed that the frequency of this error decreased, but it's still 
happening.
Here's the broker configuration I'm using now, though I might be overshooting 
some of these values.

num.network.threads=48
num.io.threads=48
socket.send.buffer.bytes=512000
socket.receive.buffer.bytes=512000
replica.socket.receive.buffer.bytes=1024000
socket.request.max.bytes=10485760
num.replica.fetchers=48
log.cleaner.threads=48
queued.max.requests=48000

I can't find good documentation on the effect of broker configuration on 
performance.


Topic Compaction

2022-10-27 Thread Navneeth Krishnan
Hi All,

We are using AWS MSK with Kafka version 2.6.1. There is a compacted topic
with the configuration below. After reading the documentation, my
understanding was that null values (tombstones) in the topic can be removed
after the delete retention time, but I can see months-old keys still having
null values. Is there any other configuration that needs to be changed to
remove null values from a compacted topic? Thanks!

cleanup.policy=compact
segment.bytes=1073741824
min.cleanable.dirty.ratio=0.1
delete.retention.ms=360
segment.ms=360
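One detail that's easy to misread here: Kafka's time-based topic configs are in milliseconds, so both `360` values above amount to a fraction of a second. A quick sanity check (defaults taken from the broker documentation for comparison):

```python
# Settings quoted above; Kafka's *.ms topic configs are in milliseconds.
topic_configs = {"delete.retention.ms": 360, "segment.ms": 360}
defaults = {
    "delete.retention.ms": 86_400_000,   # 24 hours
    "segment.ms": 604_800_000,           # 7 days
}

for name, ms in topic_configs.items():
    print(f"{name}: set to {ms} ms ({ms / 1000:.2f} s), "
          f"default {defaults[name]} ms")
```

If anything, 360 ms makes tombstones *eligible* for removal almost immediately, so months-old nulls more likely mean the log cleaner hasn't compacted those segments yet — note the active segment is never cleaned, and compaction only runs once the dirty ratio is exceeded.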

Regards,
Navneeth


AWS MSK Rack Awareness

2022-10-27 Thread Navneeth Krishnan
Hi All,

Has anyone implemented rack awareness with AWS MSK? In the broker
configuration I see the following values (use1-az2, use1-az4 & use1-az6).
How are these values determined, and how can we inject them into Java
consumers? Thanks!
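For reference, those values look like AWS availability-zone IDs: MSK sets each broker's `broker.rack` to the AZ ID of the subnet it lives in. On the client side, since Kafka 2.4 (KIP-392) a consumer can advertise its own zone via `client.rack` — a sketch, where the AZ ID shown is a placeholder and must match the zone the client actually runs in:

```properties
# Java consumer config (value must be the AZ id of the client's host)
client.rack=use1-az2
```

Note that fetching from the nearest replica also requires the brokers to have a `replica.selector.class` (e.g. the built-in RackAwareReplicaSelector) configured; whether that is set on MSK is worth verifying.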

Regards,
Navneeth


Re: Balancing traffic between multiple directories

2022-10-27 Thread Alex Craig
I don't think the Confluent self-balancing feature works if you have your
broker data in multiple directories anyway - it's expecting a single dir
per broker and will try to keep the data balanced between brokers. Also,
just as an aside, I'm not sure there's much value in using multiple
directories. I assume you have these mapped to individual disks? I'd be
curious to hear whether you actually get any performance benefit out of
that, especially when weighed against the increased likelihood of disk
failure. I realize that doesn't help your current problem - it's more of a
question/discussion point, I guess. I think your only option for moving the
data around is the kafka-reassign-partitions.sh script.

Alex C

On Thu, Oct 27, 2022 at 9:30 AM Andrew Grant 
wrote:

> There's Cruise Control, https://github.com/linkedin/cruise-control, which
> is open-source and could help with automated balancing.
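Since kafka-reassign-partitions.sh is the tool of record here, a minimal example of the JSON it consumes may be useful. This is a sketch with a hypothetical topic name, broker id, and directory path, generated with a small script so the shape is easy to adapt:

```python
import json

# Hypothetical: pin topic foo, partition 0 (replica on broker 1) to a
# specific log directory on that broker.
reassignment = {
    "version": 1,
    "partitions": [
        {"topic": "foo", "partition": 0,
         "replicas": [1],
         # One entry per replica: either "any" or an absolute path
         # of a log directory configured on that broker.
         "log_dirs": ["/data/disk2/kafka-logs"]},
    ],
}

with open("reassign.json", "w") as f:
    json.dump(reassignment, f, indent=2)
# Then apply it with, e.g.:
#   kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
#     --reassignment-json-file reassign.json --execute
```

Per the tool's help text, log_dirs is optional, but when present its length must equal the length of the replicas list.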

Re: Balancing traffic between multiple directories

2022-10-27 Thread Andrew Grant
There's Cruise Control, https://github.com/linkedin/cruise-control, which
is open-source and could help with automated balancing.

On Thu, Oct 27, 2022 at 10:26 AM  wrote:

> Auto rebalancing is a very important feature for running Kafka in a
> production environment. Given that Confluent already has this feature, is
> there any scope for the open source version to gain it as well?
> Or, is the idea that the open source version shouldn't be used in high-load
> production environments?


Re: Balancing traffic between multiple directories

2022-10-27 Thread gaode
Auto rebalancing is a very important feature for running Kafka in a production 
environment. Given that Confluent already has this feature, is there any scope 
for the open source version to gain it as well?
Or, is the idea that the open source version shouldn't be used in high-load 
production environments?


From: sunil chaudhari 
Sent: October 27, 2022 3:11
To: users@kafka.apache.org 
Subject: Re: Balancing traffic between multiple directories

Hi Lehar,
You are right. There is no better way in open source Kafka.
However, Confluent has something called the Auto Rebalancing feature.
Can you check whether there is a free version with this feature?

It starts balancing brokers automatically when it sees an uneven
distribution of partitions.

Regards,
Sunil.
On Wed, 26 Oct 2022 at 12:03 PM, Lehar Jain 
wrote:

> Hey Andrew,
>
> Thanks for the reply. Currently, we are using the same method as you
> described. I wanted to make sure there isn't a better way.
>
> It seems there isn't currently, so we will keep using this approach.
>
> On Tue, Oct 25, 2022 at 7:23 PM Andrew Grant 
> wrote:
>
> > Hey Lehar,
> >
> >
> > I don’t think there’s a way to control this during topic creation. I just
> > took a look through
> > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/AdminUtils.scala
> > and it does appear partition assignment does not account for each broker’s
> > different log directories. I also took a look at the kafka-topics.sh script
> > and it has a --replica-assignment argument, but that looks to only allow
> > specifying brokers. During topic creation, once a replica has been chosen, I
> > think we then choose the directory with the fewest number of partitions -
> > see
> > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L1192
> >
> >
> > What I think you can do is move existing partitions around with the
> > kafka-reassign-partitions.sh script. From running the command locally:
> >
> >
> > --reassignment-json-file <String:   The JSON file with the partition
> >   manual assignment json file path> reassignment configuration. The
> >                                     format to use is -
> >                                     {"partitions":
> >                                      [{"topic": "foo",
> >                                        "partition": 1,
> >                                        "replicas": [1,2,3],
> >                                        "log_dirs": ["dir1","dir2","dir3"]
> >                                      }],
> >                                     "version":1
> >                                     }
> >                                     Note that "log_dirs" is optional. When
> >                                     it is specified, its length must equal
> >                                     the length of the replicas list. The
> >                                     value in this list can be either "any"
> >                                     or the absolute path of the log
> >                                     directory on the broker. If absolute
> >                                     log directory path is specified, the
> >                                     replica will be moved to the specified
> >                                     log directory on the broker.
> >
> >
> > There’s the log_dirs field you can use in the JSON file to move
> > partitions between directories.
> >
> >
> > Hope that helps a bit.
> >
> >
> > Andrew
> >
> > On Tue, Oct 25, 2022 at 6:56 AM Lehar Jain 
> > wrote:
> >
> > > Hey,
> > >
> > > We run Kafka brokers with multiple log directories. I wanted to know how
> > > Kafka balances traffic between various directories. Can we have our own
> > > strategy to distribute different partitions to different directories?
> > > Currently, we are facing an imbalance in the sizes of the aforementioned
> > > directories; some directories have a lot of empty space whereas others
> > > are getting filled quickly.
> > >
> > >
> > > Regards
> > >
> >
>