Does Kafka batch when using broker-side compression?
Hi, does a Kafka broker batch messages together to improve compression when using broker-side compression only? If so, what are the prerequisites for that behavior? Perhaps batching messages at the producer might be beneficial, or even mandatory, in order to get batches in the log? In my scenario there is no compression on the producer, and none is planned. Cheers, Sven
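Broker-side compression operates on the record batches it receives, so larger producer batches generally compress better even when the producer itself sends uncompressed data. A hedged sketch of producer settings that encourage batching without producer-side compression (the property names are standard producer configs; the values are illustrative only, not a recommendation):

```properties
# producer.properties (illustrative values)
compression.type=none   # no producer-side compression, per the scenario above
linger.ms=50            # wait up to 50 ms to fill a batch before sending
batch.size=65536        # per-partition batch buffer, in bytes
```

With linger.ms at its default of 0, a lightly loaded producer tends to send many tiny batches, which gives the broker-side compressor little to work with.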
Aw: Re: Re: Doubts in Kafka
One more question: Is there a way to ask Kafka which ProducerRecord key is mapped to which TopicPartition (for debugging purposes etc.)?

Sent: Monday, 14 January 2019, 13:49
From: "Sven Ludwig"
To: users@kafka.apache.org
Subject: Aw: Re: Re: Doubts in Kafka

Hi,

>> ...AND deactivate the key-based log-cleaner on the
>> broker so that it does not delete older records
>> that have the same key?
> How old records are cleaned is independent of what you do with
> processed records. You usually retain them for enough time so
> you don't lose them before processing them
> + some safety time...

Yes, I got that. My wording was not sharp enough. I realize now that what I really meant here was log compaction. But log compaction would only ever get activated if one set cleanup.policy=compact for a topic, or perhaps as a default for topics. So I do not have to worry about log compaction when giving each ProducerRecord the device UUID as key, as long as it does not get activated. Glad to see that the approach seems valid.

Thank you and Cheers!
Sven

Sent: Friday, 11 January 2019, 12:43
From: "Peter Levart"
To: users@kafka.apache.org, "Sven Ludwig"
Subject: Re: Aw: Re: Doubts in Kafka

On 1/10/19 2:26 PM, Sven Ludwig wrote:
> Okay, but
>
> what if one also needs to preserve the order of messages coming from a
> particular device?
>
> With Kafka, this is perhaps possible if all messages from a particular
> device go into the same partition.
>
> Would it be a good and efficient solution for this approach to set the key
> of each Kafka ProducerRecord to the unique ID of the device

Exactly!

> AND deactivate the key-based log-cleaner on the broker so that it does not
> delete older records that have the same key?

How old records are cleaned is independent of what you do with processed records. You usually retain them for enough time so you don't lose them before processing them + some safety time...

Regards, Peter

> Sven
>
> Sent: Thursday, 10 January 2019, 08:35
> From: "Peter Levart"
> To: users@kafka.apache.org, "aruna ramachandran"
> Subject: Re: Doubts in Kafka
>
> Hi Aruna,
>
> On 1/10/19 8:19 AM, aruna ramachandran wrote:
> > I am using keyed partitions with 1000 partitions, so I need to create
> > 1000 consumers, because consumer groups and rebalancing do not work in
> > the case of manually assigned consumers. Is there any replacement for
> > the above problem?
>
> What API are you using in the KafkaConsumer? Are you using
> subscribe(Collection topics) or are you using assign(Collection partitions)?
>
> The first one (subscribe) is the one you should be using for your use case.
> With that call, when you subscribe to a multi-partition topic and you have
> multiple KafkaConsumer(s) configured with the same consumer group id, then
> partitions of the topic are dynamically assigned (and possibly reassigned
> when consumers come or go) to a set of live consumers. Will this work for
> you (and why not)?
>
> Regards, Peter
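Regarding the key-to-partition question above: there is no broker API to look this up, but for the Java producer's default partitioner the mapping is a pure function of the key bytes and the partition count, so it can be computed offline. Below is a hedged sketch re-implementing that logic (murmur2 hash, masked to positive, modulo partition count) in Python. It assumes no custom partitioner is configured and that this matches your client version's implementation; verify before relying on it for debugging.

```python
def murmur2(data: bytes) -> int:
    """32-bit MurmurHash2 as used by the Java producer's default partitioner
    (seed and constants taken from the algorithm's standard parameters)."""
    length = len(data)
    m = 0x5BD1E995
    mask = 0xFFFFFFFF
    h = (0x9747B28C ^ length) & mask
    # Body: 4-byte little-endian chunks
    for i in range(length // 4):
        k = int.from_bytes(data[i * 4:i * 4 + 4], "little")
        k = (k * m) & mask
        k ^= k >> 24
        k = (k * m) & mask
        h = (h * m) & mask
        h ^= k
    # Tail: remaining 1-3 bytes (mirrors the Java switch fall-through)
    extra = length % 4
    tail = length & ~3
    if extra >= 3:
        h ^= data[tail + 2] << 16
    if extra >= 2:
        h ^= data[tail + 1] << 8
    if extra >= 1:
        h ^= data[tail]
        h = (h * m) & mask
    h ^= h >> 13
    h = (h * m) & mask
    h ^= h >> 15
    return h

def default_partition(key: bytes, num_partitions: int) -> int:
    """Partition chosen for a non-null key: toPositive(murmur2(key)) % numPartitions."""
    return (murmur2(key) & 0x7FFFFFFF) % num_partitions
```

The partition count for a topic can be fetched at runtime via the producer's partitionsFor(topic) call; note the computed mapping only holds while the partition count stays fixed.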
Aw: Re: Doubts in Kafka
Okay, but what if one also needs to preserve the order of messages coming from a particular device?

With Kafka, this is perhaps possible if all messages from a particular device go into the same partition.

Would it be a good and efficient solution for this approach to set the key of each Kafka ProducerRecord to the unique ID of the device AND deactivate the key-based log-cleaner on the broker so that it does not delete older records that have the same key?

Sven

Sent: Thursday, 10 January 2019, 08:35
From: "Peter Levart"
To: users@kafka.apache.org, "aruna ramachandran"
Subject: Re: Doubts in Kafka

Hi Aruna,

On 1/10/19 8:19 AM, aruna ramachandran wrote:
> I am using keyed partitions with 1000 partitions, so I need to create 1000
> consumers, because consumer groups and rebalancing do not work in the case
> of manually assigned consumers. Is there any replacement for the above
> problem?

What API are you using in the KafkaConsumer? Are you using subscribe(Collection topics) or are you using assign(Collection partitions)?

The first one (subscribe) is the one you should be using for your use case. With that call, when you subscribe to a multi-partition topic and you have multiple KafkaConsumer(s) configured with the same consumer group id, then partitions of the topic are dynamically assigned (and possibly reassigned when consumers come or go) to a set of live consumers. Will this work for you (and why not)?

Regards, Peter
Aw: Migration Path from 0.8
Hi Martin,

even though with Kafka I, as a fellow user, never found it problematic to just give upgrades a shot in a test environment no matter the version increase, since you are still on 0.8 you should thoroughly study the details at this link and, depending on whether you care about one or more of them, make a little plan for how to upgrade:

https://kafka.apache.org/documentation/#upgrade

The good thing is that this upgrade chapter covers all versions back to and including 0.8. It has some tips here and there you may want to consider, e.g. upgrading brokers before upgrading clients.

Cheers, Sven

Sent: Monday, 18 June 2018, 22:38
From: "Martín Fernández"
To: users@kafka.apache.org
Subject: Migration Path from 0.8

Hello,

First time writing to the mailing list! At the moment we are running a 3-broker Kafka (0.8) cluster. We want to try to upgrade our cluster to Kafka 1.0. Is there any recommendation for migrating from 0.8? Would it be possible to move directly from 0.8 to 1.0, or should we go through all the intermediate major releases? Our client consumers and producers are Ruby or JRuby apps.

Thanks beforehand!

Best, Martín
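The upgrade chapter linked above describes a rolling upgrade in which the inter-broker protocol and message format are pinned to the old version until all brokers run the new binaries. A hedged sketch of the relevant server.properties lines (the exact version strings for each starting release come from the upgrade chapter; the ones below are illustrative for an 0.8.2 starting point):

```properties
# server.properties during a rolling upgrade (illustrative)
inter.broker.protocol.version=0.8.2   # keep the old protocol until all brokers run new binaries
log.message.format.version=0.8.2      # keep the old on-disk format until all clients are upgraded
# After the whole cluster runs the new binaries, bump inter.broker.protocol.version
# (and later log.message.format.version) and perform another rolling restart.
```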
How to determine Start and End Offsets of a Kafka Topic properly
Hi,

I very often have the case that I need to find out the start and end offset of a Kafka topic. Usually I go to the server, to the place where the topic contents (segments) are stored, and look at the file names. I do it this way to avoid using console consumers, since they make use of offset topics I believe, adding unnecessary management overhead to the cluster. However, our operations team has now blocked developer access to the directories that contain the files.

What are good alternative ways to obtain the start and end offsets of a Kafka topic, ideally without using console consumers?

Kind Regards, Sven
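One option that touches neither the filesystem nor a consumer group is the GetOffsetShell tool shipped with Kafka. A hedged sketch (host and topic names are placeholders; --time -2 requests the earliest offsets and --time -1 the latest):

```shell
# Earliest (start) offsets, per partition
bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list localhost:9092 --topic measurements --time -2

# Latest (end) offsets, per partition
bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list localhost:9092 --topic measurements --time -1
```

Newer Java consumers also expose beginningOffsets(...) and endOffsets(...) directly; those query the brokers and commit nothing to the offsets topic.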
Aw: Re: Problem to apply Broker-side lz4 compression even in fresh setup
I checked the latest .log files and the earliest. They are all the same, with human-readable message payload.

I tried setting LZ4, but that leads to a fatal error on startup:

[2017-12-29 13:55:15,393] FATAL [KafkaServer id=1] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
org.apache.kafka.common.config.ConfigException: Invalid value LZ4 for configuration compression.type: String must be one of: uncompressed, snappy, lz4, gzip, producer
    at org.apache.kafka.common.config.ConfigDef$ValidString.ensureValid(ConfigDef.java:897)
    at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:469)
    at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453)
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
    at kafka.log.LogConfig.<init>(LogConfig.scala:68)
    at kafka.log.LogManager$.apply(LogManager.scala:783)
    at kafka.server.KafkaServer.startup(KafkaServer.scala:222)
    at io.confluent.support.metrics.SupportedServerStartable.startup(SupportedServerStartable.java:112)
    at io.confluent.support.metrics.SupportedKafka.main(SupportedKafka.java:58)

So I switched back to lz4 immediately. No solution so far.

If broker-side compression is applied correctly, does it then recompress new incoming messages/batches no matter what compression the producer applies? Or are the producer settings somehow relevant even then?

Cheers, Sven

Sent: Friday, 29 December 2017, 14:45
From: Manikumar
To: users@kafka.apache.org
Subject: Re: Problem to apply Broker-side lz4 compression even in fresh setup

Was this config added after sending some data? Can you verify the latest logs? This won't recompress existing messages; it is only applicable to new messages.

On Fri, Dec 29, 2017 at 6:59 PM, Ted Yu wrote:
> Looking at https://issues.apache.org/jira/browse/KAFKA-5686, it seems you
> should have specified LZ4.
>
> FYI
>
> On Fri, Dec 29, 2017 at 5:00 AM, Sven Ludwig wrote:
> > Hi,
> >
> > we thought we had lz4 applied as broker-side compression on our Kafka
> > cluster for storing measurements, but today I looked into the individual
> > .log files and I was able to read all the measurements in plain text just
> > by using less on the command line. For me this is an indicator that
> > batches of messages are actually not compressed with lz4. Our setup was
> > started from scratch, i.e. without pre-existing topics; there are no
> > topic-level overrides and it is based on Confluent Platform 4.0.0.
> >
> > 1. Is it perhaps the case that we need to make sure every producer does
> > not already compress batches when sending them to the broker? Up to today
> > I thought that if the producer compresses, but the broker has
> > compression.type lz4, the broker would recompress as lz4?
> >
> > 2. When starting the broker, its log statements show the line:
> > compression.type = lz4
> > Is this correct, or does the value need to be 'lz4' with apostrophes?
> >
> > 3. Any other hints or possibilities what could be wrong?
> >
> > Generally we would like to enforce lz4 broker-side compression. We do not
> > need to compress data coming from producers, since the network link is
> > not the problem. We just need to save disk space.
> >
> > Please help us if you can, and have a good New Year's Eve :-)
> >
> > Thanks,
> > Sven
Problem to apply Broker-side lz4 compression even in fresh setup
Hi,

we thought we had lz4 applied as broker-side compression on our Kafka cluster for storing measurements, but today I looked into the individual .log files and I was able to read all the measurements in plain text just by using less on the command line. For me this is an indicator that batches of messages are actually not compressed with lz4. Our setup was started from scratch, i.e. without pre-existing topics; there are no topic-level overrides and it is based on Confluent Platform 4.0.0.

1. Is it perhaps the case that we need to make sure every producer does not already compress batches when sending them to the broker? Up to today I thought that if the producer compresses, but the broker has compression.type lz4, the broker would recompress as lz4?

2. When starting the broker, its log statements show the line: compression.type = lz4 — is this correct, or does the value need to be 'lz4' with apostrophes?

3. Any other hints or possibilities what could be wrong?

Generally we would like to enforce lz4 broker-side compression. We do not need to compress data coming from producers, since the network link is not the problem. We just need to save disk space.

Please help us if you can, and have a good New Year's Eve :-)

Thanks, Sven
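For reference, the broker accepts only lowercase codec names for this setting — the uppercase LZ4 spelling is rejected at startup, as the validator message quoted in the follow-up thread shows. A minimal sketch of the broker-side setting, without apostrophes:

```properties
# server.properties
# Valid values per the broker's own validator: uncompressed, snappy, lz4, gzip, producer
# 'producer' retains whatever codec the producer used; 'lz4' forces recompression to lz4.
compression.type=lz4
```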
Fw: Number of Processes Kafka Server spawns
Answering myself: the Kafka server spawns only one process. htop by default also shows threads, which was misleading. That can be turned off by pressing H:

https://unix.stackexchange.com/questions/10362/why-does-htop-show-more-process-than-ps

Regards, Sven

Sent: Wednesday, 27 December 2017, 16:17
From: "Sven Ludwig"
To: users@kafka.apache.org
Subject: Number of Processes Kafka Server spawns

Hi,

does Kafka ever launch processes in addition to the Kafka server process? I have a setup with CentOS plus Docker plus a Kafka server container based on the image of Confluent Platform 4.0.0. In ps -Alf there is only one Kafka process, but in htop I can see many.

Regards, Sven
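The process-versus-thread distinction can also be confirmed on the command line; a hedged sketch using standard Linux ps options (assumes a running JVM whose command line matches "kafka"):

```shell
# One line per process: should count a single Kafka JVM
ps -ef | grep -c '[k]afka'

# One line per thread (-L lists LWPs): this matches what htop shows by default
ps -eLf | grep -c '[k]afka'

# Thread count of a specific PID (nlwp = number of lightweight processes)
ps -o nlwp= -p <kafka-pid>
```

The bracketed [k] in the grep pattern is a common trick to keep grep from matching its own process entry.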
Number of Processes Kafka Server spawns
Hi, does Kafka ever launch processes in addition to the Kafka server process? I have a setup with CentOS plus Docker plus a Kafka server container based on the image of Confluent Platform 4.0.0. In ps -Alf there is only one Kafka process, but in htop I can see many. Regards, Sven
Number of Topics and Partitions in recent Kafka?
Hello, with recent Kafka, would it be okay to have about 1,000 topics, with between 1,000 and 3,000 partitions each, on a 6-node Kafka cluster with replication factor 3? Each partition would be written to by one producer and consumed by one consumer, with about 2 messages per minute coming in. Would Kafka and its management overhead be okay given a total partition count in the millions? Kind Regards, Sven
How to clear a particular partition?
Hello, assume that all producers and consumers of a topic-partition have been shut down. Is it possible in this situation to empty that topic-partition while the other topic-partitions keep working? For example, is it possible to trigger a log truncation to 0 on the leader for that partition using some admin tool? Kind Regards, Sven
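Later Kafka versions (0.11.0 and up) ship an admin tool for exactly this kind of per-partition truncation: it advances the partition's log start offset, and data below it becomes eligible for deletion, while other partitions are untouched. A hedged sketch (topic name, partition number, and broker address are placeholders):

```shell
# delete-records.json -- offset -1 means "up to the current log end",
# i.e. remove all records currently in partition 3 of the topic:
# {"version": 1,
#  "partitions": [{"topic": "measurements", "partition": 3, "offset": -1}]}

bin/kafka-delete-records.sh --bootstrap-server localhost:9092 \
  --offset-json-file delete-records.json
```

Note this is a logical truncation of the log start, not a reset of offsets to 0: subsequent produces continue from the existing end offset.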
Aw: Need help regarding Compression
Hi,

I received a first answer today in https://issues.apache.org/jira/browse/KAFKA-1499 from Manikumar. So it looks like a topic with mixed compression types can result in the described scenario. I wrote a follow-up question in the ticket asking whether it could be prevented by post-configuring the topic with a topic-level override for compression.type.

Kind Regards, Sven

Sent: Tuesday, 11 July 2017, 11:41
From: "Sven Ludwig"
To: users@kafka.apache.org
Subject: Need help regarding Compression

Hi,

I need help. We have a Kafka cluster in production with topics compressed with snappy. We want to configure compression.type lz4 on the brokers and restart them (either a rolling update, or all down / all up if that has benefits). What happens to the existing topics compressed with snappy after compression.type has changed to lz4 on a leader / on a follower?

According to the documentation, nothing should happen to existing topics, because the topic configuration should not change just because that default on a broker changed. But I want to be more sure. My concerns are: Is there any danger of getting topics that are partially in snappy and partially in lz4, and if so, would that be a problem? Is there moreover a danger of records/segments being replicated to other brokers in lz4 while older records/segments on those brokers are in snappy, and if so, would that be a problem?

Kind Regards, Sven
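The topic-level override mentioned above can be set with the kafka-configs tool; a hedged sketch using that era's ZooKeeper-based invocation (topic name and ZooKeeper address are placeholders; newer releases use --bootstrap-server instead of --zookeeper):

```shell
# Pin the topic's compression explicitly so the broker default no longer applies
bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --entity-type topics --entity-name measurements \
  --add-config compression.type=lz4

# Verify the override took effect
bin/kafka-configs.sh --zookeeper localhost:2181 --describe \
  --entity-type topics --entity-name measurements
```

A topic-level override takes precedence over the broker-wide default, so subsequent broker-default changes cannot reintroduce a mix for that topic's new data.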
Need help regarding Compression
Hi,

I need help. We have a Kafka cluster in production with topics compressed with snappy. We want to configure compression.type lz4 on the brokers and restart them (either a rolling update, or all down / all up if that has benefits). What happens to the existing topics compressed with snappy after compression.type has changed to lz4 on a leader / on a follower?

According to the documentation, nothing should happen to existing topics, because the topic configuration should not change just because that default on a broker changed. But I want to be more sure. My concerns are: Is there any danger of getting topics that are partially in snappy and partially in lz4, and if so, would that be a problem? Is there moreover a danger of records/segments being replicated to other brokers in lz4 while older records/segments on those brokers are in snappy, and if so, would that be a problem?

Kind Regards, Sven
Aw: Re: Broker-side Compression and Migration to lz4
Hi,

thanks. Another aspect of this: On a cluster that has topics with data, is it safe to gracefully shut down a broker, change the configured compression from snappy to lz4, and start the broker again? What will happen if one does that? All new topics created afterwards will come with lz4 compression, but what happens to the existing topics in snappy?

Regards, Sven

Sent: Monday, 26 June 2017, 13:10
From: "Ismael Juma"
To: "Kafka Users"
Subject: Re: Broker-side Compression and Migration to lz4

Hi Sven,

If you change the topic config, any new data received by that broker will be in the new compression type. However, followers don't uncompress data, so they will store the data as it was on the leader. An easier way to test what you are trying to test is to use MirrorMaker to mirror the data to another cluster.

Ismael

On Mon, Jun 26, 2017 at 11:59 AM, Sven Ludwig wrote:
> Hi,
>
> 1. I would like to test lz4 compression on a new broker node that I want to
> add to my cluster, while the other nodes remain on snappy, in order to
> compare disk usage etc. I am not sure this scenario is even possible with
> only one cluster, since the docs mention that compression.type is a
> topic-level setting, and thus maybe not really per broker. I want to use
> broker-side compression only. The situation is:
>
> broker 1 - Status: Up - compression.type snappy
> broker 2 - Status: Up - compression.type snappy
> broker 3 - Status: Down - compression.type lz4
>
> The topic for the test already exists on brokers 1 and 2, which are up, and
> has some 20 GB of content.
>
> When I start broker 3, will it use snappy or lz4 for the topic?
>
> 2. How can one migrate a cluster (or a topic) from one broker-side
> compression type to another?
>
> Kind Regards,
> Sven
Broker-side Compression and Migration to lz4
Hi,

1. I would like to test lz4 compression on a new broker node that I want to add to my cluster, while the other nodes remain on snappy, in order to compare disk usage etc. I am not sure this scenario is even possible with only one cluster, since the docs mention that compression.type is a topic-level setting, and thus maybe not really per broker. I want to use broker-side compression only. The situation is:

broker 1 - Status: Up - compression.type snappy
broker 2 - Status: Up - compression.type snappy
broker 3 - Status: Down - compression.type lz4

The topic for the test already exists on brokers 1 and 2, which are up, and has some 20 GB of content.

When I start broker 3, will it use snappy or lz4 for the topic?

2. How can one migrate a cluster (or a topic) from one broker-side compression type to another?

Kind Regards, Sven
Aw: Re: NotLeaderForPartitionException
Hi,

thank you for the nice clarification. This is also described indirectly here, which I had not found before:

https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Partitioningandbootstrapping

Perhaps the log level of this message could be reduced to WARN, given that it is quite common and usually recovers on its own.

Kind Regards, Sven

Sent: Tuesday, 6 December 2016, 21:47
From: "Apurva Mehta"
To: users@kafka.apache.org
Subject: Re: NotLeaderForPartitionException

Hi Sven,

You will see this exception during leader election. When the leader for a partition moves to another broker, there is a period during which the replicas would still connect to the original leader, at which point they will raise this exception. This should be a very short period, after which they will connect to and replicate from the new leader correctly.

This is not a fatal error, and you will see it if you are bouncing brokers (since all the leaders on that broker will have to move after the bounce). You may also see it if some brokers have connectivity issues: they may be considered dead, and their partitions would be moved elsewhere.

Hope this helps,
Apurva

On Tue, Dec 6, 2016 at 10:06 AM, Sven Ludwig wrote:
> Hello,
>
> in our Kafka clusters we sometimes observe a specific ERROR log statement,
> and therefore we have doubts whether it is already running stable in our
> configuration. This occurs every now and then, like two or three times a
> day. It is actually the predominant ERROR log statement in our cluster.
> Example:
>
> [2016-12-06 17:14:50,909] ERROR [ReplicaFetcherThread-0-3], Error for
> partition [,] to broker
> 3:org.apache.kafka.common.errors.NotLeaderForPartitionException: This
> server is not the leader for that topic-partition.
> (kafka.server.ReplicaFetcherThread)
>
> We already asked Google, but we did not find sufficient answers to our
> questions, therefore I am asking on the mailing list:
>
> 1. What are the possible reasons for this particular error?
>
> 2. What are the implications of it?
>
> 3. What can be done to prevent it?
>
> Best Regards,
> Sven
RE: ActiveControllerCount is always will be either 0 or 1 in 3 nodes kafka cluster?
Hi,

in JMX, each Kafka broker reports a value of 1 or 0 for ActiveControllerCount. As I understood from this thread, the sum of these values across the cluster should never be anything other than 1.

The documentation at http://docs.confluent.io/3.1.0/kafka/monitoring.html should be improved to make that clear. Currently it is misleading:

kafka.controller:type=KafkaController,name=ActiveControllerCount
Number of active controllers in the cluster. Alert if value is anything other than 1.

Suggested:

kafka.controller:type=KafkaController,name=ActiveControllerCount
Number of active controllers on a broker. Alert if the aggregated sum across all brokers in the cluster is anything other than 1, because in a cluster there should only be one broker with an active controller (cluster singleton).

Kind Regards, Sven
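A minimal sketch of the suggested alert rule, assuming the per-broker gauge values have already been scraped from JMX (the function names are hypothetical; the actual scraping, e.g. via jmxtrans or a Prometheus JMX exporter, is out of scope here):

```python
def cluster_active_controllers(per_broker_counts):
    """Sum the per-broker ActiveControllerCount JMX readings.

    per_broker_counts: mapping of broker id -> the 0/1 gauge value read from
    kafka.controller:type=KafkaController,name=ActiveControllerCount.
    Returns the cluster-wide total; exactly 1 is the healthy state.
    """
    return sum(per_broker_counts.values())

def controller_alert(per_broker_counts):
    """True if the cluster-wide sum indicates a problem: either no active
    controller (0) or more than one broker claiming controllership (>1)."""
    return cluster_active_controllers(per_broker_counts) != 1
```

Alerting on the aggregate rather than on each broker's individual value avoids false alarms from the N-1 brokers that legitimately report 0.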
NotLeaderForPartitionException
Hello,

in our Kafka clusters we sometimes observe a specific ERROR log statement, and therefore we have doubts whether it is already running stable in our configuration. This occurs every now and then, like two or three times a day. It is actually the predominant ERROR log statement in our cluster. Example:

[2016-12-06 17:14:50,909] ERROR [ReplicaFetcherThread-0-3], Error for partition [,] to broker 3:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition. (kafka.server.ReplicaFetcherThread)

We already asked Google, but we did not find sufficient answers to our questions, therefore I am asking on the mailing list:

1. What are the possible reasons for this particular error?

2. What are the implications of it?

3. What can be done to prevent it?

Best Regards, Sven