RE: Kafka Streams - Producer attempted to produce with an old epoch.
Detail I forgot to mention is that I am using EOS, so the streams application was consistently getting this error, causing it to restart that task, which obviously bottlenecks everything. Once I increased the threads on the brokers the epoch error subsided, until the volume increased. Right now the streams application has 36 stream threads per box (5 boxes with 48 hardware threads each), and when running normally it keeps up, but once this epoch error starts it causes a cascade of slowness. The CPU is not even fully utilized because this error happens every minute or so. Perhaps some other tuning is needed too, but I'm lost as to what to look at. I do have JMX connections to the broker and streams applications if there's any useful information I should be looking at.

-----Original Message-----
From: Sophie Blee-Goldman
Sent: Thursday, October 27, 2022 11:22 PM
To: users@kafka.apache.org
Subject: Re: Kafka Streams - Producer attempted to produce with an old epoch.

I'm not one of the real experts on the Producer, and even further from one on broker performance, so someone else may need to chime in for that, but I did have a few questions:

What specifically are you unsatisfied with w.r.t. the performance? Are you hoping for a higher throughput of your Streams app's output, or is there something about the brokers? I'm curious why you started with increasing the broker threads, especially if the perf issue/bottleneck is with the Streams app's processing (but maybe it is not). I would imagine that throwing more and more threads at the machine could even make things worse; it definitely will if the thread count gets high enough, though it's hard to say where/when it might start to decline.
Point is, if the brokers are eating up all the CPU time with their own threads, then the clients embedded in Streams may be getting starved out at times, causing that StreamThread/consumer to drop out of the group and resulting in the producer getting fenced. Or it could be blocking i/o for rocksdb and leading to write stalls, which could similarly get that StreamThread kicked from the consumer group (if the application has state, especially if quite a lot). How many StreamThreads did you give the app, btw?

On Thu, Oct 27, 2022 at 8:01 PM Andrew Muraco wrote:
> Hi,
> I have a kafka streams application deployed on 5 nodes, and with full
> traffic I am getting the error message:
>
> org.apache.kafka.common.errors.InvalidProducerEpochException: Producer
> attempted to produce with an old epoch.
> Written offsets would not be recorded and no more records would be
> sent since the producer is fenced, indicating the task may be migrated
> out
>
> I have 5 machines, each with 24 cores (48 hardware threads) and 128 GB
> of RAM. These machines are the kafka brokers, with 2x1TB disks for
> kafka logs, and they also run the kafka Streams application.
> 2x replication factor on the topic; the topic is receiving about 250k
> records per second.
> I have 2 aggregations in the topology to 2 output topics; the final
> output topics are in the 10s of k range per second.
>
> I'm assuming I have a bottleneck somewhere. I increased the broker
> thread counts and observed that the frequency of this error reduced,
> but it's still happening.
> Here's the broker configuration I'm using now, but I might be
> overshooting some of these values:
>
> num.network.threads=48
> num.io.threads=48
> socket.send.buffer.bytes=512000
> socket.receive.buffer.bytes=512000
> replica.socket.receive.buffer.bytes=1024000
> socket.request.max.bytes=10485760
> num.replica.fetchers=48
> log.cleaner.threads=48
> queued.max.requests=48000
>
> I can't find good documentation on the effect of broker configuration
> on performance.
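[Editor's note: under EOS, InvalidProducerEpochException typically means a transaction stayed open past its timeout or a StreamThread missed its poll deadline, after which the coordinator bumps the epoch and fences the producer. A minimal sketch of the client-side settings usually examined in this situation; all values below are illustrative assumptions, not recommendations:]

```properties
# Kafka Streams application properties -- illustrative values only.
processing.guarantee=exactly_once_v2

# Streams forwards producer.-prefixed configs to its embedded producer.
# If a transaction stays open longer than this, the transaction coordinator
# aborts it and bumps the epoch, surfacing as InvalidProducerEpochException.
producer.transaction.timeout.ms=60000

# If a StreamThread takes longer than this between poll() calls (e.g. because
# it is starved of CPU by the co-located brokers), the consumer is evicted
# from the group and its producer is fenced on the ensuing rebalance.
max.poll.interval.ms=300000
```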
Re: Kafka Streams - Producer attempted to produce with an old epoch.
I'm not one of the real experts on the Producer, and even further from one on broker performance, so someone else may need to chime in for that, but I did have a few questions:

What specifically are you unsatisfied with w.r.t. the performance? Are you hoping for a higher throughput of your Streams app's output, or is there something about the brokers? I'm curious why you started with increasing the broker threads, especially if the perf issue/bottleneck is with the Streams app's processing (but maybe it is not). I would imagine that throwing more and more threads at the machine could even make things worse; it definitely will if the thread count gets high enough, though it's hard to say where/when it might start to decline.

Point is, if the brokers are eating up all the CPU time with their own threads, then the clients embedded in Streams may be getting starved out at times, causing that StreamThread/consumer to drop out of the group and resulting in the producer getting fenced. Or it could be blocking i/o for rocksdb and leading to write stalls, which could similarly get that StreamThread kicked from the consumer group (if the application has state, especially if quite a lot). How many StreamThreads did you give the app, btw?

On Thu, Oct 27, 2022 at 8:01 PM Andrew Muraco wrote:
> Hi,
> I have a kafka streams application deployed on 5 nodes, and with full
> traffic I am getting the error message:
>
> org.apache.kafka.common.errors.InvalidProducerEpochException: Producer
> attempted to produce with an old epoch.
> Written offsets would not be recorded and no more records would be sent
> since the producer is fenced, indicating the task may be migrated out
>
> I have 5 machines, each with 24 cores (48 hardware threads) and 128 GB of
> RAM. These machines are the kafka brokers, with 2x1TB disks for kafka
> logs, and they also run the kafka Streams application.
> 2x replication factor on the topic; the topic is receiving about 250k
> records per second.
> I have 2 aggregations in the topology to 2 output topics; the final
> output topics are in the 10s of k range per second.
>
> I'm assuming I have a bottleneck somewhere. I increased the broker thread
> counts and observed that the frequency of this error reduced, but it's
> still happening.
> Here's the broker configuration I'm using now, but I might be
> overshooting some of these values:
>
> num.network.threads=48
> num.io.threads=48
> socket.send.buffer.bytes=512000
> socket.receive.buffer.bytes=512000
> replica.socket.receive.buffer.bytes=1024000
> socket.request.max.bytes=10485760
> num.replica.fetchers=48
> log.cleaner.threads=48
> queued.max.requests=48000
>
> I can't find good documentation on the effect of broker configuration on
> performance.
Kafka Streams - Producer attempted to produce with an old epoch.
Hi,

I have a kafka streams application deployed on 5 nodes, and with full traffic I am getting the error message:

org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch.
Written offsets would not be recorded and no more records would be sent since the producer is fenced, indicating the task may be migrated out

I have 5 machines, each with 24 cores (48 hardware threads) and 128 GB of RAM. These machines are the kafka brokers, with 2x1TB disks for kafka logs, and they also run the kafka Streams application. 2x replication factor on the topic; the topic is receiving about 250k records per second. I have 2 aggregations in the topology to 2 output topics; the final output topics are in the 10s of k range per second.

I'm assuming I have a bottleneck somewhere. I increased the broker thread counts and observed that the frequency of this error reduced, but it's still happening. Here's the broker configuration I'm using now, but I might be overshooting some of these values:

num.network.threads=48
num.io.threads=48
socket.send.buffer.bytes=512000
socket.receive.buffer.bytes=512000
replica.socket.receive.buffer.bytes=1024000
socket.request.max.bytes=10485760
num.replica.fetchers=48
log.cleaner.threads=48
queued.max.requests=48000

I can't find good documentation on the effect of broker configuration on performance.
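[Editor's note: with num.io.threads=48 and num.network.threads=48 on a 24-core box that also hosts the Streams app, the broker alone can oversubscribe every hardware thread. A hedged sketch of a more conservative starting point to iterate from while watching the request-handler idle metric over JMX; every number below is an illustrative assumption, not a recommendation:]

```properties
# Illustrative broker settings for a 24-core box shared with a Streams app.
# Tune against kafka.server:type=KafkaRequestHandlerPool,
# name=RequestHandlerAvgIdlePercent and kafka.network:type=SocketServer,
# name=NetworkProcessorAvgIdlePercent -- idle percentages staying near zero
# suggest more threads; high idle suggests the bottleneck is elsewhere.
num.network.threads=12
num.io.threads=16
num.replica.fetchers=4
log.cleaner.threads=2
queued.max.requests=500
```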
Topic Compaction
Hi All,

We are using AWS MSK with kafka version 2.6.1. There is a compacted topic with the configuration below. After reading the documentation, my understanding was that null values in the topic can be removed using the delete retention time, but I can see months-old keys having null values. Is there any other configuration that needs to be changed for removing null values from a compacted topic? Thanks!

cleanup.policy=compact
segment.bytes=1073741824
min.cleanable.dirty.ratio=0.1
delete.retention.ms=360
segment.ms=360

Regards,
Navneeth
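[Editor's note: both delete.retention.ms and segment.ms are in milliseconds, so 360 is well under a second. More importantly, tombstones (null values) are only removed after the log cleaner has processed their segment: the active segment is never cleaned, and cleaning only triggers once min.cleanable.dirty.ratio is reached. A hedged sketch of overrides more typical for tombstone cleanup; values are illustrative assumptions, not recommendations:]

```properties
# Illustrative topic overrides for tombstone cleanup.
cleanup.policy=compact
# Roll segments hourly so tombstones leave the (never-cleaned) active segment.
segment.ms=3600000
# Keep tombstones for 24h after cleaning so lagging consumers still see them.
delete.retention.ms=86400000
# Run the cleaner once 10% of the log is dirty.
min.cleanable.dirty.ratio=0.1
```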
AWS MSK Rack Awareness
Hi All,

Has anyone implemented rack awareness with AWS MSK? In the broker configuration I see the following values: use1-az2, use1-az4, and use1-az6. How are these values determined, and how can we pass them to Java consumers? Thanks!

Regards,
Navneeth
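[Editor's note: those use1-az* values come from the broker.rack property, which MSK populates with each broker's availability-zone ID. On the client side the matching setting is client.rack (KIP-392 follower fetching), which takes effect when the brokers also set replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector. A minimal sketch, assuming a consumer running in use1-az2:]

```properties
# Consumer properties -- the zone ID below is an illustrative assumption;
# it must match the broker.rack value of the consumer's local AZ so fetches
# are served by the closest replica.
client.rack=use1-az2
```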
Re: Balancing traffic between multiple directories
I don't think the Confluent self-balancing feature works if you have your broker data in multiple directories anyway - it's expecting a single dir per broker and will try to keep the data balanced between brokers.

Also, just as an aside, I'm not sure there's much value in using multiple directories. I assume you have these mapped to individual disks? I'd be curious to hear if you actually get any performance benefit out of that, especially when weighed against the increased likelihood of disk failure. I realize that doesn't help your current problem - more of a question/discussion point, I guess.

I think your only option for moving the data around is the kafka-reassign-partitions.sh script.

Alex C

On Thu, Oct 27, 2022 at 9:30 AM Andrew Grant wrote:
> There's Cruise Control, https://github.com/linkedin/cruise-control, which
> is open-source and could help with automated balancing.
>
> On Thu, Oct 27, 2022 at 10:26 AM wrote:
> > Auto rebalancing is a very important feature for running Kafka in a
> > production environment. Given that Confluent already has this feature, is
> > there any chance the open source version could have it as well?
> > Or is the idea that the open source version shouldn't be used in a
> > high-load production environment?
> >
> > From: sunil chaudhari
> > Sent: October 27, 2022 3:11
> > To: users@kafka.apache.org
> > Subject: Re: Balancing traffic between multiple directories
> >
> > Hi Lehar,
> > You are right; there is no better way in open source Kafka.
> > However, Confluent has something called the Auto Rebalancing feature.
> > Can you check if there is a free version with this feature?
> >
> > It starts rebalancing brokers automatically when it sees an uneven
> > distribution of partitions.
> >
> > Regards,
> > Sunil.
> > On Wed, 26 Oct 2022 at 12:03 PM, Lehar Jain wrote:
> > > Hey Andrew,
> > >
> > > Thanks for the reply. Currently, we are using the same method as you
> > > described.
> > > Wanted to make sure there is no better way.
> > >
> > > It seems there isn't currently, so we will keep using this approach.
> > >
> > > On Tue, Oct 25, 2022 at 7:23 PM Andrew Grant wrote:
> > > > Hey Lehar,
> > > >
> > > > I don't think there's a way to control this during topic creation. I
> > > > just took a look through
> > > > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/AdminUtils.scala
> > > > and it does appear partition assignment does not account for each
> > > > broker's different log directories. I also took a look at the
> > > > kafka-topics.sh script; it has a --replica-assignment argument, but
> > > > that looks to only allow specifying brokers. During topic creation,
> > > > once a replica has been chosen, I think we then choose the directory
> > > > with the fewest number of partitions - see
> > > > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L1192
> > > >
> > > > What I think you can do is move existing partitions around with the
> > > > kafka-reassign-partitions.sh script. From running the command locally:
> > > >
> > > > --reassignment-json-file <String: manual assignment json file path>
> > > >     The JSON file with the partition reassignment configuration.
> > > >     The format to use is -
> > > >     {"partitions": [{"topic": "foo", "partition": 1,
> > > >      "replicas": [1,2,3], "log_dirs": ["dir1","dir2","dir3"]}],
> > > >      "version":1}
> > > >     Note that "log_dirs" is optional. When it is specified, its
> > > >     length must equal the length of the replicas list. The value in
> > > >     this list can be either "any" or the absolute path of the log
> > > >     directory on the broker. If an absolute log directory path is
> > > >     specified, the replica will be moved to the specified log
> > > >     directory on the broker.
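[Editor's note: the help text above can be turned into a concrete file. A minimal sketch that generates a reassignment file pinning each replica of partition 1 of topic foo to an explicit log directory; the broker IDs, directory paths, and topic name are made-up examples:]

```python
import json

# Build a reassignment spec: one "log_dirs" entry per replica, in the same
# order as "replicas". "any" would let the broker pick a directory instead.
reassignment = {
    "version": 1,
    "partitions": [
        {
            "topic": "foo",
            "partition": 1,
            "replicas": [1, 2, 3],
            "log_dirs": [
                "/data/disk1/kafka-logs",   # replica on broker 1
                "/data/disk1/kafka-logs",   # replica on broker 2
                "/data/disk2/kafka-logs",   # replica on broker 3
            ],
        }
    ],
}

with open("reassign.json", "w") as f:
    json.dump(reassignment, f, indent=2)
```

[The file would then be applied with something like `kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassign.json --execute`, and progress checked with `--verify`.]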
Re: Balancing traffic between multiple directories
There's Cruise Control, https://github.com/linkedin/cruise-control, which is open-source and could help with automated balancing.

On Thu, Oct 27, 2022 at 10:26 AM wrote:
> Auto rebalancing is a very important feature for running Kafka in a
> production environment. Given that Confluent already has this feature, is
> there any chance the open source version could have it as well?
> Or is the idea that the open source version shouldn't be used in a
> high-load production environment?
>
> From: sunil chaudhari
> Sent: October 27, 2022 3:11
> To: users@kafka.apache.org
> Subject: Re: Balancing traffic between multiple directories
>
> Hi Lehar,
> You are right; there is no better way in open source Kafka.
> However, Confluent has something called the Auto Rebalancing feature.
> Can you check if there is a free version with this feature?
>
> It starts rebalancing brokers automatically when it sees an uneven
> distribution of partitions.
>
> Regards,
> Sunil.
> On Wed, 26 Oct 2022 at 12:03 PM, Lehar Jain wrote:
> > Hey Andrew,
> >
> > Thanks for the reply. Currently, we are using the same method as you
> > described. Wanted to make sure there is no better way.
> >
> > It seems there isn't currently, so we will keep using this approach.
> >
> > On Tue, Oct 25, 2022 at 7:23 PM Andrew Grant wrote:
> > > Hey Lehar,
> > >
> > > I don't think there's a way to control this during topic creation. I
> > > just took a look through
> > > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/AdminUtils.scala
> > > and it does appear partition assignment does not account for each
> > > broker's different log directories. I also took a look at the
> > > kafka-topics.sh script; it has a --replica-assignment argument, but
> > > that looks to only allow specifying brokers.
> > > During topic creation, once a replica has been chosen, I think we then
> > > choose the directory with the fewest number of partitions - see
> > > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L1192
> > >
> > > What I think you can do is move existing partitions around with the
> > > kafka-reassign-partitions.sh script. From running the command locally:
> > >
> > > --reassignment-json-file <String: manual assignment json file path>
> > >     The JSON file with the partition reassignment configuration.
> > >     The format to use is -
> > >     {"partitions": [{"topic": "foo", "partition": 1,
> > >      "replicas": [1,2,3], "log_dirs": ["dir1","dir2","dir3"]}],
> > >      "version":1}
> > >     Note that "log_dirs" is optional. When it is specified, its
> > >     length must equal the length of the replicas list. The value in
> > >     this list can be either "any" or the absolute path of the log
> > >     directory on the broker. If an absolute log directory path is
> > >     specified, the replica will be moved to the specified log
> > >     directory on the broker.
> > >
> > > There's the log_dirs field you can use in the JSON file to move
> > > partitions between directories.
> > >
> > > Hope that helps a bit.
> > >
> > > Andrew
> > >
> > > On Tue, Oct 25, 2022 at 6:56 AM Lehar Jain wrote:
> > > > Hey,
> > > >
> > > > We run Kafka brokers with multiple log directories. I wanted to know
> > > > how Kafka balances traffic between the various directories. Can we
> > > > have our own strategy to distribute different partitions to different
> > > > directories? Currently, we are facing an imbalance in the sizes of
> > > > the aforementioned directories; some directories have a lot of empty
> > > > space, whereas others are getting filled quickly.
> > > >
> > > > Regards
Re: Balancing traffic between multiple directories
Auto rebalancing is a very important feature for running Kafka in a production environment. Given that Confluent already has this feature, is there any chance the open source version could have it as well? Or is the idea that the open source version shouldn't be used in a high-load production environment?

From: sunil chaudhari
Sent: October 27, 2022 3:11
To: users@kafka.apache.org
Subject: Re: Balancing traffic between multiple directories

Hi Lehar,
You are right; there is no better way in open source Kafka.
However, Confluent has something called the Auto Rebalancing feature.
Can you check if there is a free version with this feature?

It starts rebalancing brokers automatically when it sees an uneven distribution of partitions.

Regards,
Sunil.
On Wed, 26 Oct 2022 at 12:03 PM, Lehar Jain wrote:
> Hey Andrew,
>
> Thanks for the reply. Currently, we are using the same method as you
> described. Wanted to make sure there is no better way.
>
> It seems there isn't currently, so we will keep using this approach.
>
> On Tue, Oct 25, 2022 at 7:23 PM Andrew Grant wrote:
> > Hey Lehar,
> >
> > I don't think there's a way to control this during topic creation. I
> > just took a look through
> > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/AdminUtils.scala
> > and it does appear partition assignment does not account for each
> > broker's different log directories. I also took a look at the
> > kafka-topics.sh script; it has a --replica-assignment argument, but
> > that looks to only allow specifying brokers. During topic creation,
> > once a replica has been chosen, I think we then choose the directory
> > with the fewest number of partitions - see
> > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L1192
> >
> > What I think you can do is move existing partitions around with the
> > kafka-reassign-partitions.sh script. From running the command locally:
> >
> > --reassignment-json-file <String: manual assignment json file path>
> >     The JSON file with the partition reassignment configuration.
> >     The format to use is -
> >     {"partitions": [{"topic": "foo", "partition": 1,
> >      "replicas": [1,2,3], "log_dirs": ["dir1","dir2","dir3"]}],
> >      "version":1}
> >     Note that "log_dirs" is optional. When it is specified, its
> >     length must equal the length of the replicas list. The value in
> >     this list can be either "any" or the absolute path of the log
> >     directory on the broker. If an absolute log directory path is
> >     specified, the replica will be moved to the specified log
> >     directory on the broker.
> >
> > There's the log_dirs field you can use in the JSON file to move
> > partitions between directories.
> >
> > Hope that helps a bit.
> >
> > Andrew
> >
> > On Tue, Oct 25, 2022 at 6:56 AM Lehar Jain wrote:
> > > Hey,
> > >
> > > We run Kafka brokers with multiple log directories. I wanted to know
> > > how Kafka balances traffic between the various directories. Can we
> > > have our own strategy to distribute different partitions to different
> > > directories? Currently, we are facing an imbalance in the sizes of
> > > the aforementioned directories; some directories have a lot of empty
> > > space, whereas others are getting filled quickly.
> > >
> > > Regards