Does Kafka batch when using broker-side compression?

2019-01-15 Thread Sven Ludwig
Hi,

does a Kafka broker batch messages together in order to improve compression 
when using broker-side compression only?

If so, what are the prerequisites to get that behavior? Perhaps batching 
messages on the producer side is beneficial, or even mandatory, in order to get 
batches in the log?

In my scenario, there is no compression on the producer, and we do not plan to 
introduce it.

Cheers
Sven
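
For what it's worth: as far as I understand, the broker compresses records per 
producer batch rather than re-batching them itself, so the producer's batching 
settings largely determine how much broker-side compression can help. Below is 
a minimal sketch of such producer settings, assuming the Java client; the 
bootstrap address, topic name and sizes are only illustrative.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class BatchingProducerConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // illustrative address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        // No producer-side compression; the broker/topic compression.type applies.
        props.put("compression.type", "none");
        // Larger batches give the broker more records to compress together.
        props.put("batch.size", 65536);  // bytes per in-flight partition batch
        props.put("linger.ms", 50);      // wait up to 50 ms to fill a batch

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("measurements", "device-1", "some value"));
        }
    }
}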


Aw: Re: Re: Doubts in Kafka

2019-01-14 Thread Sven Ludwig
One more question:

Is there a way to ask Kafka which ProducerRecord.key is mapped to which 
TopicPartition (for debugging purposes etc.)?
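
For debugging, one way to see the actual mapping is to inspect the 
RecordMetadata the producer returns for each send. A minimal sketch, assuming 
the Java client; the bootstrap address, topic and key are illustrative, and 
note that this writes a probe record to the topic:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class KeyToPartitionProbe {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // illustrative
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The returned metadata reports the partition the key was mapped to.
            RecordMetadata md = producer.send(
                    new ProducerRecord<>("measurements", "device-uuid-1", "probe")).get();
            System.out.println("key device-uuid-1 -> partition " + md.partition());
        }
    }
}

With the default partitioner, a non-null key is mapped via a murmur2 hash of 
the serialized key modulo the partition count, so the mapping stays stable as 
long as the number of partitions does not change.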



Sent: Monday, 14 January 2019 at 13:49
From: "Sven Ludwig" 
To: users@kafka.apache.org
Subject: Aw: Re: Re: Doubts in Kafka
Hi,

>> ...AND deactivate the key-based log-cleaner on the
>> broker so that it does not delete older records
>> that have the same key?

> How old records are cleaned is independent of what you do with
> processed records. You usually retain them for enough time so
> you don't lose them before processing them
> + some safety time...

Yes, I got that.

My wording was not sharp enough. I realize now that what I really meant here 
was log compaction. But log compaction only gets activated if one sets 
cleanup.policy=compact for a topic, or perhaps as a default for topics. So I do 
not have to worry about log compaction when giving each ProducerRecord the 
device UUID as key, as long as compaction does not get activated.

Glad to see that the approach seems valid.
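
A quick way to double-check that compaction is not active for a topic is to 
read its cleanup.policy via the AdminClient. A minimal sketch, assuming the 
Java AdminClient; the bootstrap address and topic name are illustrative:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

public class CheckCleanupPolicy {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // illustrative
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "measurements");
            Config config = admin.describeConfigs(Collections.singleton(topic))
                                 .all().get().get(topic);
            // "delete" means plain retention; "compact" would enable key-based compaction.
            System.out.println("cleanup.policy = " + config.get("cleanup.policy").value());
        }
    }
}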

Thank you and Cheers!
Sven


Sent: Friday, 11 January 2019 at 12:43
From: "Peter Levart" 
To: users@kafka.apache.org, "Sven Ludwig" 
Subject: Re: Aw: Re: Doubts in Kafka

On 1/10/19 2:26 PM, Sven Ludwig wrote:
> Okay, but
>
> what if one also needs to preserve the order of messages coming from a 
> particular device?
>
> With Kafka, this is perhaps possible if all messages from a particular device 
> go into the same partition.
>
> Would it be a good and efficient solution for this approach to set the key of 
> each Kafka ProducerRecord to the unique ID of the Device

Exactly!

> AND deactivate the key-based log-cleaner on the broker so that it does not 
> delete older records that have the same key?

How old records are cleaned is independent of what you do with processed
records. You usually retain them for enough time so you don't lose them
before processing them + some safety time...

Regards, Peter

>
> Sven
>
>
> Sent: Thursday, 10 January 2019 at 08:35
> From: "Peter Levart" 
> To: users@kafka.apache.org, "aruna ramachandran" 
> Subject: Re: Doubts in Kafka
> Hi Aruna,
>
> On 1/10/19 8:19 AM, aruna ramachandran wrote:
>> I am using keyed partitions with 1000 partitions, so I need to create 1000
>> consumers, because consumer groups and rebalancing do not work in the case
>> of manually assigned consumers. Is there any solution to the above problem?
>>
> What API are you using in the KafkaConsumer? Are you using
> subscribe(Collection<String> topics) or are you using
> assign(Collection<TopicPartition> partitions)?
>
> The 1st one (subscribe) is the one you should be using for your use case.
> With that call, when you subscribe to a multi-partition topic and you
> have multiple KafkaConsumer(s) configured with the same consumer group
> id, then partitions of the topic are dynamically assigned (and possibly
> reassigned when consumers come or go) to the set of live consumers. Will
> this work for you (and if not, why not)?
>
> Regards, Peter
 




Aw: Re: Doubts in Kafka

2019-01-10 Thread Sven Ludwig
Okay, but
 
what if one also needs to preserve the order of messages coming from a 
particular device?

With Kafka, this is perhaps possible if all messages from a particular device 
go into the same partition.

Would it be a good and efficient solution for this approach to set the key of 
each Kafka ProducerRecord to the unique ID of the device, AND deactivate the 
key-based log-cleaner on the broker so that it does not delete older records 
that have the same key?

Sven


Sent: Thursday, 10 January 2019 at 08:35
From: "Peter Levart" 
To: users@kafka.apache.org, "aruna ramachandran" 
Subject: Re: Doubts in Kafka
Hi Aruna,

On 1/10/19 8:19 AM, aruna ramachandran wrote:
> I am using keyed partitions with 1000 partitions, so I need to create 1000
> consumers, because consumer groups and rebalancing do not work in the case
> of manually assigned consumers. Is there any solution to the above problem?
>

What API are you using in the KafkaConsumer? Are you using
subscribe(Collection<String> topics) or are you using
assign(Collection<TopicPartition> partitions)?

The 1st one (subscribe) is the one you should be using for your use case.
With that call, when you subscribe to a multi-partition topic and you
have multiple KafkaConsumer(s) configured with the same consumer group
id, then partitions of the topic are dynamically assigned (and possibly
reassigned when consumers come or go) to the set of live consumers. Will
this work for you (and if not, why not)?

Regards, Peter
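
A minimal sketch of the subscribe-based approach described above, assuming a 
reasonably recent Java client (poll(Duration) needs 2.0+); the bootstrap 
address, topic and group id are illustrative. Running several instances with 
the same group.id spreads the partitions across them automatically:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class GroupedConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // illustrative
        props.put("group.id", "device-readers");           // same id in every instance
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // subscribe(), not assign(): the group coordinator assigns partitions
            // and rebalances them when instances join or leave.
            consumer.subscribe(Collections.singletonList("measurements"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d key=%s value=%s%n",
                            record.partition(), record.key(), record.value());
                }
            }
        }
    }
}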


Aw: Migration Path from 0.8

2018-06-18 Thread Sven Ludwig
Hi Martin,

Even though, as a fellow Kafka user, I have never found it problematic to just 
give upgrades a shot in a test environment no matter the version increase, since 
you are still on 0.8 you should thoroughly study the details at this link and, 
depending on which of them matter to you, make a little plan for how to 
upgrade: https://kafka.apache.org/documentation/#upgrade

The good thing is that this upgrade chapter covers all versions back to and 
including 0.8.

It has some tips here and there you may want to consider, e.g. upgrading the 
brokers before upgrading the clients.

Cheers
Sven
 

Sent: Monday, 18 June 2018 at 22:38
From: "Martín Fernández" 
To: users@kafka.apache.org
Subject: Migration Path from 0.8
Hello,

First time writing in the mailing list!

At this moment we are running a 3-broker Kafka (0.8) cluster. We want to try to 
upgrade our cluster to Kafka 1.0.

Is there any recommendation for migrating from 0.8? Would it be possible to 
move directly from 0.8 to 1.0, or should we go through all the intermediate 
major releases?

Our client consumers and producers are Ruby or JRuby apps.

Thanks beforehand!

Best,
Martín


How to determine Start and End Offsets of a Kafka Topic properly

2018-06-08 Thread Sven Ludwig
Hi,
 
Very often I need to find out the start and end offsets of a Kafka topic.

Usually I go to the place on the server where the topic contents (segments) are 
stored and look at the file names. I do it this way to avoid using console 
consumers, since I believe they make use of offset topics, adding unnecessary 
management overhead to the cluster. However, our operations team has now 
blocked developer access to the directories that contain the files.

What are good alternative ways to obtain this information (start and end 
offsets of a Kafka topic), ideally without using console consumers?
 
Kind Regards
Sven
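
For reference, the Java consumer can report both values directly, without 
consuming anything and without a consumer group, so nothing is written to the 
offsets topic. A minimal sketch; the bootstrap address and topic name are 
illustrative:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class TopicOffsets {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // illustrative
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());
        // No group.id is set, so no consumer-group state is created.
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> partitions = new ArrayList<>();
            for (PartitionInfo p : consumer.partitionsFor("measurements")) {
                partitions.add(new TopicPartition(p.topic(), p.partition()));
            }
            Map<TopicPartition, Long> start = consumer.beginningOffsets(partitions);
            Map<TopicPartition, Long> end = consumer.endOffsets(partitions);
            for (TopicPartition tp : partitions) {
                System.out.println(tp + " start=" + start.get(tp) + " end=" + end.get(tp));
            }
        }
    }
}

The GetOffsetShell command-line tool provides similar information, if it is 
available in your distribution.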





Aw: Re: Problem to apply Broker-side lz4 compression even in fresh setup

2017-12-29 Thread Sven Ludwig
I checked the latest .log files and the earliest. They all look the same, with 
human-readable message payloads.


I tried setting LZ4 (uppercase), but that leads to a fatal error on startup:

[2017-12-29 13:55:15,393] FATAL [KafkaServer id=1] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
org.apache.kafka.common.config.ConfigException: Invalid value LZ4 for configuration compression.type: String must be one of: uncompressed, snappy, lz4, gzip, producer
        at org.apache.kafka.common.config.ConfigDef$ValidString.ensureValid(ConfigDef.java:897)
        at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:469)
        at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453)
        at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
        at kafka.log.LogConfig.<init>(LogConfig.scala:68)
        at kafka.log.LogManager$.apply(LogManager.scala:783)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:222)
        at io.confluent.support.metrics.SupportedServerStartable.startup(SupportedServerStartable.java:112)
        at io.confluent.support.metrics.SupportedKafka.main(SupportedKafka.java:58)

So I switched back to lz4 immediately. No solution so far.
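
For what it's worth, the ConfigDef error above suggests the value is simply 
case-sensitive. A sketch of the broker-side default in server.properties, using 
one of the lowercase values the error message lists:

# server.properties -- broker-wide default; topics can still override compression.type
# valid values per the error message: uncompressed, snappy, lz4, gzip, producer
compression.type=lz4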


If broker-side compression is applied correctly, does the broker then recompress 
new incoming messages/batches no matter what compression the producer applies? 
Or are the producer settings still relevant even then?


Cheers,
Sven



Sent: Friday, 29 December 2017 at 14:45
From: Manikumar 
To: users@kafka.apache.org
Subject: Re: Problem to apply Broker-side lz4 compression even in fresh setup
Is this config added after sending some data? Can you verify the latest
logs?
This won't recompress existing messages. It is only applicable to new messages.

On Fri, Dec 29, 2017 at 6:59 PM, Ted Yu  wrote:

> Looking at https://issues.apache.org/jira/browse/KAFKA-5686 , it seems you
> should have specified LZ4.
>
> FYI
>
> On Fri, Dec 29, 2017 at 5:00 AM, Sven Ludwig  wrote:
>
> > Hi,
> >
> > we thought we have lz4 applied as broker-side compression on our Kafka
> > Cluster for storing measurements, but today I looked into the individual
> > .log files and I was able to read all the measurements in plain text by
> > just using less on the command line. This is for me an indicator that
> > batches of messages are actually not compressed with lz4. Our setup was
> > started from scratch i.e. without pre-existing topics, there are no
> > topic-level overrides and it is based on Confluent Platform 4.0.0
> >
> >
> > 1. Is it perhaps so that we need to care in every Producer that it does
> > not already compress batches when sending them to the Broker? Up to
> today I
> > thought that if the Producer compresses, but the Broker has
> > compression.type lz4, that the Broker would recompress as lz4?
> >
> >
> > 2. When starting the Broker, in its log-statements it shows the line:
> > compression.type = lz4
> > Is this correct, or does the value need to be 'lz4' with apostrophe?
> >
> >
> > 3. Any other hints or possibilities what could be wrong?
> >
> >
> > Generally we would like to enforce lz4 broker-side compression. We do not
> > need to compress data coming from producers, since the network link is
> not
> > the problem. We just need to save on disk space.
> >
> > Please help us if you can, and have a good new years eve :-)
> >
> > Thanks,
> > Sven
> >
>


Problem to apply Broker-side lz4 compression even in fresh setup

2017-12-29 Thread Sven Ludwig
Hi,
 
we thought we had lz4 applied as broker-side compression on our Kafka cluster 
for storing measurements, but today I looked into the individual .log files and 
I was able to read all the measurements in plain text just by using less on the 
command line. To me this is an indicator that batches of messages are actually 
not compressed with lz4. Our setup was started from scratch, i.e. without 
pre-existing topics, there are no topic-level overrides, and it is based on 
Confluent Platform 4.0.0.
 
 
1. Is it perhaps the case that we need to make sure in every producer that it 
does not already compress batches when sending them to the broker? Up to today 
I thought that if the producer compresses, but the broker has compression.type 
lz4, the broker would recompress as lz4?
 
 
2. When starting the broker, its log statements show the line:
compression.type = lz4
Is this correct, or does the value need to be 'lz4' with apostrophes?
 
 
3. Any other hints or possibilities what could be wrong?
 
 
Generally we would like to enforce lz4 broker-side compression. We do not need 
to compress data coming from producers, since the network link is not the 
problem. We just need to save on disk space.
 
Please help us if you can, and have a good New Year's Eve :-)
 
Thanks,
Sven


Fw: Number of Processes Kafka Server spawns

2017-12-27 Thread Sven Ludwig
Answering myself:

Kafka Server spawns only 1 process.

htop by default also shows threads, which was misleading. This can be turned 
off by pressing H.

https://unix.stackexchange.com/questions/10362/why-does-htop-show-more-process-than-ps

Regards,
Sven
 

Sent: Wednesday, 27 December 2017 at 16:17
From: "Sven Ludwig" 
To: users@kafka.apache.org
Subject: Number of Processes Kafka Server spawns
Hi,

does Kafka ever launch processes in addition to the Kafka server process?

I have a setup with CentOS plus Docker plus a Kafka Server container based on 
the Confluent Platform 4.0.0 image.

In ps -Alf there is only one Kafka process, but in htop I can see many.

Regards,
Sven




Number of Topics and Partitions in recent Kafka?

2017-10-13 Thread Sven Ludwig
Hello,

with recent Kafka, would it be okay to have about 1000 topics, with between 
1000 and 3000 partitions each, on a 6-node Kafka cluster with replication 
factor 3?

Each partition would be written to by one producer and consumed by one 
consumer, with about 2 messages per minute coming in.

Would Kafka and its management overhead be okay given such a large number of 
partitions, in the millions?

Kind Regards,
Sven



How to clear a particular partition?

2017-08-10 Thread Sven Ludwig
Hello,
 
assume that all producers and consumers regarding a topic-partition have been 
shutdown.
 
Is it possible in this situation to empty that topic-partition, while the other 
topic-partitions keep working?
 
For example, is it possible to trigger a log truncation to offset 0 on the 
leader for that partition using some admin tool?
 
Kind Regards,
Sven
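
For what it's worth, later Kafka versions added an admin API for exactly this: 
AdminClient.deleteRecords (added to the Java AdminClient around 1.1, if I 
remember correctly) raises the log start offset of a partition rather than 
rewriting segments. A minimal sketch, assuming such a version; the bootstrap 
address, topic and partition number are illustrative:

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class TruncatePartition {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");           // illustrative
        TopicPartition tp = new TopicPartition("measurements", 3);  // illustrative partition

        // Find the current end offset of the partition (no group.id needed).
        Properties cprops = new Properties();
        cprops.putAll(props);
        cprops.put("key.deserializer", ByteArrayDeserializer.class.getName());
        cprops.put("value.deserializer", ByteArrayDeserializer.class.getName());
        long end;
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(cprops)) {
            end = consumer.endOffsets(Collections.singleton(tp)).get(tp);
        }

        // Delete everything before the end offset, i.e. logically empty the partition.
        try (AdminClient admin = AdminClient.create(props)) {
            Map<TopicPartition, RecordsToDelete> request =
                    Collections.singletonMap(tp, RecordsToDelete.beforeOffset(end));
            admin.deleteRecords(request).all().get();
        }
    }
}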
 


Aw: Need help regarding Compression

2017-07-12 Thread Sven Ludwig
Hi,

I received a first answer from Manikumar today in 
https://issues.apache.org/jira/browse/KAFKA-1499

So it looks like a topic with mixed compression types can result in the 
described scenario.

I wrote a follow-up question in the ticket asking whether this could be 
prevented by post-configuring a topic with a topic-level override for 
compression.type.
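
For reference, such a topic-level override can be set via the Java AdminClient 
(available since roughly 0.11); whether it actually prevents the mixed 
compression scenario is exactly the open question above. A minimal sketch; the 
bootstrap address and topic name are illustrative:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class SetTopicCompression {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // illustrative
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "measurements");
            Config update = new Config(
                    Collections.singleton(new ConfigEntry("compression.type", "lz4")));
            // alterConfigs replaces the topic's dynamic config with the given entries
            // (newer clients prefer incrementalAlterConfigs for this reason).
            admin.alterConfigs(Collections.singletonMap(topic, update)).all().get();
        }
    }
}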

Kind Regards
Sven
 

Sent: Tuesday, 11 July 2017 at 11:41
From: "Sven Ludwig" 
To: users@kafka.apache.org
Subject: Need help regarding Compression
Hi,

I need help. We have a Kafka cluster in production with topics compressed with 
snappy. We want to configure compression.type lz4 on the brokers and restart 
them (either a rolling update, or all down and then all up if that has benefits).


What happens to the existing topics compressed with snappy after 
compression.type has changed to lz4 on a leader / on a follower?


According to the documentation, nothing should happen to existing topics, 
because the topic configuration should not change just because that default on 
a broker changed. But I want to be more certain. My concerns behind this are: 
Is there any danger of getting topics that are partially in snappy and 
partially in lz4, and if so, would that be a problem? Is there, moreover, a 
danger of records/segments being replicated to other brokers in lz4 while older 
records/segments on those brokers are in snappy, and if so, would that be a 
problem?

Kind Regards,
Sven




Aw: Re: Broker-side Compression and Migration to lz4

2017-07-07 Thread Sven Ludwig
Hi,

thanks. On another aspect of this: On a cluster that has topics with data, is it 
safe to gracefully shut down a broker, change the configured compression from 
snappy to lz4 and start the broker again? What will happen if one does that? 
All new topics created afterwards will come with lz4 compression. But what 
happens to the existing topics in snappy?

Regards
Sven
 

Sent: Monday, 26 June 2017 at 13:10
From: "Ismael Juma" 
To: "Kafka Users" 
Subject: Re: Broker-side Compression and Migration to lz4
Hi Sven,

If you change the topic config, any new data received by that broker will
be in the new compression type. However, followers don't uncompress data,
so they will store the data as it was in the leader. An easier way to test
what you are trying to test is to use MirrorMaker to mirror the data to
another cluster.

Ismael

On Mon, Jun 26, 2017 at 11:59 AM, Sven Ludwig  wrote:

> Hi,
>
>
> 1. I would like to test lz4 compression on a new broker node that I want
> add to my cluster, while the other nodes remain in snappy, in order to
> compare disk usage etc. I am not sure if this scenario is even possible
> with only one cluster, since in the docs it is mentioned that
> compression.type is a topic-level setting, and thus maybe not really per
> broker. I want to use Broker-side compression only. The situation is:
>
> broker 1 - Status: Up - compression.type snappy
> broker 2 - Status: Up - compression.type snappy
> broker 3 - Status: Down - compression.type lz4
>
> The topic for the test already exists on brokers 1 and 2 which are up and
> has some 20GB content.
>
> When I start broker 3, will it use snappy or lz4 for the topic?
>
>
> 2. How can one migrate a cluster (or a topic) from one broker-side
> compression type to another?
>
>
> Kind Regards,
> Sven
>


Broker-side Compression and Migration to lz4

2017-06-26 Thread Sven Ludwig
Hi,
 

1. I would like to test lz4 compression on a new broker node that I want to add 
to my cluster, while the other nodes remain on snappy, in order to compare disk 
usage etc. I am not sure if this scenario is even possible with only one 
cluster, since the docs mention that compression.type is a topic-level setting, 
and thus maybe not really per broker. I want to use broker-side compression 
only. The situation is:
 
broker 1 - Status: Up - compression.type snappy
broker 2 - Status: Up - compression.type snappy
broker 3 - Status: Down - compression.type lz4

The topic for the test already exists on brokers 1 and 2 which are up and has 
some 20GB content.
 
When I start broker 3, will it use snappy or lz4 for the topic?

 
2. How can one migrate a cluster (or a topic) from one broker-side compression 
type to another?

 
Kind Regards,
Sven


Aw: Re: NotLeaderForPartitionException

2016-12-16 Thread Sven Ludwig
Hi,

thank you for the nice clarification.

This is also described indirectly here, which I had not found before: 
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Partitioningandbootstrapping

Perhaps the log level of this message could be reduced to WARN, since it is 
quite common and usually recovers by itself.

Kind Regards
Sven


Sent: Tuesday, 6 December 2016 at 21:47
From: "Apurva Mehta" 
To: users@kafka.apache.org
Subject: Re: NotLeaderForPartitionException
Hi Sven,

You will see this exception during leader election. When the leader for a
partition moves to another broker, there is a period during which the
replicas would still connect to the original leader, at which point they
will raise this exception. This should be a very short period, after which
they will connect to and replicate from the new leader correctly.

This is not a fatal error, and you will see it if you are bouncing brokers
(since all the leaders on that broker will have to move after the bounce).
You may also see it if some brokers have connectivity issues: they may be
considered dead, and their partitions would be moved elsewhere.

Hope this helps,
Apurva

On Tue, Dec 6, 2016 at 10:06 AM, Sven Ludwig  wrote:

> Hello,
>
> in our Kafka clusters we sometimes observe a specific ERROR log-statement,
> and therefore we have doubts whether it is already running stable in our
> configuration. This occurs every now and then, like two or three times in a
> day. It is actually the predominant ERROR log-statement in our cluster.
> Example:
>
> [2016-12-06 17:14:50,909] ERROR [ReplicaFetcherThread-0-3], Error for
> partition [,] to broker
> 3:org.apache.kafka.common.errors.NotLeaderForPartitionException: This
> server is
> not the leader for that topic-partition. (kafka.server.
> ReplicaFetcherThread)
>
> We already asked Google, but we did not find sufficient answers to our
> questions, therefore I am asking on the mailing list:
>
> 1. What are the possible reasons for this particular error?
>
> 2. What are the implications of it?
>
> 3. What can be done to prevent it?
>
> Best Regards,
> Sven
>


RE: ActiveControllerCount is always will be either 0 or 1 in 3 nodes kafka cluster?

2016-12-12 Thread Sven Ludwig
Hi,
 
in JMX, each Kafka broker has a value of 1 or 0 for ActiveControllerCount. As I 
understand from this thread, the sum of these values across the cluster should 
never be anything other than 1. The documentation at 
http://docs.confluent.io/3.1.0/kafka/monitoring.html should be improved to make 
that clear. Currently it is misleading:

kafka.controller:type=KafkaController,name=ActiveControllerCount
Number of active controllers in the cluster. Alert if value is anything other 
than 1.

Suggested:

kafka.controller:type=KafkaController,name=ActiveControllerCount
Number of active controllers on a broker. Alert if the aggregated sum across 
all brokers in the cluster is anything other than 1, because in a cluster there 
should only be one broker with an active controller (cluster singleton).

Kind Regards,
Sven


NotLeaderForPartitionException

2016-12-06 Thread Sven Ludwig
Hello,
 
in our Kafka clusters we sometimes observe a specific ERROR log statement, and 
therefore we have doubts whether it is already running stably in our 
configuration. This occurs every now and then, like two or three times a day. 
It is actually the predominant ERROR log statement in our cluster. Example:
 
[2016-12-06 17:14:50,909] ERROR [ReplicaFetcherThread-0-3], Error for partition 
[,] to broker 
3:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is
not the leader for that topic-partition. (kafka.server.ReplicaFetcherThread)
 
We already asked Google, but we did not find sufficient answers to our 
questions, therefore I am asking on the mailing list:
 
1. What are the possible reasons for this particular error?
 
2. What are the implications of it?
 
3. What can be done to prevent it?
 
Best Regards,
Sven