Re: log compaction across partitions

2021-01-05 Thread Shaohan Yin
Hi,

Normally the same keys will lead to the same partition.
And log compaction works on a single topic-partition.
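For example, the Java client's default partitioner picks the partition from a
hash of the serialized key (a sketch; custom partitioners may of course behave
differently):

    partition = toPositive(murmur2(keyBytes)) % numPartitions

so records with the same key normally land in the same partition, and the
per-partition cleaner still sees every version of that key.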

Cheers

On Wed, 6 Jan 2021 at 07:40, Jacob Botuck  wrote:

> If I somehow have 2 records with the same key in different partitions in a
> topic with log compaction enabled, will log compaction remove the old
> record? Or do the records need to be in the same partition for log
> compaction to work?
>


log compaction across partitions

2021-01-05 Thread Jacob Botuck
If I somehow have 2 records with the same key in different partitions in a
topic with log compaction enabled, will log compaction remove the old
record? Or do the records need to be in the same partition for log
compaction to work?


Re: kafka log compaction

2020-06-18 Thread Ricardo Ferreira

Pushkar,

"1. Would setting the cleanup policy to compact (and No delete) would always
retain the latest value for a key?" -- Yes. This is the purpose of this 
setting.


"2. Does parameters like segment.bytes, retention.ms also play any role in
compaction?" -- They don't play any role in compaction, they play a role 
in retention. Compaction has more to do with the behavior of keeping the 
very last mutation of a record -- whereas retention dictates how long 
the data needs to be retained. They can be used interchangeably.
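As a quick sketch (the topic name and broker address are placeholders; recent
kafka-topics/kafka-configs releases accept --bootstrap-server, older ones use
--zookeeper), a compacted topic can be created, or an existing one switched
over, like this:

    kafka-topics.sh --bootstrap-server localhost:9092 --create \
      --topic static-data --partitions 3 --replication-factor 3 \
      --config cleanup.policy=compact

    kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics \
      --entity-name static-data --alter --add-config cleanup.policy=compact

If you also want time- or size-based deletion on top of compaction, the two
policies can be combined with cleanup.policy=compact,delete.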


Thanks

-- Ricardo

On 6/18/20 12:10 AM, Pushkar Deole wrote:

Hi All

I want some of my topics to retain data forever without any deletion since
those topics hold static data that is always required by the application. Also,
for these topics I want to retain the latest value for each key.
I believe the cleanup policy of 'compact' would meet my needs. I have the
following questions though:
1. Would setting the cleanup policy to compact (and no delete) always
retain the latest value for a key?
2. Do parameters like segment.bytes, retention.ms also play any role in
compaction?



kafka log compaction

2020-06-17 Thread Pushkar Deole
Hi All

I want some of my topics to retain data forever without any deletion since
those topics hold static data that is always required by the application. Also,
for these topics I want to retain the latest value for each key.
I believe the cleanup policy of 'compact' would meet my needs. I have the
following questions though:
1. Would setting the cleanup policy to compact (and no delete) always
retain the latest value for a key?
2. Do parameters like segment.bytes, retention.ms also play any role in
compaction?


Re: AW: Configuration of log compaction

2019-04-29 Thread Ethan Stein
I received the same error as well.  Were any of you able to fix this issue?

On 2018/12/18 09:42:58, Claudia Wegmann  wrote:
> Hi Liam,
>
> thanks for the pointer. I found out, that the log cleaner on all kafka
> brokers died with the following error message:
>
> [2018-12-04 15:33:24,886] INFO Cleaner 0: Caught segment overflow error
> during cleaning: Detected offset overflow at offset -1 in segment
> LogSegment(baseOffset=3605669, size=6970326) (kafka.log.LogCleaner)
> [2018-12-04 15:33:24,958] ERROR [kafka-log-cleaner-thread-0]: Error due to
> (kafka.log.LogCleaner)
> java.lang.IllegalArgumentException: requirement failed: Split operation is
> only permitted for segments with overflow
> at scala.Predef$.require(Predef.scala:277)
> at kafka.log.Log.splitOverflowedSegment(Log.scala:1873)
> at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:517)
> at kafka.log.Cleaner.$anonfun$doClean$6(LogCleaner.scala:465)
> at kafka.log.Cleaner.$anonfun$doClean$6$adapted(LogCleaner.scala:464)
> at scala.collection.immutable.List.foreach(List.scala:389)
> at kafka.log.Cleaner.doClean(LogCleaner.scala:464)
> at kafka.log.Cleaner.clean(LogCleaner.scala:442)
> at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:303)
> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:289)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> [2018-12-04 15:33:24,959] INFO [kafka-log-cleaner-thread-0]: Stopped
> (kafka.log.LogCleaner)
>
> This brought up some new questions:
> 1.) How do I avoid this error? Did I misconfigure the creation of segments?
> 2.) Is there a way to monitor the log cleaner?
> 3.) Can I restart the log cleaner manually without restarting the whole
> broker?
>
> Thanks for your help.
>
> Best,
> Claudia
>
> -Original Message-
> From: Liam Clarke
> Sent: Monday, 17 December 2018 23:06
> To: users@kafka.apache.org
> Subject: Re: Configuration of log compaction
>
> Hi Claudia,
>
> Anything useful in the log cleaner log files?
>
> Cheers,
>
> Liam Clarke
>
> On Tue, 18 Dec. 2018, 3:18 am Claudia Wegmann
>
> > Hi,
> >
> > thanks for the quick response.
> >
> > My problem is not, that no new segments are created, but that segments
> > with old data do not get compacted.
> > I had to restart one broker because there was no diskspace left. After
> > recreating all indexes etc. the broker recognized the old data and
> > compacted it correctly. I had to restart all other brokers of the
> > cluster, too, for them to also recognize the old data and start
> > compacting.
> >
> > So I guess, before restarting, the brokers were too busy to
> > compact/delete old data? Is there a configuration to ensure compaction
> > after a certain amount of time or something?
> >
> > Best,
> > Claudia
> >
> > -Original Message-
> > From: Spico Florin
> > Sent: Monday, 17 December 2018 14:28
> > To: users@kafka.apache.org
> > Subject: Re: Configuration of log compaction
> >
> > Hello!
> >   Please check whether the segment.ms configuration on topic will help
> > you to solve your problem.
> >
> > https://kafka.apache.org/documentation/
> >
> > https://stackoverflow.com/questions/41048041/kafka-deletes-segments-even-before-segment-size-is-reached
> >
> > Regards,
> >  Florin
> >
> > segment.ms This configuration controls the period of time after which
> > Kafka will force the log to roll even if the segment file isn't full
> > to ensure that retention can delete or compact old data. long
> > 604800000 [1,...] log.roll.ms medium
> >

AW: Configuration of log compaction

2019-02-28 Thread Claudia Wegmann
Hi kafka users,

I still have the problem of the log cleaner dying regularly. Yesterday I 
restarted 3 of my brokers in a 5-broker cluster. On two of them the log cleaner 
threads already died again before they were able to clean up most of the old 
data.

A general question in this regard:
1.) What is the exact purpose of the log cleaner? Is it just responsible for 
cleaning up internal topics?
Even though the log cleaner shut down, the broker still logs lines such as
"[Log partition=resultGatewayDataDispatcher-11, dir=/data/kafka-logs] Found 
deletable segments with base offsets [42976352] due to retention time 
1440ms breach (kafka.log.Log)"
and then cleans the stated segments.
But internal topics (for state stores) or the consumer-offset topic do not get 
compacted/cleaned up.

2.) What can I do to stabilize the log cleaning? Is it easier for the log
cleaner to clean up many small segments? Up until now I have used the default
configuration for the log segment size. Should I reduce it?

Anyone any other ideas?

Best,
Claudia


-Original Message-
From: Claudia Wegmann
Sent: Wednesday, 19 December 2018 08:50
To: users@kafka.apache.org
Subject: AW: Configuration of log compaction

Hi Liam,

My brokers are running version 2.0.0. Restarting was not a problem, it just
seemed "a lot" for just restarting the log cleaner. I will think about piping 
the logs of the log cleaner to elk. And I will definitely keep an eye on the 
logs to see if the error occurs again.

Thanks again!

Best,
Claudia

-Original Message-
From: Liam Clarke
Sent: Tuesday, 18 December 2018 23:05
To: users@kafka.apache.org
Subject: Re: Configuration of log compaction

Kia ora Claudia,

What version Kafka are you running? I can't find the specified code in Kafka 
1.0.0.

With regards to your questions:

1) I don't think so, but I'm just guessing here.
2) The log cleaner logs, if piped into something like Elasticsearch could be 
used to monitor it?
3) No, it's a thread started by the broker. I imagine you're using an old 
broker version, around 0.8.2 perhaps? We've found that rolling broker restarts 
with 0.11 are rather easy and not to be feared.

Kind regards,

Liam Clarke

On Tue, Dec 18, 2018 at 10:43 PM Claudia Wegmann 
wrote:

> Hi Liam,
>
> thanks for the pointer. I found out, that the log cleaner on all kafka 
> brokers died with the following error message:
>
> [2018-12-04 15:33:24,886] INFO Cleaner 0: Caught segment overflow 
> error during cleaning: Detected offset overflow at offset -1 in 
> segment LogSegment(baseOffset=3605669, size=6970326)
> (kafka.log.LogCleaner)
> [2018-12-04 15:33:24,958] ERROR [kafka-log-cleaner-thread-0]: Error 
> due to
> (kafka.log.LogCleaner)
> java.lang.IllegalArgumentException: requirement failed: Split 
> operation is only permitted for segments with overflow
> at scala.Predef$.require(Predef.scala:277)
> at kafka.log.Log.splitOverflowedSegment(Log.scala:1873)
> at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:517)
> at kafka.log.Cleaner.$anonfun$doClean$6(LogCleaner.scala:465)
> at
> kafka.log.Cleaner.$anonfun$doClean$6$adapted(LogCleaner.scala:464)
> at scala.collection.immutable.List.foreach(List.scala:389)
> at kafka.log.Cleaner.doClean(LogCleaner.scala:464)
> at kafka.log.Cleaner.clean(LogCleaner.scala:442)
> at
> kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:303)
> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:289)
> at
> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> [2018-12-04 15:33:24,959] INFO [kafka-log-cleaner-thread-0]: Stopped
> (kafka.log.LogCleaner)
>
> This brought up some new questions:
> 1.) How do I avoid this error? Did I misconfigure the creation of segments?
> 2.) Is there a way to monitor the log cleaner?
> 3.) Can I restart the log cleaner manually without restarting the 
> whole broker?
>
> Thanks for your help.
>
> Best,
> Claudia
>
> -Original Message-
> From: Liam Clarke
> Sent: Monday, 17 December 2018 23:06
> To: users@kafka.apache.org
> Subject: Re: Configuration of log compaction
>
> Hi Claudia,
>
> Anything useful in the log cleaner log files?
>
> Cheers,
>
> Liam Clarke
>
> On Tue, 18 Dec. 2018, 3:18 am Claudia Wegmann 
> > Hi,
> >
> > thanks for the quick response.
> >
> > My problem is not, that no new segments are created, but that 
> > segments with old data do not get compacted.
> > I had to restart one broker because there was no diskspace left. 
> > After recreating all indexes etc. the broker recognized the old data 
> > and compacted it correctly. I had to restart all other brok

AW: Configuration of log compaction

2018-12-18 Thread Claudia Wegmann
Hi Liam,

My brokers are running version 2.0.0. Restarting was not a problem, it just
seemed "a lot" for just restarting the log cleaner. I will think about piping 
the logs of the log cleaner to elk. And I will definitely keep an eye on the 
logs to see if the error occurs again.

Thanks again!

Best,
Claudia

-Original Message-
From: Liam Clarke
Sent: Tuesday, 18 December 2018 23:05
To: users@kafka.apache.org
Subject: Re: Configuration of log compaction

Kia ora Claudia,

What version Kafka are you running? I can't find the specified code in Kafka 
1.0.0.

With regards to your questions:

1) I don't think so, but I'm just guessing here.
2) The log cleaner logs, if piped into something like Elasticsearch could be 
used to monitor it?
3) No, it's a thread started by the broker. I imagine you're using an old 
broker version, around 0.8.2 perhaps? We've found that rolling broker restarts 
with 0.11 are rather easy and not to be feared.

Kind regards,

Liam Clarke

On Tue, Dec 18, 2018 at 10:43 PM Claudia Wegmann 
wrote:

> Hi Liam,
>
> thanks for the pointer. I found out, that the log cleaner on all kafka 
> brokers died with the following error message:
>
> [2018-12-04 15:33:24,886] INFO Cleaner 0: Caught segment overflow 
> error during cleaning: Detected offset overflow at offset -1 in 
> segment LogSegment(baseOffset=3605669, size=6970326) 
> (kafka.log.LogCleaner)
> [2018-12-04 15:33:24,958] ERROR [kafka-log-cleaner-thread-0]: Error 
> due to
> (kafka.log.LogCleaner)
> java.lang.IllegalArgumentException: requirement failed: Split 
> operation is only permitted for segments with overflow
> at scala.Predef$.require(Predef.scala:277)
> at kafka.log.Log.splitOverflowedSegment(Log.scala:1873)
> at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:517)
> at kafka.log.Cleaner.$anonfun$doClean$6(LogCleaner.scala:465)
> at
> kafka.log.Cleaner.$anonfun$doClean$6$adapted(LogCleaner.scala:464)
> at scala.collection.immutable.List.foreach(List.scala:389)
> at kafka.log.Cleaner.doClean(LogCleaner.scala:464)
> at kafka.log.Cleaner.clean(LogCleaner.scala:442)
> at
> kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:303)
> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:289)
> at 
> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> [2018-12-04 15:33:24,959] INFO [kafka-log-cleaner-thread-0]: Stopped
> (kafka.log.LogCleaner)
>
> This brought up some new questions:
> 1.) How do I avoid this error? Did I misconfigure the creation of segments?
> 2.) Is there a way to monitor the log cleaner?
> 3.) Can I restart the log cleaner manually without restarting the 
> whole broker?
>
> Thanks for your help.
>
> Best,
> Claudia
>
> -Original Message-
> From: Liam Clarke
> Sent: Monday, 17 December 2018 23:06
> To: users@kafka.apache.org
> Subject: Re: Configuration of log compaction
>
> Hi Claudia,
>
> Anything useful in the log cleaner log files?
>
> Cheers,
>
> Liam Clarke
>
> On Tue, 18 Dec. 2018, 3:18 am Claudia Wegmann 
> > Hi,
> >
> > thanks for the quick response.
> >
> > My problem is not, that no new segments are created, but that 
> > segments with old data do not get compacted.
> > I had to restart one broker because there was no diskspace left. 
> > After recreating all indexes etc. the broker recognized the old data 
> > and compacted it correctly. I had to restart all other brokers of 
> > the cluster, too, for them to also recognize the old data and start
> compacting.
> >
> > So I guess, before restarting, the brokers were too busy to
> > compact/delete old data? Is there a configuration to ensure 
> > compaction after a certain amount of time or something?
> >
> > Best,
> > Claudia
> >
> > -Original Message-
> > From: Spico Florin
> > Sent: Monday, 17 December 2018 14:28
> > To: users@kafka.apache.org
> > Subject: Re: Configuration of log compaction
> >
> > Hello!
> >   Please check whether the segment.ms configuration on topic will 
> > help you to solve your problem.
> >
> > https://kafka.apache.org/documentation/
> >
> >
> > https://stackoverflow.com/questions/41048041/kafka-deletes-segments-even-before-segment-size-is-reached
> >
> > Regards,
> >  Florin
> >
> > segment.ms This configuration controls the period of time after 
> > which Kafka will force the log to roll even if the segment file 
> > isn't full to ensure that retention can delete o

Re: Configuration of log compaction

2018-12-18 Thread Liam Clarke
Kia ora Claudia,

What version Kafka are you running? I can't find the specified code in
Kafka 1.0.0.

With regards to your questions:

1) I don't think so, but I'm just guessing here.
2) The log cleaner logs, if piped into something like Elasticsearch could
be used to monitor it?
3) No, it's a thread started by the broker. I imagine you're using an old
broker version, around 0.8.2 perhaps? We've found that rolling broker
restarts with 0.11 are rather easy and not to be feared.

Kind regards,

Liam Clarke

On Tue, Dec 18, 2018 at 10:43 PM Claudia Wegmann 
wrote:

> Hi Liam,
>
> thanks for the pointer. I found out, that the log cleaner on all kafka
> brokers died with the following error message:
>
> [2018-12-04 15:33:24,886] INFO Cleaner 0: Caught segment overflow error
> during cleaning: Detected offset overflow at offset -1 in segment
> LogSegment(baseOffset=3605669, size=6970326) (kafka.log.LogCleaner)
> [2018-12-04 15:33:24,958] ERROR [kafka-log-cleaner-thread-0]: Error due to
> (kafka.log.LogCleaner)
> java.lang.IllegalArgumentException: requirement failed: Split operation is
> only permitted for segments with overflow
> at scala.Predef$.require(Predef.scala:277)
> at kafka.log.Log.splitOverflowedSegment(Log.scala:1873)
> at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:517)
> at kafka.log.Cleaner.$anonfun$doClean$6(LogCleaner.scala:465)
> at
> kafka.log.Cleaner.$anonfun$doClean$6$adapted(LogCleaner.scala:464)
> at scala.collection.immutable.List.foreach(List.scala:389)
> at kafka.log.Cleaner.doClean(LogCleaner.scala:464)
> at kafka.log.Cleaner.clean(LogCleaner.scala:442)
> at
> kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:303)
> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:289)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> [2018-12-04 15:33:24,959] INFO [kafka-log-cleaner-thread-0]: Stopped
> (kafka.log.LogCleaner)
>
> This brought up some new questions:
> 1.) How do I avoid this error? Did I misconfigure the creation of segments?
> 2.) Is there a way to monitor the log cleaner?
> 3.) Can I restart the log cleaner manually without restarting the whole
> broker?
>
> Thanks for your help.
>
> Best,
> Claudia
>
> -Original Message-
> From: Liam Clarke
> Sent: Monday, 17 December 2018 23:06
> To: users@kafka.apache.org
> Subject: Re: Configuration of log compaction
>
> Hi Claudia,
>
> Anything useful in the log cleaner log files?
>
> Cheers,
>
> Liam Clarke
>
> On Tue, 18 Dec. 2018, 3:18 am Claudia Wegmann 
> > Hi,
> >
> > thanks for the quick response.
> >
> > My problem is not, that no new segments are created, but that segments
> > with old data do not get compacted.
> > I had to restart one broker because there was no diskspace left. After
> > recreating all indexes etc. the broker recognized the old data and
> > compacted it correctly. I had to restart all other brokers of the
> > cluster, too, for them to also recognize the old data and start
> compacting.
> >
> > So I guess, before restarting, the brokers were too busy to
> > compact/delete old data? Is there a configuration to ensure compaction
> > after a certain amount of time or something?
> >
> > Best,
> > Claudia
> >
> > -Original Message-
> > From: Spico Florin
> > Sent: Monday, 17 December 2018 14:28
> > To: users@kafka.apache.org
> > Subject: Re: Configuration of log compaction
> >
> > Hello!
> >   Please check whether the segment.ms configuration on topic will help
> > you to solve your problem.
> >
> > https://kafka.apache.org/documentation/
> >
> >
> > https://stackoverflow.com/questions/41048041/kafka-deletes-segments-even-before-segment-size-is-reached
> >
> > Regards,
> >  Florin
> >
> > segment.ms This configuration controls the period of time after which
> > Kafka will force the log to roll even if the segment file isn't full
> > to ensure that retention can delete or compact old data. long
> > 604800000 [1,...] log.roll.ms medium
> >
> > On Mon, Dec 17, 2018 at 12:28 PM Claudia Wegmann 
> > wrote:
> >
> > > Dear kafka users,
> > >
> > > I've got a problem on one of my kafka clusters. I use this cluster
> > > with kafka streams applications. Some of this stream apps use a
> > > kafka state store. Therefore a changelog topic is created for those
> > > stores with cleanup policy "compact". One of these topics is running
> > > wild for some time now and seems to grow indefinitely. When I check
> > > the  log file of the first segment, there is a lot of data in it,
> > > that should have been compacted already.
> > >
> > > So I guess I did not configure everything correctly for log
> > > compaction to work as expected. What config parameters do have
> > > influence on log compaction? And how to set them, when I want data
> > > older than 4 hours to be compacted?
> > >
> > > Thanks in advance.
> > >
> > > Best,
> > > Claudia
> > >
> >
>


AW: Configuration of log compaction

2018-12-18 Thread Claudia Wegmann
Hi Liam,

thanks for the pointer. I found out, that the log cleaner on all kafka brokers 
died with the following error message:

[2018-12-04 15:33:24,886] INFO Cleaner 0: Caught segment overflow error during 
cleaning: Detected offset overflow at offset -1 in segment 
LogSegment(baseOffset=3605669, size=6970326) (kafka.log.LogCleaner)
[2018-12-04 15:33:24,958] ERROR [kafka-log-cleaner-thread-0]: Error due to 
(kafka.log.LogCleaner)
java.lang.IllegalArgumentException: requirement failed: Split operation is only 
permitted for segments with overflow
at scala.Predef$.require(Predef.scala:277)
at kafka.log.Log.splitOverflowedSegment(Log.scala:1873)
at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:517)
at kafka.log.Cleaner.$anonfun$doClean$6(LogCleaner.scala:465)
at kafka.log.Cleaner.$anonfun$doClean$6$adapted(LogCleaner.scala:464)
at scala.collection.immutable.List.foreach(List.scala:389)
at kafka.log.Cleaner.doClean(LogCleaner.scala:464)
at kafka.log.Cleaner.clean(LogCleaner.scala:442)
at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:303)
at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:289)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
[2018-12-04 15:33:24,959] INFO [kafka-log-cleaner-thread-0]: Stopped 
(kafka.log.LogCleaner)

This brought up some new questions:
1.) How do I avoid this error? Did I misconfigure the creation of segments?
2.) Is there a way to monitor the log cleaner?
3.) Can I restart the log cleaner manually without restarting the whole broker?

Thanks for your help.

Best,
Claudia

-Original Message-
From: Liam Clarke
Sent: Monday, 17 December 2018 23:06
To: users@kafka.apache.org
Subject: Re: Configuration of log compaction

Hi Claudia,

Anything useful in the log cleaner log files?

Cheers,

Liam Clarke

On Tue, 18 Dec. 2018, 3:18 am Claudia Wegmann  Hi,
>
> thanks for the quick response.
>
> My problem is not, that no new segments are created, but that segments 
> with old data do not get compacted.
> I had to restart one broker because there was no diskspace left. After 
> recreating all indexes etc. the broker recognized the old data and 
> compacted it correctly. I had to restart all other brokers of the 
> cluster, too, for them to also recognize the old data and start compacting.
>
> So I guess, before restarting, the brokers were too busy to
> compact/delete old data? Is there a configuration to ensure compaction 
> after a certain amount of time or something?
>
> Best,
> Claudia
>
> -Original Message-
> From: Spico Florin
> Sent: Monday, 17 December 2018 14:28
> To: users@kafka.apache.org
> Subject: Re: Configuration of log compaction
>
> Hello!
>   Please check whether the segment.ms configuration on topic will help 
> you to solve your problem.
>
> https://kafka.apache.org/documentation/
>
>
> https://stackoverflow.com/questions/41048041/kafka-deletes-segments-even-before-segment-size-is-reached
>
> Regards,
>  Florin
>
> segment.ms This configuration controls the period of time after which 
> Kafka will force the log to roll even if the segment file isn't full 
> to ensure that retention can delete or compact old data. long 
> 604800000 [1,...] log.roll.ms medium
>
> On Mon, Dec 17, 2018 at 12:28 PM Claudia Wegmann 
> wrote:
>
> > Dear kafka users,
> >
> > I've got a problem on one of my kafka clusters. I use this cluster 
> > with kafka streams applications. Some of this stream apps use a 
> > kafka state store. Therefore a changelog topic is created for those 
> > stores with cleanup policy "compact". One of these topics is running 
> > wild for some time now and seems to grow indefinitely. When I check 
> > the  log file of the first segment, there is a lot of data in it, 
> > that should have been compacted already.
> >
> > So I guess I did not configure everything correctly for log 
> > compaction to work as expected. What config parameters do have 
> > influence on log compaction? And how to set them, when I want data 
> > older than 4 hours to be compacted?
> >
> > Thanks in advance.
> >
> > Best,
> > Claudia
> >
>


Re: Configuration of log compaction

2018-12-17 Thread Liam Clarke
Hi Claudia,

Anything useful in the log cleaner log files?

Cheers,

Liam Clarke

On Tue, 18 Dec. 2018, 3:18 am Claudia Wegmann  Hi,
>
> thanks for the quick response.
>
> My problem is not, that no new segments are created, but that segments
> with old data do not get compacted.
> I had to restart one broker because there was no diskspace left. After
> recreating all indexes etc. the broker recognized the old data and
> compacted it correctly. I had to restart all other brokers of the cluster,
> too, for them to also recognize the old data and start compacting.
>
> So I guess, before restarting, the brokers were too busy to compact/delete
> old data? Is there a configuration to ensure compaction after a certain
> amount of time or something?
>
> Best,
> Claudia
>
> -Original Message-
> From: Spico Florin
> Sent: Monday, 17 December 2018 14:28
> To: users@kafka.apache.org
> Subject: Re: Configuration of log compaction
>
> Hello!
>   Please check whether the segment.ms configuration on topic will help
> you to solve your problem.
>
> https://kafka.apache.org/documentation/
>
>
> https://stackoverflow.com/questions/41048041/kafka-deletes-segments-even-before-segment-size-is-reached
>
> Regards,
>  Florin
>
> segment.ms This configuration controls the period of time after which
> Kafka will force the log to roll even if the segment file isn't full to
> ensure that retention can delete or compact old data. long 604800000
> [1,...] log.roll.ms medium
>
> On Mon, Dec 17, 2018 at 12:28 PM Claudia Wegmann 
> wrote:
>
> > Dear kafka users,
> >
> > I've got a problem on one of my kafka clusters. I use this cluster
> > with kafka streams applications. Some of this stream apps use a kafka
> > state store. Therefore a changelog topic is created for those stores
> > with cleanup policy "compact". One of these topics is running wild for
> > some time now and seems to grow indefinitely. When I check the  log
> > file of the first segment, there is a lot of data in it, that should
> > have been compacted already.
> >
> > So I guess I did not configure everything correctly for log compaction
> > to work as expected. What config parameters do have influence on log
> > compaction? And how to set them, when I want data older than 4 hours
> > to be compacted?
> >
> > Thanks in advance.
> >
> > Best,
> > Claudia
> >
>


RE: Configuration of log compaction

2018-12-17 Thread Komal Babu
Hi,

Can you please remove me from this mail box.

Regards, 

Komal Babu
Sourcing Administrator
ko...@qbs.co.uk
020 8733 7139
http://www.qbsd.co.uk

QBS Distribution, 
7 Wharfside, 
Rosemont Road, 
Wembley, HA0 4QB
United Kingdom



-Original Message-
From: Claudia Wegmann  
Sent: 17 December 2018 14:18
To: users@kafka.apache.org
Subject: AW: Configuration of log compaction

Hi,

thanks for the quick response.

My problem is not, that no new segments are created, but that segments with old 
data do not get compacted.
I had to restart one broker because there was no diskspace left. After 
recreating all indexes etc. the broker recognized the old data and compacted it 
correctly. I had to restart all other brokers of the cluster, too, for them to 
also recognize the old data and start compacting.

So I guess, before restarting, the brokers were too busy to compact/delete old
data? Is there a configuration to ensure compaction after a certain amount of 
time or something?

Best,
Claudia

-Original Message-
From: Spico Florin
Sent: Monday, 17 December 2018 14:28
To: users@kafka.apache.org
Subject: Re: Configuration of log compaction

Hello!
  Please check whether the segment.ms configuration on topic will help you to 
solve your problem.

https://kafka.apache.org/documentation/

https://stackoverflow.com/questions/41048041/kafka-deletes-segments-even-before-segment-size-is-reached

Regards,
 Florin

segment.ms This configuration controls the period of time after which Kafka 
will force the log to roll even if the segment file isn't full to ensure that 
retention can delete or compact old data. long 604800000 [1,...] log.roll.ms
medium

On Mon, Dec 17, 2018 at 12:28 PM Claudia Wegmann 
wrote:

> Dear kafka users,
>
> I've got a problem on one of my kafka clusters. I use this cluster 
> with kafka streams applications. Some of this stream apps use a kafka 
> state store. Therefore a changelog topic is created for those stores 
> with cleanup policy "compact". One of these topics is running wild for 
> some time now and seems to grow indefinitely. When I check the  log 
> file of the first segment, there is a lot of data in it, that should 
> have been compacted already.
>
> So I guess I did not configure everything correctly for log compaction 
> to work as expected. What config parameters do have influence on log 
> compaction? And how to set them, when I want data older than 4 hours 
> to be compacted?
>
> Thanks in advance.
>
> Best,
> Claudia
>


AW: Configuration of log compaction

2018-12-17 Thread Claudia Wegmann
Hi,

thanks for the quick response.

My problem is not, that no new segments are created, but that segments with old 
data do not get compacted.
I had to restart one broker because there was no diskspace left. After 
recreating all indexes etc. the broker recognized the old data and compacted it 
correctly. I had to restart all other brokers of the cluster, too, for them to 
also recognize the old data and start compacting.

So I guess, before restarting, the brokers were too busy to compact/delete old
data? Is there a configuration to ensure compaction after a certain amount of 
time or something?

Best,
Claudia

-Original Message-
From: Spico Florin
Sent: Monday, 17 December 2018 14:28
To: users@kafka.apache.org
Subject: Re: Configuration of log compaction

Hello!
  Please check whether the segment.ms configuration on topic will help you to 
solve your problem.

https://kafka.apache.org/documentation/

https://stackoverflow.com/questions/41048041/kafka-deletes-segments-even-before-segment-size-is-reached

Regards,
 Florin

segment.ms This configuration controls the period of time after which Kafka 
will force the log to roll even if the segment file isn't full to ensure that 
retention can delete or compact old data. long 604800000 [1,...] log.roll.ms
medium

On Mon, Dec 17, 2018 at 12:28 PM Claudia Wegmann 
wrote:

> Dear kafka users,
>
> I've got a problem on one of my kafka clusters. I use this cluster 
> with kafka streams applications. Some of this stream apps use a kafka 
> state store. Therefore a changelog topic is created for those stores 
> with cleanup policy "compact". One of these topics is running wild for 
> some time now and seems to grow indefinitely. When I check the  log 
> file of the first segment, there is a lot of data in it, that should 
> have been compacted already.
>
> So I guess I did not configure everything correctly for log compaction 
> to work as expected. What config parameters do have influence on log 
> compaction? And how to set them, when I want data older than 4 hours 
> to be compacted?
>
> Thanks in advance.
>
> Best,
> Claudia
>


Re: Configuration of log compaction

2018-12-17 Thread Spico Florin
Hello!
  Please check whether the segment.ms configuration on topic will help you
to solve your problem.

https://kafka.apache.org/documentation/

https://stackoverflow.com/questions/41048041/kafka-deletes-segments-even-before-segment-size-is-reached

Regards,
 Florin

segment.ms This configuration controls the period of time after which Kafka
will force the log to roll even if the segment file isn't full to ensure
that retention can delete or compact old data. long 604800000 [1,...]
log.roll.ms medium
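For example (a sketch; the topic name and the one-hour value are only
illustrative, and older clusters use --zookeeper instead of --bootstrap-server),
rolling segments more often lets the cleaner get at the data sooner:

    kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics \
      --entity-name my-changelog-topic --alter --add-config segment.ms=3600000

Keep in mind that the active segment is never compacted, so data only becomes
eligible once the segment it sits in has been rolled.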

On Mon, Dec 17, 2018 at 12:28 PM Claudia Wegmann 
wrote:

> Dear kafka users,
>
> I've got a problem on one of my kafka clusters. I use this cluster with
> kafka streams applications. Some of this stream apps use a kafka state
> store. Therefore a changelog topic is created for those stores with cleanup
> policy "compact". One of these topics is running wild for some time now and
> seems to grow indefinitely. When I check the  log file of the first
> segment, there is a lot of data in it, that should have been compacted
> already.
>
> So I guess I did not configure everything correctly for log compaction to
> work as expected. What config parameters do have influence on log
> compaction? And how to set them, when I want data older than 4 hours to be
> compacted?
>
> Thanks in advance.
>
> Best,
> Claudia
>


Configuration of log compaction

2018-12-17 Thread Claudia Wegmann
Dear kafka users,

I've got a problem on one of my kafka clusters. I use this cluster with kafka 
streams applications. Some of this stream apps use a kafka state store. 
Therefore a changelog topic is created for those stores with cleanup policy 
"compact". One of these topics is running wild for some time now and seems to 
grow indefinitely. When I check the  log file of the first segment, there is a 
lot of data in it, that should have been compacted already.

So I guess I did not configure everything correctly for log compaction to work 
as expected. What config parameters do have influence on log compaction? And 
how to set them, when I want data older than 4 hours to be compacted?

Thanks in advance.

Best,
Claudia


Re: manually trigger log compaction

2018-10-03 Thread Brett Rann
There's no way to "trigger compaction" per se, but you can use multiple
configs to
compel it. I've documented it in this ticket:
https://issues.apache.org/jira/browse/KAFKA-7137

Essentially you can set min.cleanable.dirty.ratio=0 and segment.ms to make
sure a new segment is rolled (compaction won't happen on the active
segment, and if you care about that, segment.ms lets you compel a new one).

And then it needs a message to come in to trigger the segment roll and
the compaction evaluation.
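A sketch of that override (the topic name and values are illustrative; remember
to remove the overrides afterwards, since a dirty ratio of 0 keeps the cleaner
very busy):

    kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics \
      --entity-name my-topic --alter \
      --add-config min.cleanable.dirty.ratio=0,segment.ms=60000

Then produce one more record to the topic so the active segment rolls and the
cleaner re-evaluates the partition.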



On Thu, Sep 27, 2018 at 2:00 AM M. Manna  wrote:

> This is possible using kafka-configs script. You need 'topics' as entity
> type and --alter directive. The changes are made cluster-wide.
>
> Try the help documentation for kafka-configs.
>
> Regards,
>
> On Wed, 26 Sep 2018 at 16:16, Xu, Nan  wrote:
>
> > Hi,
> >
> > Wondering is there a way to manually trigger a log compaction for a
> > certain topic?
> >
> > Thanks,
> > Nan
> >
> > --
> > This message, and any attachments, is for the intended recipient(s) only,
> > may contain information that is privileged, confidential and/or proprietary
> > and subject to important terms and conditions available at
> > http://www.bankofamerica.com/emaildisclaimer. If you are not the
> > intended recipient, please delete this message.
> >
>




-- 

Brett Rann

Senior DevOps Engineer


Zendesk International Ltd

395 Collins Street, Melbourne VIC 3000 Australia

Mobile: +61 (0) 418 826 017


Re: manually trigger log compaction

2018-09-26 Thread M. Manna
This is possible using the kafka-configs script. You need 'topics' as the entity
type and the --alter directive. The changes are made cluster-wide.

Try the help documentation for kafka-configs.
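For example, to see which overrides are currently set on a topic before altering
them (a sketch; the topic name is a placeholder, and older releases use
--zookeeper instead of --bootstrap-server):

    kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics \
      --entity-name my-topic --describe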

Regards,

On Wed, 26 Sep 2018 at 16:16, Xu, Nan  wrote:

> Hi,
>
> Wondering is there a way to manually trigger a log compaction for a
> certain topic?
>
> Thanks,
> Nan
>
> --
> This message, and any attachments, is for the intended recipient(s) only,
> may contain information that is privileged, confidential and/or proprietary
> and subject to important terms and conditions available at
> http://www.bankofamerica.com/emaildisclaimer.   If you are not the
> intended recipient, please delete this message.
>


manually trigger log compaction

2018-09-26 Thread Xu, Nan
Hi, 

   Wondering is there a way to manually trigger a log compaction for a certain 
topic?

Thanks,
Nan

--
This message, and any attachments, is for the intended recipient(s) only, may 
contain information that is privileged, confidential and/or proprietary and 
subject to important terms and conditions available at 
http://www.bankofamerica.com/emaildisclaimer.   If you are not the intended 
recipient, please delete this message.


Re: How set log compaction policies at cluster level

2018-06-14 Thread Manikumar
Those configs are topic-level config names. To configure them in
server.properties, we need to use the server config property names
(log.cleanup.policy, log.cleaner.delete.retention.ms, etc.).

Check the "SERVER DEFAULT PROPERTY" column in the table given in the
link below:
http://kafka.apache.org/documentation/#topicconfigs
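For example, broker-wide defaults in server.properties could look like this
(a sketch; the values are only illustrative):

    # server.properties -- cluster-level defaults for topic configs
    log.cleanup.policy=compact
    log.cleaner.enable=true
    log.cleaner.delete.retention.ms=86400000
    log.roll.ms=14400000
    log.cleaner.min.cleanable.ratio=0.5

Topics without an explicit per-topic override pick these defaults up.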



On Thu, Jun 14, 2018 at 3:51 PM David Espinosa  wrote:

> Hi all,
>
> I would like to apply log compaction configuration for any topic in my
> kafka cluster, as default properties. These configuration properties are:
>
>- cleanup.policy
>- delete.retention.ms
>- segment.ms
>- min.cleanable.dirty.ratio
>
> I have tried to place them in the server.properties file, but they are not
> applied. I could only apply them when using kafka-topics create topic
> command.
>
> Somebody knows how to apply those properties as default for any topic
> created?
>
> Thanks in advance,
> David.
>


How set log compaction policies at cluster level

2018-06-14 Thread David Espinosa
Hi all,

I would like to apply log compaction configuration for any topic in my
kafka cluster, as default properties. These configuration properties are:

   - cleanup.policy
   - delete.retention.ms
   - segment.ms
   - min.cleanable.dirty.ratio

I have tried to place them in the server.properties file, but they are not
applied. I could only apply them when using kafka-topics create topic
command.

Somebody knows how to apply those properties as default for any topic
created?

Thanks in advance,
David.


Log Compaction configuration over all topics in cluster

2018-05-09 Thread David Espinosa
Hi all,
I would like to apply log compaction configuration for any topic in my
kafka cluster, as default properties. These configuration properties are:

   - cleanup.policy
   - delete.retention.ms
   - segment.ms
   - min.cleanable.dirty.ratio

I have tried to place them in the server.properties file, but they are not
applied. I could only apply them when using kafka-topics create topic
command.

Somebody knows how to apply those properties as default for any topic
created?

Thanks in advance,
David.


Kafka Log Compaction (LogCleaner) and retry

2018-04-23 Thread Rabin Banerjee
Hi All,

I have a doubt regarding what will happen if there is a retry to a message
and log compaction is enabled.

retries 10
retry.backoff.ms 1000

For example the data looks like this
1-->State1 (sent)
1-->State2 (failed, waiting for retry)
1-->State3 (sent)
1-->State2 (State2 is now sent as retry)

So will log compaction treat State3 as the latest, or State2?

Thanks ,
RB


RE: log compaction v log rotation - best of the two worlds

2018-03-21 Thread Kopacki, Tomasz (Nokia - PL/Wroclaw)
I think the first solution is what I need. Your second proposal also looks fine, but
I don't like the idea of keeping an additional ledger.
Thanks, Svante! I appreciate your help.

Sincerely, 
Tomasz Kopacki
DevOps Engineer @ Nokia

-Original Message-
From: Svante Karlsson [mailto:svante.karls...@csi.se] 
Sent: Wednesday, March 21, 2018 11:21 AM
To: users@kafka.apache.org
Subject: Re: log compaction v log rotation - best of the two worlds

alt1)
if you can store a generation counter in the value of the "latest value"
topic you could do as follows

topic latest_value key [id]

topic full_history key[id, generation]

on delete get the latest_value.generation_counter and issue deletes on 
full_history key[id, 0..generation_counter]

alt2)
if you cannot store a generation_counter in "latest_value" store a timestamp or 
uuid to make each key unique

topic latest_value key [id]

topic full_history key[id, timestamp/uuid] on delete of "id" scan full_history 
topic from beginning and issue deletes on full_history key[id, timestamp]

you could optimized this by having another topic that contains a "to be purged 
ids"

/svante



2018-03-21 11:16 GMT+01:00 Manikumar :

> Sorry, I was wrong. For history topic, we can use regular topic with 
> sufficient retention period.
> maybe others can give more ideas.
>
> On Wed, Mar 21, 2018 at 3:34 PM, Kopacki, Tomasz (Nokia - PL/Wroclaw) 
> < tomasz.kopa...@nokia.com> wrote:
>
> > Do you mean I can use tombstone if my clean policy is 'delete' and 
> > it still work ?
> >
> > Sincerely,
> > Tomasz Kopacki
> > DevOps Engineer @ Nokia
> >
> > -Original Message-
> > From: Manikumar [mailto:manikumar.re...@gmail.com]
> > Sent: Wednesday, March 21, 2018 11:03 AM
> > To: users@kafka.apache.org
> > Subject: Re: log compaction v log rotation - best of the two worlds
> >
> > Not sure if understood requirement correctly.  one option is to use 
> > two compacted topic topics. one is for current state of the resource 
> > and one
> is
> > for history. and use tombstones whenever you want to clear them.
> >
> > On Wed, Mar 21, 2018 at 2:53 PM, Kopacki, Tomasz (Nokia - 
> > PL/Wroclaw) < tomasz.kopa...@nokia.com> wrote:
> >
> > > Almost,
> > > Consider this example:
> > >
> > > |R1|R2|R1|R3|R2|R4|R4|R1|R2| <- this is an example of a stream where RX
> > > represents updates of a particular resource. I need to keep the 
> > > history of changes forever for all the resources but only until 
> > > resource is alive. If resource expires/dies I'd like to remove it 
> > > completely. In this example consider that resource R2 dies but 
> > > others are still alive. In such case I'd like to able to transform this 
> > > into:
> > > |R1|R1|R3|R4|R4|R1| <- so, it's not compaction because I need the history
> > > of changes but neither I can simply remove 'old' messages because 
> > > I need to do this based of the lifecycle of the resource not just 
> > > their
> > age.
> > >
> > >
> > >
> > > Sincerely,
> > > Tomasz Kopacki
> > > DevOps Engineer @ Nokia
> > >
> > > -Original Message-
> > > From: Manikumar [mailto:manikumar.re...@gmail.com]
> > > Sent: Wednesday, March 21, 2018 10:17 AM
> > > To: users@kafka.apache.org
> > > Subject: Re: log compaction v log rotation - best of the two 
> > > worlds
> > >
> > > We can enable both compaction and retention for a topic by setting 
> > > cleanup.policy="delete,compact"
> > > http://kafka.apache.org/documentation/#topicconfigs
> > >
> > > Does this handle your requirement?
> > >
> > > On Wed, Mar 21, 2018 at 2:36 PM, Kopacki, Tomasz (Nokia - 
> > > PL/Wroclaw) < tomasz.kopa...@nokia.com> wrote:
> > >
> > > > Hi,
> > > > I've been recently exploring log handling in kafka and I wonder 
> > > > if/how can I mixed log compaction with log rotation.
> > > > A little background first:
> > > > I have an application that uses kafka topics as a backend for 
> > > > event sourcing. Messages represents change of state of my 'resources'.
> > > > Each resource has UID that is also used as a key for the messages.
> > > > My resources have a lifecycle and when their life ends I don't 
> > > > need them anymore and there is no point in keeping their 
> > > > history. Having said that I thought that best choice for me will 
> > > > be log compaction with tombstone feature but I also would like 
> > > > to have a possibility to keep history of changes for the resources(only 
> > > > until they die).
> > > > With those requirements I'd love to have a possibility to use 
> > > > tombstone feature for log rotation but I guess it ain't working 
> > > > like
> > > that.
> > > >
> > > > Does anyone here had similar requirements and solve that somehow ?
> > > >
> > > >
> > > > Sincerely,
> > > > Tomasz Kopacki
> > > > DevOps Engineer @ Nokia
> > > >
> > > >
> > >
> >
>


Re: log compaction v log rotation - best of the two worlds

2018-03-21 Thread Svante Karlsson
alt1)
if you can store a generation counter in the value of the "latest value"
topic you could do as follows

topic latest_value key [id]

topic full_history key[id, generation]

on delete get the latest_value.generation_counter and issue deletes on
full_history
key[id, 0..generation_counter]

alt2)
if you cannot store a generation_counter in "latest_value" store a
timestamp or uuid to make each key unique

topic latest_value key [id]

topic full_history key[id, timestamp/uuid]
on delete of "id" scan full_history topic from beginning and issue deletes
on full_history key[id, timestamp]

you could optimized this by having another topic that contains a "to be
purged ids"

/svante



2018-03-21 11:16 GMT+01:00 Manikumar :

> Sorry, I was wrong. For history topic, we can use regular topic with
> sufficient retention period.
> maybe others can give more ideas.
>
> On Wed, Mar 21, 2018 at 3:34 PM, Kopacki, Tomasz (Nokia - PL/Wroclaw) <
> tomasz.kopa...@nokia.com> wrote:
>
> > Do you mean I can use tombstone if my clean policy is 'delete' and it
> > still work ?
> >
> > Sincerely,
> > Tomasz Kopacki
> > DevOps Engineer @ Nokia
> >
> > -Original Message-
> > From: Manikumar [mailto:manikumar.re...@gmail.com]
> > Sent: Wednesday, March 21, 2018 11:03 AM
> > To: users@kafka.apache.org
> > Subject: Re: log compaction v log rotation - best of the two worlds
> >
> > Not sure if understood requirement correctly.  one option is to use two
> > compacted topic topics. one is for current state of the resource and one
> is
> > for history. and use tombstones whenever you want to clear them.
> >
> > On Wed, Mar 21, 2018 at 2:53 PM, Kopacki, Tomasz (Nokia - PL/Wroclaw) <
> > tomasz.kopa...@nokia.com> wrote:
> >
> > > Almost,
> > > Consider this example:
> > >
> > > |R1|R2|R1|R3|R2|R4|R4|R1|R2| <- this is an example of a stream where RX
> > > represents updates of a particular resource. I need to keep the
> > > history of changes forever for all the resources but only until
> > > resource is alive. If resource expires/dies I'd like to remove it
> > > completely. In this example consider that resource R2 dies but others
> > > are still alive. In such case I'd like to able to transform this into:
> > > |R1|R1|R3|R4|R4|R1| <- so, it's not compaction because I need the history
> > > of changes but neither I can simply remove 'old' messages because I
> > > need to do this based of the lifecycle of the resource not just their
> > age.
> > >
> > >
> > >
> > > Sincerely,
> > > Tomasz Kopacki
> > > DevOps Engineer @ Nokia
> > >
> > > -Original Message-
> > > From: Manikumar [mailto:manikumar.re...@gmail.com]
> > > Sent: Wednesday, March 21, 2018 10:17 AM
> > > To: users@kafka.apache.org
> > > Subject: Re: log compaction v log rotation - best of the two worlds
> > >
> > > We can enable both compaction and retention for a topic by setting
> > > cleanup.policy="delete,compact"
> > > http://kafka.apache.org/documentation/#topicconfigs
> > >
> > > Does this handle your requirement?
> > >
> > > On Wed, Mar 21, 2018 at 2:36 PM, Kopacki, Tomasz (Nokia - PL/Wroclaw)
> > > < tomasz.kopa...@nokia.com> wrote:
> > >
> > > > Hi,
> > > > I've been recently exploring log handling in kafka and I wonder
> > > > if/how can I mixed log compaction with log rotation.
> > > > A little background first:
> > > > I have an application that uses kafka topics as a backend for event
> > > > sourcing. Messages represents change of state of my 'resources'.
> > > > Each resource has UID that is also used as a key for the messages.
> > > > My resources have a lifecycle and when their life ends I don't need
> > > > them anymore and there is no point in keeping their history. Having
> > > > said that I thought that best choice for me will be log compaction
> > > > with tombstone feature but I also would like to have a possibility
> > > > to keep history of changes for the resources(only until they die).
> > > > With those requirements I'd love to have a possibility to use
> > > > tombstone feature for log rotation but I guess it ain't working like
> > > that.
> > > >
> > > > Does anyone here had similar requirements and solve that somehow ?
> > > >
> > > >
> > > > Sincerely,
> > > > Tomasz Kopacki
> > > > DevOps Engineer @ Nokia
> > > >
> > > >
> > >
> >
>


Re: log compaction v log rotation - best of the two worlds

2018-03-21 Thread Manikumar
Sorry, I was wrong. For history topic, we can use regular topic with
sufficient retention period.
maybe others can give more ideas.

On Wed, Mar 21, 2018 at 3:34 PM, Kopacki, Tomasz (Nokia - PL/Wroclaw) <
tomasz.kopa...@nokia.com> wrote:

> Do you mean I can use tombstone if my clean policy is 'delete' and it
> still work ?
>
> Sincerely,
> Tomasz Kopacki
> DevOps Engineer @ Nokia
>
> -Original Message-
> From: Manikumar [mailto:manikumar.re...@gmail.com]
> Sent: Wednesday, March 21, 2018 11:03 AM
> To: users@kafka.apache.org
> Subject: Re: log compaction v log rotation - best of the two worlds
>
> Not sure if understood requirement correctly.  one option is to use two
> compacted topic topics. one is for current state of the resource and one is
> for history. and use tombstones whenever you want to clear them.
>
> On Wed, Mar 21, 2018 at 2:53 PM, Kopacki, Tomasz (Nokia - PL/Wroclaw) <
> tomasz.kopa...@nokia.com> wrote:
>
> > Almost,
> > Consider this example:
> >
> > |R1|R2|R1|R3|R2|R4|R4|R1|R2| <- this is an example of a stream where RX
> > represents updates of a particular resource. I need to keep the
> > history of changes forever for all the resources but only until
> > resource is alive. If resource expires/dies I'd like to remove it
> > completely. In this example consider that resource R2 dies but others
> > are still alive. In such case I'd like to able to transform this into:
> > |R1|R1|R3|R4|R4|R1| <- so, it's not compaction because I need the history
> > of changes but neither I can simply remove 'old' messages because I
> > need to do this based of the lifecycle of the resource not just their
> age.
> >
> >
> >
> > Sincerely,
> > Tomasz Kopacki
> > DevOps Engineer @ Nokia
> >
> > -Original Message-
> > From: Manikumar [mailto:manikumar.re...@gmail.com]
> > Sent: Wednesday, March 21, 2018 10:17 AM
> > To: users@kafka.apache.org
> > Subject: Re: log compaction v log rotation - best of the two worlds
> >
> > We can enable both compaction and retention for a topic by setting
> > cleanup.policy="delete,compact"
> > http://kafka.apache.org/documentation/#topicconfigs
> >
> > Does this handle your requirement?
> >
> > On Wed, Mar 21, 2018 at 2:36 PM, Kopacki, Tomasz (Nokia - PL/Wroclaw)
> > < tomasz.kopa...@nokia.com> wrote:
> >
> > > Hi,
> > > I've been recently exploring log handling in kafka and I wonder
> > > if/how can I mixed log compaction with log rotation.
> > > A little background first:
> > > I have an application that uses kafka topics as a backend for event
> > > sourcing. Messages represents change of state of my 'resources'.
> > > Each resource has UID that is also used as a key for the messages.
> > > My resources have a lifecycle and when their life ends I don't need
> > > them anymore and there is no point in keeping their history. Having
> > > said that I thought that best choice for me will be log compaction
> > > with tombstone feature but I also would like to have a possibility
> > > to keep history of changes for the resources(only until they die).
> > > With those requirements I'd love to have a possibility to use
> > > tombstone feature for log rotation but I guess it ain't working like
> > that.
> > >
> > > Does anyone here had similar requirements and solve that somehow ?
> > >
> > >
> > > Sincerely,
> > > Tomasz Kopacki
> > > DevOps Engineer @ Nokia
> > >
> > >
> >
>


RE: log compaction v log rotation - best of the two worlds

2018-03-21 Thread Kopacki, Tomasz (Nokia - PL/Wroclaw)
Do you mean I can use tombstones if my cleanup policy is 'delete' and it will still
work?

Sincerely, 
Tomasz Kopacki
DevOps Engineer @ Nokia

-Original Message-
From: Manikumar [mailto:manikumar.re...@gmail.com] 
Sent: Wednesday, March 21, 2018 11:03 AM
To: users@kafka.apache.org
Subject: Re: log compaction v log rotation - best of the two worlds

Not sure if understood requirement correctly.  one option is to use two 
compacted topic topics. one is for current state of the resource and one is for 
history. and use tombstones whenever you want to clear them.

On Wed, Mar 21, 2018 at 2:53 PM, Kopacki, Tomasz (Nokia - PL/Wroclaw) < 
tomasz.kopa...@nokia.com> wrote:

> Almost,
> Consider this example:
>
> |R1|R2|R1|R3|R2|R4|R4|R1|R2| <- this is an example of a stream where RX
> represents updates of a particular resource. I need to keep the 
> history of changes forever for all the resources but only until 
> resource is alive. If resource expires/dies I'd like to remove it 
> completely. In this example consider that resource R2 dies but others 
> are still alive. In such case I'd like to able to transform this into:
> |R1|R1|R3|R4|R4|R1| <- so, it's not compaction because I need the history
> of changes but neither I can simply remove 'old' messages because I 
> need to do this based of the lifecycle of the resource not just their age.
>
>
>
> Sincerely,
> Tomasz Kopacki
> DevOps Engineer @ Nokia
>
> -Original Message-
> From: Manikumar [mailto:manikumar.re...@gmail.com]
> Sent: Wednesday, March 21, 2018 10:17 AM
> To: users@kafka.apache.org
> Subject: Re: log compaction v log rotation - best of the two worlds
>
> We can enable both compaction and retention for a topic by setting 
> cleanup.policy="delete,compact"
> http://kafka.apache.org/documentation/#topicconfigs
>
> Does this handle your requirement?
>
> On Wed, Mar 21, 2018 at 2:36 PM, Kopacki, Tomasz (Nokia - PL/Wroclaw) 
> < tomasz.kopa...@nokia.com> wrote:
>
> > Hi,
> > I've been recently exploring log handling in kafka and I wonder 
> > if/how can I mixed log compaction with log rotation.
> > A little background first:
> > I have an application that uses kafka topics as a backend for event 
> > sourcing. Messages represents change of state of my 'resources'. 
> > Each resource has UID that is also used as a key for the messages.
> > My resources have a lifecycle and when their life ends I don't need 
> > them anymore and there is no point in keeping their history. Having 
> > said that I thought that best choice for me will be log compaction 
> > with tombstone feature but I also would like to have a possibility 
> > to keep history of changes for the resources(only until they die).
> > With those requirements I'd love to have a possibility to use 
> > tombstone feature for log rotation but I guess it ain't working like
> that.
> >
> > Does anyone here had similar requirements and solve that somehow ?
> >
> >
> > Sincerely,
> > Tomasz Kopacki
> > DevOps Engineer @ Nokia
> >
> >
>


Re: log compaction v log rotation - best of the two worlds

2018-03-21 Thread Manikumar
Not sure if I understood the requirement correctly. One option is to use two
compacted topics: one for the current state of the resource
and one for the history, and use tombstones whenever you want to clear them.

On Wed, Mar 21, 2018 at 2:53 PM, Kopacki, Tomasz (Nokia - PL/Wroclaw) <
tomasz.kopa...@nokia.com> wrote:

> Almost,
> Consider this example:
>
> |R1|R2|R1|R3|R2|R4|R4|R1|R2| <- this is an example of a stream where RX
> represents updates of a particular resource. I need to keep the history of
> changes forever for all the resources but only until resource is alive. If
> resource expires/dies I'd like to remove it completely. In this example
> consider that resource R2 dies but others are still alive. In such case I'd
> like to able to transform this into:
> |R1|R1|R3|R4|R4|R1| <- so, it's not compaction because I need the history
> of changes but neither I can simply remove 'old' messages because I need to
> do this based of the lifecycle of the resource not just their age.
>
>
>
> Sincerely,
> Tomasz Kopacki
> DevOps Engineer @ Nokia
>
> -Original Message-
> From: Manikumar [mailto:manikumar.re...@gmail.com]
> Sent: Wednesday, March 21, 2018 10:17 AM
> To: users@kafka.apache.org
> Subject: Re: log compaction v log rotation - best of the two worlds
>
> We can enable both compaction and retention for a topic by setting
> cleanup.policy="delete,compact"
> http://kafka.apache.org/documentation/#topicconfigs
>
> Does this handle your requirement?
>
> On Wed, Mar 21, 2018 at 2:36 PM, Kopacki, Tomasz (Nokia - PL/Wroclaw) <
> tomasz.kopa...@nokia.com> wrote:
>
> > Hi,
> > I've been recently exploring log handling in kafka and I wonder if/how
> > can I mixed log compaction with log rotation.
> > A little background first:
> > I have an application that uses kafka topics as a backend for event
> > sourcing. Messages represents change of state of my 'resources'. Each
> > resource has UID that is also used as a key for the messages.
> > My resources have a lifecycle and when their life ends I don't need
> > them anymore and there is no point in keeping their history. Having
> > said that I thought that best choice for me will be log compaction
> > with tombstone feature but I also would like to have a possibility to
> > keep history of changes for the resources(only until they die).
> > With those requirements I'd love to have a possibility to use
> > tombstone feature for log rotation but I guess it ain't working like
> that.
> >
> > Does anyone here had similar requirements and solve that somehow ?
> >
> >
> > Sincerely,
> > Tomasz Kopacki
> > DevOps Engineer @ Nokia
> >
> >
>
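
A minimal sketch of the tombstone half of that suggestion, assuming a compacted "current state" topic named resource-state and a local broker (both names are illustrative, not from the thread). A record carrying the resource's key and a null value is the tombstone; once delete.retention.ms has elapsed, compaction drops the key from the topic entirely:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class ResourceStateProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");               // assumed broker address
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // Regular update for resource R2 on the compacted current-state topic
                producer.send(new ProducerRecord<>("resource-state", "R2", "state-update-42"));
                // R2's life ends: publish a tombstone (same key, null value) so that
                // compaction eventually removes the key from the topic altogether
                producer.send(new ProducerRecord<>("resource-state", "R2", null));
            }
        }
    }

Keeping per-resource history in the second, history topic only works if each update there carries a distinct key (for example the UID plus a sequence number); that detail is an assumption beyond what the thread spells out.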


RE: log compaction v log rotation - best of the two worlds

2018-03-21 Thread Kopacki, Tomasz (Nokia - PL/Wroclaw)
Almost,
Consider this example:

|R1|R2|R1|R3|R2|R4|R4|R1|R2| <- this is an example of a stream where RX 
represents updates of a particular resource. I need to keep the history of 
changes forever for all the resources, but only while a resource is alive. If a 
resource expires/dies I'd like to remove it completely. In this example, 
consider that resource R2 dies but the others are still alive. In that case I'd 
like to be able to transform this into:
|R1|R1|R3|R4|R4|R1| <- so it's not compaction, because I need the history of 
changes, but neither can I simply remove 'old' messages, because I need to do 
this based on the lifecycle of the resource, not just its age.



Sincerely, 
Tomasz Kopacki
DevOps Engineer @ Nokia

-Original Message-
From: Manikumar [mailto:manikumar.re...@gmail.com] 
Sent: Wednesday, March 21, 2018 10:17 AM
To: users@kafka.apache.org
Subject: Re: log compaction v log rotation - best of the two worlds

We can enable both compaction and retention for a topic by setting 
cleanup.policy="delete,compact"
http://kafka.apache.org/documentation/#topicconfigs

Does this handle your requirement?

On Wed, Mar 21, 2018 at 2:36 PM, Kopacki, Tomasz (Nokia - PL/Wroclaw) < 
tomasz.kopa...@nokia.com> wrote:

> Hi,
> I've been recently exploring log handling in kafka and I wonder if/how 
> can I mixed log compaction with log rotation.
> A little background first:
> I have an application that uses kafka topics as a backend for event 
> sourcing. Messages represents change of state of my 'resources'. Each 
> resource has UID that is also used as a key for the messages.
> My resources have a lifecycle and when their life ends I don't need 
> them anymore and there is no point in keeping their history. Having 
> said that I thought that best choice for me will be log compaction 
> with tombstone feature but I also would like to have a possibility to 
> keep history of changes for the resources(only until they die).
> With those requirements I'd love to have a possibility to use 
> tombstone feature for log rotation but I guess it ain't working like that.
>
> Does anyone here had similar requirements and solve that somehow ?
>
>
> Sincerely,
> Tomasz Kopacki
> DevOps Engineer @ Nokia
>
>


Re: log compaction v log rotation - best of the two worlds

2018-03-21 Thread Manikumar
We can enable both compaction and retention for a topic by
setting cleanup.policy="delete,compact"
http://kafka.apache.org/documentation/#topicconfigs

Does this handle your requirement?

On Wed, Mar 21, 2018 at 2:36 PM, Kopacki, Tomasz (Nokia - PL/Wroclaw) <
tomasz.kopa...@nokia.com> wrote:

> Hi,
> I've been recently exploring log handling in kafka and I wonder if/how can
> I mixed log compaction with log rotation.
> A little background first:
> I have an application that uses kafka topics as a backend for event
> sourcing. Messages represents change of state of my 'resources'. Each
> resource has UID that is also used as a key for the messages.
> My resources have a lifecycle and when their life ends I don't need them
> anymore and there is no point in keeping their history. Having said that I
> thought that best choice for me will be log compaction with tombstone
> feature but I also would like to have a possibility to keep history of
> changes for the resources(only until they die).
> With those requirements I'd love to have a possibility to use tombstone
> feature for log rotation but I guess it ain't working like that.
>
> Does anyone here had similar requirements and solve that somehow ?
>
>
> Sincerely,
> Tomasz Kopacki
> DevOps Engineer @ Nokia
>
>
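
For completeness, a sketch of how the combined policy could be applied to an existing topic through the Java AdminClient; the topic name and broker address are illustrative, and combined compact+delete cleanup needs a broker version that supports it (0.10.1 or later, as far as I recall):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.Config;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;

    public class EnableCompactAndDelete {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");        // assumed broker address

            try (AdminClient admin = AdminClient.create(props)) {
                ConfigResource topic =
                    new ConfigResource(ConfigResource.Type.TOPIC, "resource-events");
                // compact: keep the latest record per key; delete: also drop segments
                // that fall outside the topic's retention settings
                Config config = new Config(Collections.singletonList(
                    new ConfigEntry("cleanup.policy", "compact,delete")));
                admin.alterConfigs(Collections.singletonMap(topic, config)).all().get();
            }
        }
    }

Note that this legacy alterConfigs call replaces the topic's whole set of overrides, so any other per-topic settings you want to keep have to be included in the same request.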


log compaction v log rotation - best of the two worlds

2018-03-21 Thread Kopacki, Tomasz (Nokia - PL/Wroclaw)
Hi,
I've recently been exploring log handling in Kafka and I wonder if/how I can
mix log compaction with log rotation.
A little background first:
I have an application that uses Kafka topics as a backend for event sourcing.
Messages represent changes of state of my 'resources'. Each resource has a UID
that is also used as the key for its messages.
My resources have a lifecycle, and when their life ends I don't need them
anymore and there is no point in keeping their history. Having said that, I
thought the best choice for me would be log compaction with the tombstone feature,
but I would also like the possibility to keep the history of changes for the
resources (only until they die).
With those requirements I'd love to be able to use the tombstone feature
for log rotation, but I guess it doesn't work like that.

Has anyone here had similar requirements and solved them somehow?


Sincerely,
Tomasz Kopacki
DevOps Engineer @ Nokia



Re: Log Compaction Not Picking up Topic [solved]

2017-10-25 Thread Elmar Weber

Hello Xin, hello Jan,

worked perfectly. I did a build of an image based on 0.11.0.1 and 
applied the missing patch, cleaning went through and resulted in the 
expected size.


Thanks a lot for the quick help,
Elmar


On 10/25/2017 1:03 PM, Xin Li wrote:

Hey Elmar,
The only thing you need to do is upgrade,
Kafka track cleaned offset using cleaner-offset-checkpoint file.

Best,
Xin


On 25.10.17, 12:34, "Elmar Weber"  wrote:

 Hi,
 
 thanks, I'll give it a try, we run on Kubernetes so it's not a big issue

 to replicate the whole env including data.
 
 One question I'd have left:

 - How can I force a re-compaction over the whole topic? Because I guess
the Log Cleaner market everything so far as not able to clean, how
will it recheck the whole log?
 
 Best,

 Elmar
 
 
 
 
 On 10/25/2017 12:29 PM, Jan Filipiak wrote:

 > Hi,
 >
 > unfortunatly there is nothing trivial you could do here.
 > Without upgrading your kafkas you can only bounce the partition back and
 > forth
 > between brokers so they compact while its still small.
 >
 > With upgrading you could also just cherrypick this very commit or put a
 > logstatement to verify.
 >
 > Given the Logsizes your dealing with, I am very confident that this is
 > your issue.
 >
 > Best Jan
 >
 >
 > On 25.10.2017 12:21, Elmar Weber wrote:
 >> Hi,
 >>
 >> On 10/25/2017 12:15 PM, Xin Li wrote:
 >> > I think that is a bug, and  should be fixed in this task
 >> https://issues.apache.org/jira/browse/KAFKA-6030.
 >> > We experience that in our kafka cluster, we just check out the
 >> 11.0.2 version, build it ourselves.
 >>
 >> thanks for the hint, as it looks like a calculation issue, would it be
 >> possible to verify this by manually changing the clean ratio or some
 >> other settings?
 >>
 >> Best,
 >> Elmar
 >>
 >
 >
 
 





Re: Log Compaction Not Picking up Topic

2017-10-25 Thread Xin Li
Hey Elmar,
The only thing you need to do is upgrade; Kafka tracks the cleaned offset
using the cleaner-offset-checkpoint file.

Best,
Xin


On 25.10.17, 12:34, "Elmar Weber"  wrote:

Hi,

thanks, I'll give it a try, we run on Kubernetes so it's not a big issue 
to replicate the whole env including data.

One question I'd have left:
- How can I force a re-compaction over the whole topic? Because I guess
   the Log Cleaner market everything so far as not able to clean, how
   will it recheck the whole log?

Best,
Elmar




On 10/25/2017 12:29 PM, Jan Filipiak wrote:
> Hi,
> 
> unfortunatly there is nothing trivial you could do here.
> Without upgrading your kafkas you can only bounce the partition back and 
> forth
> between brokers so they compact while its still small.
> 
> With upgrading you could also just cherrypick this very commit or put a 
> logstatement to verify.
> 
> Given the Logsizes your dealing with, I am very confident that this is 
> your issue.
> 
> Best Jan
> 
> 
> On 25.10.2017 12:21, Elmar Weber wrote:
>> Hi,
>>
>> On 10/25/2017 12:15 PM, Xin Li wrote:
>> > I think that is a bug, and  should be fixed in this task 
>> https://issues.apache.org/jira/browse/KAFKA-6030.
>> > We experience that in our kafka cluster, we just check out the 
>> 11.0.2 version, build it ourselves.
>>
>> thanks for the hint, as it looks like a calculation issue, would it be 
>> possible to verify this by manually changing the clean ratio or some 
>> other settings?
>>
>> Best,
>> Elmar
>>
> 
> 





Re: Log Compaction Not Picking up Topic

2017-10-25 Thread Xin Li
Hey, 
Because of the overflow, the calculated dirty ratio comes out negative, and I guess
upgrading is the once-and-for-all fix.

And we have been running that for quite a while; so far so good.

Best,
Xin


On 25.10.17, 12:21, "Elmar Weber"  wrote:

Hi,

On 10/25/2017 12:15 PM, Xin Li wrote:
 > I think that is a bug, and  should be fixed in this task 
https://issues.apache.org/jira/browse/KAFKA-6030.
 > We experience that in our kafka cluster, we just check out the 11.0.2 
version, build it ourselves.

thanks for the hint, as it looks like a calculation issue, would it be 
possible to verify this by manually changing the clean ratio or some 
other settings?

Best,
Elmar
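
For context on the "ratio goes negative" remark: the cleaner only schedules a partition for compaction once the uncleaned portion of the log exceeds min.cleanable.dirty.ratio of the total. A rough sketch of that check (illustrative only, not the broker's actual Scala code) shows why summing segment sizes with 32-bit arithmetic breaks down on a 300 GB partition:

    public class CleanableRatioSketch {
        // The cleaner compares dirtyBytes / totalBytes against min.cleanable.dirty.ratio
        // (0.5 by default). Using long arithmetic here; an int sum wraps around once a
        // partition grows past ~2 GB, which is the overflow tracked in KAFKA-6030.
        static double cleanableRatio(long cleanBytes, long dirtyBytes) {
            long totalBytes = cleanBytes + dirtyBytes;
            return totalBytes == 0 ? 0.0 : (double) dirtyBytes / totalBytes;
        }

        public static void main(String[] args) {
            long clean = 40L * 1024 * 1024 * 1024;    // ~40 GB already cleaned
            long dirty = 260L * 1024 * 1024 * 1024;   // ~260 GB not yet cleaned
            System.out.println(cleanableRatio(clean, dirty));  // ~0.87, well above 0.5
        }
    }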





Re: Log Compaction Not Picking up Topic

2017-10-25 Thread Elmar Weber

Hi,

thanks, I'll give it a try, we run on Kubernetes so it's not a big issue 
to replicate the whole env including data.


One question I'd have left:
- How can I force a re-compaction over the whole topic? Because I guess
  the log cleaner has marked everything so far as not cleanable, so how
  will it recheck the whole log?

Best,
Elmar




On 10/25/2017 12:29 PM, Jan Filipiak wrote:

Hi,

unfortunatly there is nothing trivial you could do here.
Without upgrading your kafkas you can only bounce the partition back and 
forth

between brokers so they compact while its still small.

With upgrading you could also just cherrypick this very commit or put a 
logstatement to verify.


Given the Logsizes your dealing with, I am very confident that this is 
your issue.


Best Jan


On 25.10.2017 12:21, Elmar Weber wrote:

Hi,

On 10/25/2017 12:15 PM, Xin Li wrote:
> I think that is a bug, and  should be fixed in this task 
https://issues.apache.org/jira/browse/KAFKA-6030.
> We experience that in our kafka cluster, we just check out the 
11.0.2 version, build it ourselves.


thanks for the hint, as it looks like a calculation issue, would it be 
possible to verify this by manually changing the clean ratio or some 
other settings?


Best,
Elmar








Re: Log Compaction Not Picking up Topic

2017-10-25 Thread Jan Filipiak

Hi,

Unfortunately there is nothing trivial you could do here.
Without upgrading your Kafkas you can only bounce the partition back and forth
between brokers so they compact while they are still small.

With upgrading you could also just cherry-pick this very commit or put in a
log statement to verify.

Given the log sizes you're dealing with, I am very confident that this is
your issue.


Best Jan


On 25.10.2017 12:21, Elmar Weber wrote:

Hi,

On 10/25/2017 12:15 PM, Xin Li wrote:
> I think that is a bug, and  should be fixed in this task 
https://issues.apache.org/jira/browse/KAFKA-6030.
> We experience that in our kafka cluster, we just check out the 
11.0.2 version, build it ourselves.


thanks for the hint, as it looks like a calculation issue, would it be 
possible to verify this by manually changing the clean ratio or some 
other settings?


Best,
Elmar





Re: Log Compaction Not Picking up Topic

2017-10-25 Thread Elmar Weber

Hi,

On 10/25/2017 12:15 PM, Xin Li wrote:
> I think that is a bug, and  should be fixed in this task 
https://issues.apache.org/jira/browse/KAFKA-6030.
> We experience that in our kafka cluster, we just check out the 11.0.2 
version, build it ourselves.


thanks for the hint, as it looks like a calculation issue, would it be 
possible to verify this by manually changing the clean ratio or some 
other settings?


Best,
Elmar



Re: Log Compaction Not Picking up Topic

2017-10-25 Thread Xin Li
Hey,
I think that is a bug, and it should be fixed by this task
https://issues.apache.org/jira/browse/KAFKA-6030.
We experienced that in our Kafka cluster; we just checked out the 11.0.2 version
and built it ourselves.

Best,
Xin


On 25.10.17, 12:04, "Manikumar"  wrote:

any errors in log cleaner logs?

On Wed, Oct 25, 2017 at 3:12 PM, Elmar Weber  wrote:

> Hello,
>
> I'm having trouble getting Kafka to compact a topic. It's over 300GB and
> has enough segments to warrant cleaning. It should only be about 40 GB
> (there is a copy in a db that is unique on the key). Below are the configs
> we have (default broker) and topic override.
>
>
> Is there something I'm missing on which setting is overriding which one or
> something still wrongly?
>
> retention.ms and delete.retentions.ms I set manually after creation on
> the topic and some segments have been created already.
>
> Kafka version 0.11
>
> Server Defaults for new segments of the topic:
>
> The settings used when a new log was created for the topic:
>
> {compression.type -> producer, message.format.version -> 0.11.0-IV2,
> file.delete.delay.ms -> 6, max.message.bytes -> 2097152,
> min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime,
> min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false,
> min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096,
> unclean.leader.election.enable -> false, retention.bytes -> -1,
> delete.retention.ms -> 8640, cleanup.policy -> compact, flush.ms ->
> 9223372036854775807, segment.ms -> 60480, segment.bytes ->
> 1073741824, retention.ms -> -1, message.timestamp.difference.max.ms ->
> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages ->
> 9223372036854775807}
>
> Topic Overrides (overridden after creation).
>
> {retention.ms=360, delete.retention.ms=360,
> max.message.bytes=10485760, cleanup.policy=compact}
>
>
>
> The full server startup config:
>
> advertised.host.name = null
> advertised.listeners = null
> advertised.port = null
> alter.config.policy.class.name = null
> authorizer.class.name =
> auto.create.topics.enable = false
> auto.leader.rebalance.enable = true
> background.threads = 10
> broker.id = 1
> broker.id.generation.enable = true
> broker.rack = europe-west1-c
> compression.type = producer
> connections.max.idle.ms = 60
> controlled.shutdown.enable = true
> controlled.shutdown.max.retries = 3
> controlled.shutdown.retry.backoff.ms = 5000
> controller.socket.timeout.ms = 3
> create.topic.policy.class.name = null
> default.replication.factor = 1
> delete.records.purgatory.purge.interval.requests = 1
> delete.topic.enable = true
> fetch.purgatory.purge.interval.requests = 1000
> group.initial.rebalance.delay.ms = 0
> group.max.session.timeout.ms = 30
> group.min.session.timeout.ms = 6000
> host.name =
> inter.broker.listener.name = null
> inter.broker.protocol.version = 0.11.0-IV2
> leader.imbalance.check.interval.seconds = 300
> leader.imbalance.per.broker.percentage = 10
> listener.security.protocol.map = SSL:SSL,SASL_PLAINTEXT:SASL_PL
> AINTEXT,TRACE:TRACE,SASL_SSL:SASL_SSL,PLAINTEXT:PLAINTEXT
> listeners = null
> log.cleaner.backoff.ms = 15000
> log.cleaner.dedupe.buffer.size = 134217728
> log.cleaner.delete.retention.ms = 8640
> log.cleaner.enable = true
> log.cleaner.io.buffer.load.factor = 0.9
> log.cleaner.io.buffer.size = 524288
> log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
> log.cleaner.min.cleanable.ratio = 0.5
> log.cleaner.min.compaction.lag.ms = 0
> log.cleaner.threads = 1
> log.cleanup.policy = [delete]
> log.dir = /tmp/kafka-logs
> log.dirs = /var/lib/kafka/data/topics
> log.flush.interval.messages = 9223372036854775807
> log.flush.interval.ms = null
> log.flush.offset.checkpoint.interval.ms = 6
> log.flush.scheduler.interval.ms = 9223372036854775807
> log.flush.start.offset.checkpoint.interval.ms = 6
> log.index.interv

Re: Log Compaction Not Picking up Topic

2017-10-25 Thread Elmar Weber

On 10/25/2017 12:03 PM, Manikumar wrote:

any errors in log cleaner logs?


Not as far as I can see

root@kafka-1:/opt/kafka/logs# cat log-cleaner.log* | grep -i error
(empty)

However, I've seen that it actually did cleaning of the whole topic 
(excerpts below), but it didn't seem to find anything worth cleaning 
(size stayed the same).


Are there any global settings that could affect this?
I'm running a default config from Kafka; the only things changed are 
message size, topic creation/deletion, and the defaults for retention, 
which are all overridden for this topic (see original post).


I'm writing a simple script to read and confirm the key distributions to 
debug further, but as the same data (without duplicates) is also written 
to a DB I'm pretty sure that the size is too big for not having gotten 
compacted.



root@kafka-1:/opt/kafka/logs# cat log-cleaner.log* | grep 
events.lg.aggregated
[2017-10-24 11:08:43,462] INFO Cleaner 0: Beginning cleaning of log 
events.lg.aggregated-0. (kafka.log.LogCleaner)
[2017-10-24 11:08:43,462] INFO Cleaner 0: Building offset map for 
events.lg.aggregated-0... (kafka.log.LogCleaner)
[2017-10-24 11:08:43,688] INFO Cleaner 0: Building offset map for log 
events.lg.aggregated-0 for 1 segments in offset range [0, 81308). 
(kafka.log.LogCleaner)
[2017-10-24 11:08:44,163] INFO Cleaner 0: Offset map for log 
events.lg.aggregated-0 complete. (kafka.log.LogCleaner)
[2017-10-24 11:08:44,165] INFO Cleaner 0: Cleaning log 
events.lg.aggregated-0 (cleaning prior to Tue Oct 24 11:08:30 UTC 2017, 
discarding tombstones prior to Thu Jan 01 00:00:00 UTC 1970)... 
(kafka.log.LogCleaner)
[2017-10-24 11:08:44,166] INFO Cleaner 0: Cleaning segment 0 in log 
events.lg.aggregated-0 (largest timestamp Tue Oct 24 11:08:30 UTC 2017) 
into 0, retaining deletes. (kafka.log.LogCleaner)
[2017-10-24 11:08:47,865] INFO Cleaner 0: Swapping in cleaned segment 0 
for segment(s) 0 in log events.lg.aggregated-0. (kafka.log.LogCleaner)
Log cleaner thread 0 cleaned log events.lg.aggregated-0 (dirty 
section = [0, 0])
[2017-10-24 11:10:47,875] INFO Cleaner 0: Beginning cleaning of log 
events.lg.aggregated-0. (kafka.log.LogCleaner)
[2017-10-24 11:10:47,875] INFO Cleaner 0: Building offset map for 
events.lg.aggregated-0... (kafka.log.LogCleaner)
[2017-10-24 11:10:47,910] INFO Cleaner 0: Building offset map for log 
events.lg.aggregated-0 for 1 segments in offset range [81308, 154902). 
(kafka.log.LogCleaner)
[2017-10-24 11:10:48,410] INFO Cleaner 0: Offset map for log 
events.lg.aggregated-0 complete. (kafka.log.LogCleaner)
[2017-10-24 11:10:48,411] INFO Cleaner 0: Cleaning log 
events.lg.aggregated-0 (cleaning prior to Tue Oct 24 11:10:32 UTC 2017, 
discarding tombstones prior to Mon Oct 23 11:08:30 UTC 2017)... 
(kafka.log.LogCleaner)
[2017-10-24 11:10:48,411] INFO Cleaner 0: Cleaning segment 0 in log 
events.lg.aggregated-0 (largest timestamp Tue Oct 24 11:08:30 UTC 2017) 
into 0, retaining deletes. (kafka.log.LogCleaner)
[2017-10-24 11:10:50,308] INFO Cleaner 0: Swapping in cleaned segment 0 
for segment(s) 0 in log events.lg.aggregated-0. (kafka.log.LogCleaner)
[2017-10-24 11:10:50,309] INFO Cleaner 0: Cleaning segment 81308 in log 
events.lg.aggregated-0 (largest timestamp Tue Oct 24 11:10:32 UTC 2017) 
into 81308, retaining deletes. (kafka.log.LogCleaner)
[2017-10-24 11:10:53,389] INFO Cleaner 0: Swapping in cleaned segment 
81308 for segment(s) 81308 in log events.lg.aggregated-0. 
(kafka.log.LogCleaner)
Log cleaner thread 0 cleaned log events.lg.aggregated-0 (dirty 
section = [81308, 81308])







Re: Log Compaction Not Picking up Topic

2017-10-25 Thread Manikumar
any errors in log cleaner logs?

On Wed, Oct 25, 2017 at 3:12 PM, Elmar Weber  wrote:

> Hello,
>
> I'm having trouble getting Kafka to compact a topic. It's over 300GB and
> has enough segments to warrant cleaning. It should only be about 40 GB
> (there is a copy in a db that is unique on the key). Below are the configs
> we have (default broker) and topic override.
>
>
> Is there something I'm missing on which setting is overriding which one or
> something still wrongly?
>
> retention.ms and delete.retentions.ms I set manually after creation on
> the topic and some segments have been created already.
>
> Kafka version 0.11
>
> Server Defaults for new segments of the topic:
>
> The settings used when a new log was created for the topic:
>
> {compression.type -> producer, message.format.version -> 0.11.0-IV2,
> file.delete.delay.ms -> 6, max.message.bytes -> 2097152,
> min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime,
> min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false,
> min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096,
> unclean.leader.election.enable -> false, retention.bytes -> -1,
> delete.retention.ms -> 8640, cleanup.policy -> compact, flush.ms ->
> 9223372036854775807, segment.ms -> 60480, segment.bytes ->
> 1073741824, retention.ms -> -1, message.timestamp.difference.max.ms ->
> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages ->
> 9223372036854775807}
>
> Topic Overrides (overridden after creation).
>
> {retention.ms=360, delete.retention.ms=360,
> max.message.bytes=10485760, cleanup.policy=compact}
>
>
>
> The full server startup config:
>
> advertised.host.name = null
> advertised.listeners = null
> advertised.port = null
> alter.config.policy.class.name = null
> authorizer.class.name =
> auto.create.topics.enable = false
> auto.leader.rebalance.enable = true
> background.threads = 10
> broker.id = 1
> broker.id.generation.enable = true
> broker.rack = europe-west1-c
> compression.type = producer
> connections.max.idle.ms = 60
> controlled.shutdown.enable = true
> controlled.shutdown.max.retries = 3
> controlled.shutdown.retry.backoff.ms = 5000
> controller.socket.timeout.ms = 3
> create.topic.policy.class.name = null
> default.replication.factor = 1
> delete.records.purgatory.purge.interval.requests = 1
> delete.topic.enable = true
> fetch.purgatory.purge.interval.requests = 1000
> group.initial.rebalance.delay.ms = 0
> group.max.session.timeout.ms = 30
> group.min.session.timeout.ms = 6000
> host.name =
> inter.broker.listener.name = null
> inter.broker.protocol.version = 0.11.0-IV2
> leader.imbalance.check.interval.seconds = 300
> leader.imbalance.per.broker.percentage = 10
> listener.security.protocol.map = SSL:SSL,SASL_PLAINTEXT:SASL_PL
> AINTEXT,TRACE:TRACE,SASL_SSL:SASL_SSL,PLAINTEXT:PLAINTEXT
> listeners = null
> log.cleaner.backoff.ms = 15000
> log.cleaner.dedupe.buffer.size = 134217728
> log.cleaner.delete.retention.ms = 8640
> log.cleaner.enable = true
> log.cleaner.io.buffer.load.factor = 0.9
> log.cleaner.io.buffer.size = 524288
> log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
> log.cleaner.min.cleanable.ratio = 0.5
> log.cleaner.min.compaction.lag.ms = 0
> log.cleaner.threads = 1
> log.cleanup.policy = [delete]
> log.dir = /tmp/kafka-logs
> log.dirs = /var/lib/kafka/data/topics
> log.flush.interval.messages = 9223372036854775807
> log.flush.interval.ms = null
> log.flush.offset.checkpoint.interval.ms = 6
> log.flush.scheduler.interval.ms = 9223372036854775807
> log.flush.start.offset.checkpoint.interval.ms = 6
> log.index.interval.bytes = 4096
> log.index.size.max.bytes = 10485760
> log.message.format.version = 0.11.0-IV2
> log.message.timestamp.difference.max.ms = 9223372036854775807
> log.message.timestamp.type = CreateTime
> log.preallocate = false
> log.retention.bytes = -1
> log.retention.check.interval.ms = 30
> log.retention.hours = -1
> log.retention.minutes = null
> log.retention.ms = null
> log.roll.hours = 168
> log.roll.jitter.hours = 0
> log.roll.jitter.ms = null
> log.roll.ms = null
> log.segment.bytes = 1073741824
> log.segment.delete.delay.ms = 6
> max.connections.per.ip = 2147483647
> max.connections.per.ip.overrides =
> message.max.bytes = 2097152
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 3
> min.insync.replicas = 1
> num.io.threads = 8
> num.network.threads = 3
> num.partitions = 1
> num.recovery.threads.per.data.dir = 1
> num.replica.fetchers = 1
> offset.metadata.max.bytes = 4096
> offsets.commit.required.acks = -1
> offsets.commit.timeout.ms = 5000
> offsets.load.buffer.size = 5242880
> offsets.retention.check.interval.ms = 60
> offsets.retention.minutes = 1440
> offsets.topic.compression.codec = 0
> offsets.topic.num.partitions = 50
> offsets.topic.replication.factor = 1
> offsets.topic.segment.bytes = 104857600
> port = 9092
> principal.builder.c

Log Compaction Not Picking up Topic

2017-10-25 Thread Elmar Weber

Hello,

I'm having trouble getting Kafka to compact a topic. It's over 300GB and 
has enough segments to warrant cleaning. It should only be about 40 GB 
(there is a copy in a db that is unique on the key). Below are the 
configs we have (default broker) and topic override.



Is there something I'm missing about which setting overrides which one, 
or is something still configured wrongly?


retention.ms and delete.retention.ms I set manually after creation on 
the topic and some segments have been created already.


Kafka version 0.11

Server Defaults for new segments of the topic:

The settings used when a new log was created for the topic:

{compression.type -> producer, message.format.version -> 0.11.0-IV2, 
file.delete.delay.ms -> 6, max.message.bytes -> 2097152, 
min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, 
min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, 
min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, 
unclean.leader.election.enable -> false, retention.bytes -> -1, 
delete.retention.ms -> 8640, cleanup.policy -> compact, flush.ms -> 
9223372036854775807, segment.ms -> 60480, segment.bytes -> 
1073741824, retention.ms -> -1, message.timestamp.difference.max.ms -> 
9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 
9223372036854775807}


Topic Overrides (overridden after creation).

{retention.ms=360, delete.retention.ms=360, 
max.message.bytes=10485760, cleanup.policy=compact}




The full server startup config:

advertised.host.name = null
advertised.listeners = null
advertised.port = null
alter.config.policy.class.name = null
authorizer.class.name =
auto.create.topics.enable = false
auto.leader.rebalance.enable = true
background.threads = 10
broker.id = 1
broker.id.generation.enable = true
broker.rack = europe-west1-c
compression.type = producer
connections.max.idle.ms = 60
controlled.shutdown.enable = true
controlled.shutdown.max.retries = 3
controlled.shutdown.retry.backoff.ms = 5000
controller.socket.timeout.ms = 3
create.topic.policy.class.name = null
default.replication.factor = 1
delete.records.purgatory.purge.interval.requests = 1
delete.topic.enable = true
fetch.purgatory.purge.interval.requests = 1000
group.initial.rebalance.delay.ms = 0
group.max.session.timeout.ms = 30
group.min.session.timeout.ms = 6000
host.name =
inter.broker.listener.name = null
inter.broker.protocol.version = 0.11.0-IV2
leader.imbalance.check.interval.seconds = 300
leader.imbalance.per.broker.percentage = 10
listener.security.protocol.map = 
SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,TRACE:TRACE,SASL_SSL:SASL_SSL,PLAINTEXT:PLAINTEXT

listeners = null
log.cleaner.backoff.ms = 15000
log.cleaner.dedupe.buffer.size = 134217728
log.cleaner.delete.retention.ms = 8640
log.cleaner.enable = true
log.cleaner.io.buffer.load.factor = 0.9
log.cleaner.io.buffer.size = 524288
log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
log.cleaner.min.cleanable.ratio = 0.5
log.cleaner.min.compaction.lag.ms = 0
log.cleaner.threads = 1
log.cleanup.policy = [delete]
log.dir = /tmp/kafka-logs
log.dirs = /var/lib/kafka/data/topics
log.flush.interval.messages = 9223372036854775807
log.flush.interval.ms = null
log.flush.offset.checkpoint.interval.ms = 6
log.flush.scheduler.interval.ms = 9223372036854775807
log.flush.start.offset.checkpoint.interval.ms = 6
log.index.interval.bytes = 4096
log.index.size.max.bytes = 10485760
log.message.format.version = 0.11.0-IV2
log.message.timestamp.difference.max.ms = 9223372036854775807
log.message.timestamp.type = CreateTime
log.preallocate = false
log.retention.bytes = -1
log.retention.check.interval.ms = 30
log.retention.hours = -1
log.retention.minutes = null
log.retention.ms = null
log.roll.hours = 168
log.roll.jitter.hours = 0
log.roll.jitter.ms = null
log.roll.ms = null
log.segment.bytes = 1073741824
log.segment.delete.delay.ms = 6
max.connections.per.ip = 2147483647
max.connections.per.ip.overrides =
message.max.bytes = 2097152
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 3
min.insync.replicas = 1
num.io.threads = 8
num.network.threads = 3
num.partitions = 1
num.recovery.threads.per.data.dir = 1
num.replica.fetchers = 1
offset.metadata.max.bytes = 4096
offsets.commit.required.acks = -1
offsets.commit.timeout.ms = 5000
offsets.load.buffer.size = 5242880
offsets.retention.check.interval.ms = 60
offsets.retention.minutes = 1440
offsets.topic.compression.codec = 0
offsets.topic.num.partitions = 50
offsets.topic.replication.factor = 1
offsets.topic.segment.bytes = 104857600
port = 9092
principal.builder.class = class 
org.apache.kafka.common.security.auth.DefaultPrincipalBuilder

producer.purgatory.purge.interval.requests = 1000
queued.max.requests = 500
quota.consumer.default = 9223372036854775807
quota.producer.default = 9223372036854775807
quota.window.num = 11
quota.window.size.seconds = 1
replica.fetch.backoff.m
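
One way to answer the "which setting is overriding which one" part of the question programmatically: the 0.11 AdminClient can report the effective per-topic configuration and whether each value comes from a topic override or a broker default. A small sketch, with the broker address assumed and the topic name taken from the cleaner-log excerpts elsewhere in this thread:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.Config;
    import org.apache.kafka.common.config.ConfigResource;

    public class ShowEffectiveTopicConfig {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");    // assumed broker address

            try (AdminClient admin = AdminClient.create(props)) {
                ConfigResource topic =
                    new ConfigResource(ConfigResource.Type.TOPIC, "events.lg.aggregated");
                Config config = admin.describeConfigs(Collections.singleton(topic))
                                     .all().get().get(topic);
                // isDefault() == false means the value is a topic-level override
                // rather than the broker default
                config.entries().forEach(entry -> System.out.println(
                    entry.name() + " = " + entry.value()
                        + (entry.isDefault() ? "  (broker default)" : "  (topic override)")));
            }
        }
    }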

Re: The idea of "composite key" to make log compaction more flexible - question / proposal

2017-10-09 Thread Michal Michalski
Hey Jay,

Thanks for the reply. Yes, this should do the job.

We were thinking about something that abstracts this logic away from the
user (e.g. the same way Cassandra handles its PK definitions in CQL -
"hiding" the row key and optional clustering key behind the concept of
"primary key"), but introducing such design obviously has some pros/cons
and non-trivial implications, so if using the partitioner interface is the
way to go in Kafka - we'll use it :-)

Thanks,
Michał


On 5 October 2017 at 15:22, Jay Kreps  wrote:

> I think you can do this now by using a custom partitioner, no?
>
> https://kafka.apache.org/0110/javadoc/org/apache/kafka/
> clients/producer/Partitioner.html
>
> -Jay
>
> On Mon, Oct 2, 2017 at 6:29 AM Michal Michalski <
> michal.michal...@zalando.ie>
> wrote:
>
> > Hi,
> >
> > TL;DR: I'd love to be able to make log compaction more "granular" than
> just
> > per-partition-key, so I was thinking about the concept of a "composite
> > key", where partitioning logic is using one part of the key, while
> > compaction uses the whole key - is this something desirable / doable /
> > worth a KIP?
> >
> > Longer story / use case:
> >
> > I'm currently a member of a team working on a project that's using a
> bunch
> > of applications to ingest data to the system (one "entity type" per app).
> > Once ingested by each application, since the entities are referring to
> each
> > other, they're all published to a single topic to ensure ordering for
> later
> > processing stages. Because of the nature of the data, for a given set of
> > entities related together, there's always a single "master" / parent"
> > entity, which ID we're using as the partition key; to give an example:
> > let's say you have "product" entity which can have things like "media",
> > "reviews", "stocks" etc. associated with it - product ID will be the
> > partition key for *all* these entities. However, with this approach we
> > simply cannot use log compaction because having e.g. "product", "media"
> and
> > "review" events, all with the same partition key "X", means that
> compaction
> > process will at some point delete all but one of them, causing a data
> loss
> > - only a single entity with key "X" will remain (and that's absolutely
> > correct - Kafka doesn't "understand" what does the message contain).
> >
> > We were thinking about introducing something we internally called
> > "composite key". The idea is to have a key that's not just a single
> String
> > K, but a pair of Strings: (K1, K2). For specifying the partition that the
> > message should be sent to, K1 would be used; however, for log compaction
> > purposes, the whole (K1, K2) would be used instead. This way, referring
> to
> > the example above, different entities "belonging" to the same "master
> > entity" (product), could be published to that topic with composite keys:
> > (productId, "product"), (productId, "media") and (productId, "review"),
> so
> > they all end up in single partition (specified by K1, which is always:
> > productId), but they won't get compacted together, because the K2 part is
> > different for them, making the whole "composite key" (K1, K2) different.
> Of
> > course K2 would be optional, so for someone who only needs the default
> > behaviour nothing would change.
> >
> > Since I'm not a Kafka developer and I don't know its internals that
> well, I
> > can't say if this idea is technically feasible or not, but I'd think it
> is
> > - I'd be more afraid of the complexity around backwards compatibility
> etc.
> > and potential performance implications of such change.
> >
> > I know that similar behaviour is achievable by using the producer API
> that
> > allows explicitly specifying the partition ID (and the key), but I think
> > it's a bit "clunky" (for each message, generate a key that this message
> > should normally be using [productId] and somehow "map" that key into a
> > partition X; then send that message to this partition X, *but* use the
> > "compaction" key instead [productId, entity type] as the message key) and
> > it's something that could be abstracted away from the user.
> >
> > Thoughts?
> >
> > Question to Kafka users: Is this something that anyone here would find
> > useful? Is anyone here dealing with similar problem?
> >
> > Question to Kafka maintainers: Is this something that you could
> potentially
> > consider a useful feature? Would it be worth a KIP? Is something like
> this
> > (technically) doable at all?
> >
> > --
> > Kind regards,
> > Michał Michalski
> >
>


Re: The idea of "composite key" to make log compaction more flexible - question / proposal

2017-10-05 Thread Jay Kreps
I think you can do this now by using a custom partitioner, no?

https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/producer/Partitioner.html

-Jay

On Mon, Oct 2, 2017 at 6:29 AM Michal Michalski 
wrote:

> Hi,
>
> TL;DR: I'd love to be able to make log compaction more "granular" than just
> per-partition-key, so I was thinking about the concept of a "composite
> key", where partitioning logic is using one part of the key, while
> compaction uses the whole key - is this something desirable / doable /
> worth a KIP?
>
> Longer story / use case:
>
> I'm currently a member of a team working on a project that's using a bunch
> of applications to ingest data to the system (one "entity type" per app).
> Once ingested by each application, since the entities are referring to each
> other, they're all published to a single topic to ensure ordering for later
> processing stages. Because of the nature of the data, for a given set of
> entities related together, there's always a single "master" / parent"
> entity, which ID we're using as the partition key; to give an example:
> let's say you have "product" entity which can have things like "media",
> "reviews", "stocks" etc. associated with it - product ID will be the
> partition key for *all* these entities. However, with this approach we
> simply cannot use log compaction because having e.g. "product", "media" and
> "review" events, all with the same partition key "X", means that compaction
> process will at some point delete all but one of them, causing a data loss
> - only a single entity with key "X" will remain (and that's absolutely
> correct - Kafka doesn't "understand" what does the message contain).
>
> We were thinking about introducing something we internally called
> "composite key". The idea is to have a key that's not just a single String
> K, but a pair of Strings: (K1, K2). For specifying the partition that the
> message should be sent to, K1 would be used; however, for log compaction
> purposes, the whole (K1, K2) would be used instead. This way, referring to
> the example above, different entities "belonging" to the same "master
> entity" (product), could be published to that topic with composite keys:
> (productId, "product"), (productId, "media") and (productId, "review"), so
> they all end up in single partition (specified by K1, which is always:
> productId), but they won't get compacted together, because the K2 part is
> different for them, making the whole "composite key" (K1, K2) different. Of
> course K2 would be optional, so for someone who only needs the default
> behaviour nothing would change.
>
> Since I'm not a Kafka developer and I don't know its internals that well, I
> can't say if this idea is technically feasible or not, but I'd think it is
> - I'd be more afraid of the complexity around backwards compatibility etc.
> and potential performance implications of such change.
>
> I know that similar behaviour is achievable by using the producer API that
> allows explicitly specifying the partition ID (and the key), but I think
> it's a bit "clunky" (for each message, generate a key that this message
> should normally be using [productId] and somehow "map" that key into a
> partition X; then send that message to this partition X, *but* use the
> "compaction" key instead [productId, entity type] as the message key) and
> it's something that could be abstracted away from the user.
>
> Thoughts?
>
> Question to Kafka users: Is this something that anyone here would find
> useful? Is anyone here dealing with similar problem?
>
> Question to Kafka maintainers: Is this something that you could potentially
> consider a useful feature? Would it be worth a KIP? Is something like this
> (technically) doable at all?
>
> --
> Kind regards,
> Michał Michalski
>
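
A minimal sketch of the partitioner route Jay points at, assuming the composite key is serialized as a single "productId:entityType" string (that encoding is made up for illustration). Partition placement hashes only the productId prefix, while broker-side compaction still deduplicates on the full string, so (productId, "product") and (productId, "media") survive as separate entries in the same partition:

    import java.nio.charset.StandardCharsets;
    import java.util.Map;
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.utils.Utils;

    public class CompositeKeyPartitioner implements Partitioner {

        @Override
        public void configure(Map<String, ?> configs) { }

        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                             Object value, byte[] valueBytes, Cluster cluster) {
            // Key looks like "12345:media"; only the part before ':' drives placement
            String prefix = ((String) key).split(":", 2)[0];
            int numPartitions = cluster.partitionsForTopic(topic).size();
            return Utils.toPositive(Utils.murmur2(prefix.getBytes(StandardCharsets.UTF_8)))
                    % numPartitions;
        }

        @Override
        public void close() { }
    }

The producer would point at this class via partitioner.class, and the record key stays the full "productId:entityType" string, which is exactly what compaction compares.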


The idea of "composite key" to make log compaction more flexible - question / proposal

2017-10-02 Thread Michal Michalski
Hi,

TL;DR: I'd love to be able to make log compaction more "granular" than just
per-partition-key, so I was thinking about the concept of a "composite
key", where partitioning logic is using one part of the key, while
compaction uses the whole key - is this something desirable / doable /
worth a KIP?

Longer story / use case:

I'm currently a member of a team working on a project that's using a bunch
of applications to ingest data to the system (one "entity type" per app).
Once ingested by each application, since the entities are referring to each
other, they're all published to a single topic to ensure ordering for later
processing stages. Because of the nature of the data, for a given set of
entities related together, there's always a single "master" / parent"
entity, which ID we're using as the partition key; to give an example:
let's say you have "product" entity which can have things like "media",
"reviews", "stocks" etc. associated with it - product ID will be the
partition key for *all* these entities. However, with this approach we
simply cannot use log compaction because having e.g. "product", "media" and
"review" events, all with the same partition key "X", means that compaction
process will at some point delete all but one of them, causing a data loss
- only a single entity with key "X" will remain (and that's absolutely
correct - Kafka doesn't "understand" what the message contains).

We were thinking about introducing something we internally called
"composite key". The idea is to have a key that's not just a single String
K, but a pair of Strings: (K1, K2). For specifying the partition that the
message should be sent to, K1 would be used; however, for log compaction
purposes, the whole (K1, K2) would be used instead. This way, referring to
the example above, different entities "belonging" to the same "master
entity" (product), could be published to that topic with composite keys:
(productId, "product"), (productId, "media") and (productId, "review"), so
they all end up in single partition (specified by K1, which is always:
productId), but they won't get compacted together, because the K2 part is
different for them, making the whole "composite key" (K1, K2) different. Of
course K2 would be optional, so for someone who only needs the default
behaviour nothing would change.

Since I'm not a Kafka developer and I don't know its internals that well, I
can't say if this idea is technically feasible or not, but I'd think it is
- I'd be more afraid of the complexity around backwards compatibility etc.
and potential performance implications of such change.

I know that similar behaviour is achievable by using the producer API that
allows explicitly specifying the partition ID (and the key), but I think
it's a bit "clunky" (for each message, generate a key that this message
should normally be using [productId] and somehow "map" that key into a
partition X; then send that message to this partition X, *but* use the
"compaction" key instead [productId, entity type] as the message key) and
it's something that could be abstracted away from the user.

Thoughts?

Question to Kafka users: Is this something that anyone here would find
useful? Is anyone here dealing with similar problem?

Question to Kafka maintainers: Is this something that you could potentially
consider a useful feature? Would it be worth a KIP? Is something like this
(technically) doable at all?

--
Kind regards,
Michał Michalski


Re: Log compaction failed because offset map doesn't have enough space

2017-05-17 Thread Tom Crayford
Hi,

You should upgrade Kafka versions, this was a bug fixed in KAFKA-3894:
https://issues.apache.org/jira/browse/KAFKA-3894

Generally it's a very good idea to keep on top of Kafka version upgrades,
there are numerous bugs fixed with every release, and it's stability goes
up each time.

On Tue, May 16, 2017 at 11:20 PM, Jun Ma  wrote:

> Hi team,
>
> We are having a issue with compacting __consumer_offsets topic in our
> cluster. We’re seeing logs in log-cleaner.log saying:
>
> [2017-05-16 11:56:28,993] INFO Cleaner 0: Building offset map for log
> __consumer_offsets-15 for 349 segments in offset range [0, 619265471).
> (kafka.log.LogCleaner)
> [2017-05-16 11:56:29,014] ERROR [kafka-log-cleaner-thread-0], Error due to
>  (kafka.log.LogCleaner)
> java.lang.IllegalArgumentException: requirement failed: 306088059 messages
> in segment __consumer_offsets-15/.log but offset map
> can fit only 7499. You can increase log.cleaner.dedupe.buffer.size or
> decrease log.cleaner.threads
> at scala.Predef$.require(Predef.scala:219)
> at kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:584)
> at kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:580)
> at
> scala.collection.immutable.Stream$StreamWithFilter.
> foreach(Stream.scala:570)
> at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:580)
> at kafka.log.Cleaner.clean(LogCleaner.scala:322)
> at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:230)
> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:208)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2017-05-16 11:56:29,016] INFO [kafka-log-cleaner-thread-0], Stopped
>  (kafka.log.LogCleaner)
>
> We have log.cleaner.dedupe.buffer.size=20, which is slightly less
> than 2G, but still, it can only fit 74,999,999 messages. The segment has
> 306,088,059 messages which is 4 times larger than the buffer can hold. We
> tried to set log.cleaner.dedupe.buffer.size even larger, but we see the log
> saying that
> [2017-05-16 11:52:16,238] WARN [kafka-log-cleaner-thread-0], Cannot use
> more than 2G of cleaner buffer space per cleaner thread, ignoring excess
> buffer space... (kafka.log.LogCleaner)
>
> The size of .log segment is 100MB, and
> log.cleaner.threads=1. We’re running Kafka 0.9.0.1.
> How can we get through this?
>
> Thanks,
> Jun
>
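
As a sanity check on the numbers in that stack trace: the dedupe buffer holds one entry per unique key, each entry being roughly a 16-byte MD5 of the key plus an 8-byte offset, and it is only filled up to log.cleaner.io.buffer.load.factor (0.9 by default). A back-of-the-envelope sketch under those assumptions, taking the buffer as just under the 2 GB per-thread cap mentioned in the warning, reproduces the figure from the error:

    public class OffsetMapCapacity {
        public static void main(String[] args) {
            long dedupeBufferBytes = 2_000_000_000L;  // log.cleaner.dedupe.buffer.size, just under the 2G/thread cap
            int bytesPerEntry = 16 + 8;               // MD5 of the key + stored offset (assumption)
            double loadFactor = 0.9;                  // log.cleaner.io.buffer.load.factor default

            long slots = dedupeBufferBytes / bytesPerEntry;   // 83,333,333 map slots
            long maxEntries = (long) (slots * loadFactor);    // 74,999,999 keys per cleaning pass
            System.out.println(maxEntries + " keys fit, vs 306,088,059 messages in the dirty section");
        }
    }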


Log compaction failed because offset map doesn't have enough space

2017-05-16 Thread Jun Ma
Hi team,

We are having an issue with compacting the __consumer_offsets topic in our
cluster. We’re seeing logs in log-cleaner.log saying:

[2017-05-16 11:56:28,993] INFO Cleaner 0: Building offset map for log
__consumer_offsets-15 for 349 segments in offset range [0, 619265471).
(kafka.log.LogCleaner)
[2017-05-16 11:56:29,014] ERROR [kafka-log-cleaner-thread-0], Error due to
 (kafka.log.LogCleaner)
java.lang.IllegalArgumentException: requirement failed: 306088059 messages
in segment __consumer_offsets-15/.log but offset map
can fit only 7499. You can increase log.cleaner.dedupe.buffer.size or
decrease log.cleaner.threads
at scala.Predef$.require(Predef.scala:219)
at kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:584)
at kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:580)
at
scala.collection.immutable.Stream$StreamWithFilter.foreach(Stream.scala:570)
at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:580)
at kafka.log.Cleaner.clean(LogCleaner.scala:322)
at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:230)
at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:208)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2017-05-16 11:56:29,016] INFO [kafka-log-cleaner-thread-0], Stopped
 (kafka.log.LogCleaner)

We have log.cleaner.dedupe.buffer.size=20, which is slightly less
than 2G, but still, it can only fit 74,999,999 messages. The segment has
306,088,059 messages which is 4 times larger than the buffer can hold. We
tried to set log.cleaner.dedupe.buffer.size even larger, but we see the log
saying that
[2017-05-16 11:52:16,238] WARN [kafka-log-cleaner-thread-0], Cannot use
more than 2G of cleaner buffer space per cleaner thread, ignoring excess
buffer space... (kafka.log.LogCleaner)

The size of .log segment is 100MB, and
log.cleaner.threads=1. We’re running Kafka 0.9.0.1.
How can we get through this?

Thanks,
Jun


Re: using kafka log compaction withour key

2017-01-30 Thread Ewen Cheslack-Postava
The log compaction functionality uses the key to determine which records to
deduplicate. You can think of it (very roughly) as deleting entries from a
hash map as the value for each key is overwritten. This functionality
doesn't have much of a point unless you include keys in your records.

-Ewen

On Thu, Jan 26, 2017 at 5:28 AM, Samy CHBINOU 
wrote:

> Hello,
>
> Is is possible to use log compaction without key? I think in that case
> buffer will contain only one line of data value? Is that correct?
>
> thanks
>
>
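
A tiny sketch of that analogy, just to make it concrete (plain Java, no Kafka involved):

    import java.util.HashMap;
    import java.util.Map;

    public class CompactionAsMap {
        public static void main(String[] args) {
            // Replaying a compacted topic is roughly like rebuilding this map:
            // only the latest value per key survives.
            Map<String, String> latestByKey = new HashMap<>();
            latestByKey.put("user-42", "old-address");
            latestByKey.put("user-42", "new-address");  // a later record replaces the earlier one
            latestByKey.remove("user-42");              // roughly what a tombstone (null value) does

            // With no key there is nothing to replace or remove, which is why
            // compaction only makes sense for keyed records.
            System.out.println(latestByKey);            // prints {}
        }
    }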


using kafka log compaction withour key

2017-01-26 Thread Samy CHBINOU

Hello,

Is it possible to use log compaction without a key? I think in that case the 
buffer will contain only a single data value? Is that correct?


thanks



Re: log compaction

2016-11-04 Thread Francesco laTorre
Hi Becket,

I can confirm :

1. All messages are keyed
2. Generous memory settings already in place
3. The topic is not compacted
4. Luckily we haven't experienced this issue. We are testing 0.10.0 and
hope to make the jump shortly.

Thank you very much for the support, very much appreciated :)

Cheers,
Francesco

On 4 November 2016 at 01:38, Becket Qin  wrote:

> Hi Francesco,
>
> There are a few things to think about before turning on log compaction for
> a topic.
>
> 1. Does the topic have non-keyed message? Log compaction only works if all
> the messages have a key.
> 2. The log cleaner needs some memory to build the offset map for log
> compaction, so the memory consumption may be higher.
>
> Given you are still running on 0.8, there are a few additional things to be
> aware of:
> 3. Log compaction doesn't work with compressed topics until Kafka 0.9.0.
> 4. Some other potential issues that may caused by log compaction. (e.g.
> KAFKA-2024).
>
> Jiangjie (Becket) Qin
>
>
>
> On Wed, Nov 2, 2016 at 5:55 AM, Francesco laTorre <
> francesco.lato...@openbet.com> wrote:
>
> > Hi,
> >
> > We want to enable log compaction on an existing topic (in production).
> > Is it a safe operation or there are things to take into consideration ?
> >
> > Kafka version 0.8
> >
> > Cheers,
> > Francesco
> >
> > --
> > <http://www.openbet.com/> Francesco laTorre
> > Senior Developer
> > T: +44 208 742 1600
> > +44 203 249 8394
> >
> > E: francesco.lato...@openbet.com
> > W: www.openbet.com
> > OpenBet Ltd
> > Chiswick Park Building 9
> > 566 Chiswick High Rd
> > London
> > W4 5XT
> >
>



-- 
<http://www.openbet.com/> Francesco laTorre
Senior Developer
T: +44 208 742 1600
+44 203 249 8394

E: francesco.lato...@openbet.com
W: www.openbet.com
OpenBet Ltd
Chiswick Park Building 9
566 Chiswick High Rd
London
W4 5XT


Re: log compaction

2016-11-03 Thread Becket Qin
Hi Francesco,

There are a few things to think about before turning on log compaction for
a topic.

1. Does the topic have non-keyed message? Log compaction only works if all
the messages have a key.
2. The log cleaner needs some memory to build the offset map for log
compaction, so the memory consumption may be higher.

Given you are still running on 0.8, there are a few additional things to be
aware of:
3. Log compaction doesn't work with compressed topics until Kafka 0.9.0.
4. Some other potential issues that may be caused by log compaction. (e.g.
KAFKA-2024).

Jiangjie (Becket) Qin



On Wed, Nov 2, 2016 at 5:55 AM, Francesco laTorre <
francesco.lato...@openbet.com> wrote:

> Hi,
>
> We want to enable log compaction on an existing topic (in production).
> Is it a safe operation or there are things to take into consideration ?
>
> Kafka version 0.8
>
> Cheers,
> Francesco
>
> --
> <http://www.openbet.com/> Francesco laTorre
> Senior Developer
> T: +44 208 742 1600
> +44 203 249 8394
>
> E: francesco.lato...@openbet.com
> W: www.openbet.com
> OpenBet Ltd
> Chiswick Park Building 9
> 566 Chiswick High Rd
> London
> W4 5XT
>


log compaction

2016-11-02 Thread Francesco laTorre
Hi,

We want to enable log compaction on an existing topic (in production).
Is it a safe operation or there are things to take into consideration ?

Kafka version 0.8

Cheers,
Francesco

-- 
<http://www.openbet.com/> Francesco laTorre
Senior Developer
T: +44 208 742 1600
+44 203 249 8394

E: francesco.lato...@openbet.com
W: www.openbet.com
OpenBet Ltd
Chiswick Park Building 9
566 Chiswick High Rd
London
W4 5XT


Re: Process to enable log compaction on a cluster

2016-10-12 Thread Mario Ricci
Sathya,

Did you ever figure out what to do here?

On Mon, Jul 4, 2016 at 12:19 AM Sathyakumar Seshachalam <
sathyakumar_seshacha...@trimble.com> wrote:

> My another followup question is that Am I right in assuming that per topic
> retention minutes or clean up policy, they all have any effect only when
> you log.cleaner.enable-false ?
> So In other words, I choose to truncate topic data, then __consumer_offsets
> topic will also be either deleted or compacted ?
>
> On Mon, Jul 4, 2016 at 11:38 AM, Sathyakumar Seshachalam <
> sathyakumar_seshacha...@trimble.com> wrote:
>
> > Ok, Am in situation where all kafka nodes are going to run out of space.
> > This is because I had been running uncompacted __consumer_offset topic
> and
> > everything retained topics .
> >
> > Now at a place, where I can afford to compact __consumer_offset topic and
> > also delete certain topics. I would like to know the right process to do
> > this.
> >
> > Now since I am having close 1.8T of data on __consumer_offset topic and
> > more in the topics data, any log compaction and log deletion/trunction is
> > going to take time. Should I do this node by node. Will Kafka's
> replication
> > come in the way. (I have read that uncompacted data from the leader is
> sent
> > to the followers.)
> >
> > Is there a clean process for this for a 3 node Kafka cluster ? Last time
> I
> > triggered a log compaction in all the 3 node simultaneously, all
> consumers
> > broke (I raised this in the same email group and got an answer to
> increase
> > the memory). Eventually they self-healed, but this caused some serious
> > disruption to the service, so before trying I want to make sure, there
> is a
> > cleaner process here.
> >
> > Any help/pointers will be greatly appreciated.
> >
> > Thanks,
> > Sathya
> >
> >
> >
>
-- 

Mario Ricci | Software Engineer | Trimble Navigation Limited | VirtualSite
Solutions | Office: +1 (303) 635-8604 / x228604


Log Compaction: How Key comparison works?

2016-09-27 Thread Kamal C
Hi all,

The log compaction article [1] doesn't explain how key comparison
takes place. AFAIK, Kafka doesn't de-serialize the records and uses an MD5
algorithm. I'm using the Kafka Java client v10. Could someone explain whether
the statements below are correct:

1. Key can be of any data-type?
2. K1 == K1 returns true?
3. K1.equals(K1) returns true? (Even if custom equals() and hashCode() are
implemented)

[1]: https://cwiki.apache.org/confluence/display/KAFKA/Log+Compaction

- Kamal


Re: Verify log compaction

2016-08-11 Thread Harald Kirsch

In case you are on Windows: compaction plainly does not work.

See https://issues.apache.org/jira/browse/KAFKA-1194

Root cause seems to be Windows' file locking and Kafka trying to 
delete/rename files that are open in another thread/part of the broker.


Harald.

On 11.08.2016 07:38, David Yu wrote:

For some reason, I cannot find log-cleaner.log anywhere.

On Fri, Jul 29, 2016 at 8:20 PM John Holland <
john.holl...@objectpartners.com> wrote:


Check the log-cleaner.log file on the server.  When the thread runs you'll
see output for every partition it compacts and the compaction ratio it
achieved.

The __consumer_offsets topic is compacted, I see log output from it being
compacted frequently.

Depending on your settings for the topic it may take a while for it to
compact.  Compaction doesn't occur on the current log segment.  Look at
these settings for the topic, "segment.bytes" and "segment.ms".  Lower
them
to force quicker compaction.

On 0.8.2.2, occasionally, the compaction thread would die and then the
__consumer_offets topic would grow out of control.  Kafka logs the thread
death in the log-cleaner.log.


On Fri, Jul 29, 2016 at 4:10 PM David Yu  wrote:


Hi,

We are using Kafka 0.9.0.0. One of our topic is set to use log

compaction.

We have also set log.cleaner.enable. However, we suspected that the topic
is not being compacted.

What is the best way for us to verify the compaction is happening?

Thanks,
David







Re: Verify log compaction

2016-08-10 Thread David Yu
For some reason, I cannot find log-cleaner.log anywhere.

On Fri, Jul 29, 2016 at 8:20 PM John Holland <
john.holl...@objectpartners.com> wrote:

> Check the log-cleaner.log file on the server.  When the thread runs you'll
> see output for every partition it compacts and the compaction ratio it
> achieved.
>
> The __consumer_offsets topic is compacted, I see log output from it being
> compacted frequently.
>
> Depending on your settings for the topic it may take a while for it to
> compact.  Compaction doesn't occur on the current log segment.  Look at
> these settings for the topic, "segment.bytes" and "segment.ms".  Lower
> them
> to force quicker compaction.
>
> On 0.8.2.2, occasionally, the compaction thread would die and then the
> __consumer_offets topic would grow out of control.  Kafka logs the thread
> death in the log-cleaner.log.
>
>
> On Fri, Jul 29, 2016 at 4:10 PM David Yu  wrote:
>
> > Hi,
> >
> > We are using Kafka 0.9.0.0. One of our topic is set to use log
> compaction.
> > We have also set log.cleaner.enable. However, we suspected that the topic
> > is not being compacted.
> >
> > What is the best way for us to verify the compaction is happening?
> >
> > Thanks,
> > David
> >
>


Re: Log compaction leaves empty files?

2016-08-09 Thread Dustin Cote
I would say the zero-byte files should be cleaned up by the LogCleaner, but
on Windows, I'm not sure how any of this works at this point.  I'll let
someone with more experience in that area chime in.

On Mon, Aug 8, 2016 at 8:29 AM, Harald Kirsch 
wrote:

> Hi Dustin,
>
> thanks for the reply. to be honest, I am trying to work around
> https://issues.apache.org/jira/browse/KAFKA-1194.
>
> I implemented a small main() to call the LogCleaner for compaction and it
> seemed to work, but left the zero-byte files. The idea is to stop the
> broker, then run the compaction as a separate process, then restart the
> broker in order to get around this problem where the whole process trips
> over its own shoe laces due to Windows' file locking.
>
> Since I haven't seen the compaction working on Windows yet, I was
> wondering whether this is to be expected. Should the compaction remove the
> zero-byte files or would the broker do this? I'm not yet enough into the
> cleanup code to understand this.
>
> Regards,
> Harald
>
>
> On 05.08.2016 16:23, Dustin Cote wrote:
>
>> Harald,
>>
>> I note that your last modified times are all the same.  Are you maybe
>> using
>> Java 7?  There's some details here that a JDK bug in Java 7 causes the
>> last
>> modified time to get updated on broker restart:
>> https://issues.apache.org/jira/browse/KAFKA-3802
>>
>>
>>
>> On Fri, Aug 5, 2016 at 6:12 AM, Harald Kirsch 
>> wrote:
>>
>> Hi,
>>>
>>> experimenting with log compaction, I see Kafka go through all the steps,
>>> in particular I see positive messages in log-cleaner.log and *.deleted
>>> files. Yet once the *.deleted segment files have disappeared, the segment
>>> and index files with size 0 are still kept.
>>>
>>> I stopped and restarted Kafka and saw several rounds of the log cleaner
>>> go
>>> by, but the empty files stay:
>>>
>>> -rw-rw-r-- 1 0 Aug  5 11:58 .index
>>> -rw-rw-r-- 1 0 Aug  5 11:58 .log
>>> -rw-rw-r-- 1 0 Aug  5 11:58 0051.index
>>> -rw-rw-r-- 1 0 Aug  5 11:58 0051.log
>>> -rw-rw-r-- 1 0 Aug  5 11:58 0096.index
>>> -rw-rw-r-- 1 0 Aug  5 11:58 0096.log
>>> [... snip ...]
>>> -rw-rw-r-- 1 92041 Aug  5 11:58 0680.log
>>> -rw-rw-r-- 1 0 Aug  5 11:58 0727.index
>>> -rw-rw-r-- 1199822 Aug  5 11:58 0727.log
>>> -rw-rw-r-- 1  10485760 Aug  5 11:58 0781.index
>>> -rw-rw-r-- 1 95972 Aug  5 11:58 0781.log
>>>
>>> Is this expected behavior or is there yet another configuration option
>>> that defines when these get purged?
>>>
>>> Harald.
>>>
>>>
>>
>>
>>


-- 
*Dustin Cote*
Customer Operations Engineer | Confluent
Follow us: Twitter <https://twitter.com/ConfluentInc> | blog
<http://www.confluent.io/blog>


Re: Log compaction leaves empty files?

2016-08-08 Thread Harald Kirsch

Hi Dustin,

Thanks for the reply. To be honest, I am trying to work around 
https://issues.apache.org/jira/browse/KAFKA-1194.


I implemented a small main() to call the LogCleaner for compaction and 
it seemed to work, but left the zero-byte files. The idea is to stop the 
broker, then run the compaction as a separate process, then restart the 
broker in order to get around this problem where the whole process trips 
over its own shoe laces due to Windows' file locking.


Since I haven't seen the compaction working on Windows yet, I was 
wondering whether this is to be expected. Should the compaction remove 
the zero-byte files or would the broker do this? I'm not yet enough 
into the cleanup code to understand this.


Regards,
Harald

On 05.08.2016 16:23, Dustin Cote wrote:

Harald,

I note that your last modified times are all the same.  Are you maybe using
Java 7?  There's some details here that a JDK bug in Java 7 causes the last
modified time to get updated on broker restart:
https://issues.apache.org/jira/browse/KAFKA-3802



On Fri, Aug 5, 2016 at 6:12 AM, Harald Kirsch 
wrote:


Hi,

experimenting with log compaction, I see Kafka go through all the steps,
in particular I see positive messages in log-cleaner.log and *.deleted
files. Yet once the *.deleted segment files have disappeared, the segment
and index files with size 0 are still kept.

I stopped and restarted Kafka and saw several rounds of the log cleaner go
by, but the empty files stay:

-rw-rw-r-- 1 0 Aug  5 11:58 .index
-rw-rw-r-- 1 0 Aug  5 11:58 .log
-rw-rw-r-- 1 0 Aug  5 11:58 0051.index
-rw-rw-r-- 1 0 Aug  5 11:58 0051.log
-rw-rw-r-- 1 0 Aug  5 11:58 0096.index
-rw-rw-r-- 1 0 Aug  5 11:58 0096.log
[... snip ...]
-rw-rw-r-- 1 92041 Aug  5 11:58 0680.log
-rw-rw-r-- 1 0 Aug  5 11:58 0727.index
-rw-rw-r-- 1199822 Aug  5 11:58 0727.log
-rw-rw-r-- 1  10485760 Aug  5 11:58 0781.index
-rw-rw-r-- 1 95972 Aug  5 11:58 0781.log

Is this expected behavior or is there yet another configuration option
that defines when these get purged?

Harald.







Re: Log compaction leaves empty files?

2016-08-05 Thread Dustin Cote
Harald,

I note that your last modified times are all the same.  Are you maybe using
Java 7?  There's some details here that a JDK bug in Java 7 causes the last
modified time to get updated on broker restart:
https://issues.apache.org/jira/browse/KAFKA-3802



On Fri, Aug 5, 2016 at 6:12 AM, Harald Kirsch 
wrote:

> Hi,
>
> experimenting with log compaction, I see Kafka go through all the steps,
> in particular I see positive messages in log-cleaner.log and *.deleted
> files. Yet once the *.deleted segment files have disappeared, the segment
> and index files with size 0 are still kept.
>
> I stopped and restarted Kafka and saw several rounds of the log cleaner go
> by, but the empty files stay:
>
> -rw-rw-r-- 1 0 Aug  5 11:58 .index
> -rw-rw-r-- 1 0 Aug  5 11:58 .log
> -rw-rw-r-- 1 0 Aug  5 11:58 0051.index
> -rw-rw-r-- 1 0 Aug  5 11:58 0051.log
> -rw-rw-r-- 1 0 Aug  5 11:58 0096.index
> -rw-rw-r-- 1 0 Aug  5 11:58 0096.log
> [... snip ...]
> -rw-rw-r-- 1 92041 Aug  5 11:58 0680.log
> -rw-rw-r-- 1 0 Aug  5 11:58 0727.index
> -rw-rw-r-- 1199822 Aug  5 11:58 0727.log
> -rw-rw-r-- 1  10485760 Aug  5 11:58 0781.index
> -rw-rw-r-- 1 95972 Aug  5 11:58 0781.log
>
> Is this expected behavior or is there yet another configuration option
> that defines when these get purged?
>
> Harald.
>



-- 
Dustin Cote
confluent.io


Log compaction leaves empty files?

2016-08-05 Thread Harald Kirsch

Hi,

experimenting with log compaction, I see Kafka go through all the steps, 
in particular I see positive messages in log-cleaner.log and *.deleted 
files. Yet once the *.deleted segment files have disappeared, the 
segment and index files with size 0 are still kept.


I stopped and restarted Kafka and saw several rounds of the log cleaner 
go by, but the empty files stay:


-rw-rw-r-- 1 0 Aug  5 11:58 .index
-rw-rw-r-- 1 0 Aug  5 11:58 .log
-rw-rw-r-- 1 0 Aug  5 11:58 0051.index
-rw-rw-r-- 1 0 Aug  5 11:58 0051.log
-rw-rw-r-- 1 0 Aug  5 11:58 0096.index
-rw-rw-r-- 1 0 Aug  5 11:58 0096.log
[... snip ...]
-rw-rw-r-- 1 92041 Aug  5 11:58 0680.log
-rw-rw-r-- 1 0 Aug  5 11:58 0727.index
-rw-rw-r-- 1199822 Aug  5 11:58 0727.log
-rw-rw-r-- 1  10485760 Aug  5 11:58 0781.index
-rw-rw-r-- 1 95972 Aug  5 11:58 0781.log

Is this expected behavior or is there yet another configuration option 
that defines when these get purged?


Harald.


Re: Verify log compaction

2016-07-29 Thread John Holland
Check the log-cleaner.log file on the server.  When the thread runs you'll
see output for every partition it compacts and the compaction ratio it
achieved.
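
If you want to cross-check a specific partition (the path and topic name below
are placeholders), you can also dump an older, already-cleaned segment directly
and confirm that only the latest value per key remains:

kafka-run-class.sh kafka.tools.DumpLogSegments --print-data-log \
  --files /var/kafka-logs/my-topic-0/00000000000000000000.log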

The __consumer_offsets topic is compacted, I see log output from it being
compacted frequently.

Depending on your settings for the topic it may take a while for it to
compact.  Compaction doesn't occur on the current log segment.  Look at
these settings for the topic, "segment.bytes" and "segment.ms".  Lower them
to force quicker compaction.
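
For example (the topic name is a placeholder and the values are only
illustrative), the topic overrides can be lowered with:

kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics \
  --entity-name my-topic --add-config segment.ms=60000,min.cleanable.dirty.ratio=0.01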

On 0.8.2.2, occasionally, the compaction thread would die and then the
__consumer_offets topic would grow out of control.  Kafka logs the thread
death in the log-cleaner.log.


On Fri, Jul 29, 2016 at 4:10 PM David Yu  wrote:

> Hi,
>
> We are using Kafka 0.9.0.0. One of our topic is set to use log compaction.
> We have also set log.cleaner.enable. However, we suspected that the topic
> is not being compacted.
>
> What is the best way for us to verify the compaction is happening?
>
> Thanks,
> David
>


Verify log compaction

2016-07-29 Thread David Yu
Hi,

We are using Kafka 0.9.0.0. One of our topics is set to use log compaction.
We have also set log.cleaner.enable. However, we suspected that the topic
is not being compacted.

What is the best way for us to verify the compaction is happening?

Thanks,
David


Re: Process to enable log compaction on a cluster

2016-07-03 Thread Sathyakumar Seshachalam
Another follow-up question: am I right in assuming that per-topic
retention minutes or cleanup policy only have an effect when
log.cleaner.enable=false?
So in other words, if I choose to truncate topic data, then the __consumer_offsets
topic will also be either deleted or compacted?

On Mon, Jul 4, 2016 at 11:38 AM, Sathyakumar Seshachalam <
sathyakumar_seshacha...@trimble.com> wrote:

> Ok, Am in situation where all kafka nodes are going to run out of space.
> This is because I had been running uncompacted __consumer_offset topic and
> everything retained topics .
>
> Now at a place, where I can afford to compact __consumer_offset topic and
> also delete certain topics. I would like to know the right process to do
> this.
>
> Now since I am having close 1.8T of data on __consumer_offset topic and
> more in the topics data, any log compaction and log deletion/trunction is
> going to take time. Should I do this node by node. Will Kafka's replication
> come in the way. (I have read that uncompacted data from the leader is sent
> to the followers.)
>
> Is there a clean process for this for a 3 node Kafka cluster ? Last time I
> triggered a log compaction in all the 3 node simultaneously, all consumers
> broke (I raised this in the same email group and got an answer to increase
> the memory). Eventually they self-healed, but this caused some serious
> disruption to the service, so before trying I want to make sure, there is a
> cleaner process here.
>
> Any help/pointers will be greatly appreciated.
>
> Thanks,
> Sathya
>
>
>


Process to enable log compaction on a cluster

2016-07-03 Thread Sathyakumar Seshachalam
Ok, I am in a situation where all Kafka nodes are going to run out of space.
This is because I had been running an uncompacted __consumer_offset topic and
topics that retain everything.

Now I am at a place where I can afford to compact the __consumer_offset topic and
also delete certain topics. I would like to know the right process to do
this.

Now, since I have close to 1.8T of data on the __consumer_offset topic and
more in the topic data, any log compaction and log deletion/truncation is
going to take time. Should I do this node by node? Will Kafka's replication
get in the way? (I have read that uncompacted data from the leader is sent
to the followers.)

Is there a clean process for this for a 3-node Kafka cluster? Last time I
triggered a log compaction on all 3 nodes simultaneously, all consumers
broke (I raised this in the same email group and got an answer to increase
the memory). Eventually they self-healed, but this caused some serious
disruption to the service, so before trying I want to make sure there is a
cleaner process here.

Any help/pointers will be greatly appreciated.

Thanks,
Sathya


Re: Log Compaction not Deleting on Kafka 0.9.0.1

2016-06-22 Thread Ismael Juma
By the way, https://issues.apache.org/jira/browse/KAFKA-3587 was fixed in
0.10.0.0.

Ismael

On Wed, Jun 22, 2016 at 7:28 PM, Tom Crayford  wrote:

> Is the log cleaner thread running? We've seen issues where the log cleaner
> thread dies after too much logged data. You'll see a message like this:
>
> [kafka-log-cleaner-thread-0], Error due to
> java.lang.IllegalArgumentException: requirement failed: 9750860 messages in
> segment MY_FAVORITE_TOPIC_IS_SORBET-2/47580165.log but offset
> map can fit only 5033164. You can increase log.cleaner.dedupe.buffer.size
> or decrease log.cleaner.threads
>
> You can check if it's running by dumping threads using JMX and looking for
> the thread name containing `kafka-log-cleaner-thread`
>
> If this happens, there's not too much remediation you *can* do right now.
> One potential is (assuming significant replication and enough other cluster
> stability) is to delete the data on the broker and bring it up again, and
> ensure the log cleaner is turned on the whole time. *hopefully* compaction
> will keep up whilst kafka catches up with replication, but that's not
> guaranteed.
>
> We're going to be upstreaming a ticket shortly based on this and other
> issues we've seen with log compaction.
>
> On Wed, Jun 22, 2016 at 6:03 PM, Lawrence Weikum 
> wrote:
>
> > We seem to be having a strange issue with a cluster of ours; specifically
> > with the __consumer_offsets topic.
> >
> > When we brought the cluster online, log compaction was turned off.
> > Realizing our mistake, we turned it on, but only after the topic had over
> > 31,018,699,972 offsets committed to it.  Log compaction seems to have
> > worked and be working properly.  The logs are showing that every
> partition
> > has been compacted, and may pieces have been marked for deletion.
> >
> > The problem is that not all partitions are having their older logs
> > deleted.  Some partitions will grow to having 19 log files, but a few
> > seconds later will have only 13.  One partition in particular, though,
> > still has all of its log files, all 19,000 of them, and this never seems
> to
> > change, only grow as new offsets come in.
> >
> > Restarting that broker doesn’t seem to help.
> >
> >
> > We’ve checked the broker settings on everything as well.
> >
> > log.cleaner.enable = true
> > log.cleanup.policy = delete
> > cleanup.policy = compact
> >
> >
> > Has anyone encountered this issue before?
> >
> > Thank you all for the help!
> >
> > Lawrence Weikum
> >
> >
>


Re: Log Compaction not Deleting on Kafka 0.9.0.1

2016-06-22 Thread Tom Crayford
This smells like a bug to me.

On Wed, Jun 22, 2016 at 6:54 PM, Lawrence Weikum 
wrote:

> Fascinating.
>
> We are seeing no errors or warning in the logs after restart.  It appears
> on this broker that the compaction thread is working:
>
> [2016-06-22 10:33:49,179] INFO Rolled new log segment for
> '__consumer_offsets-28' in 1 ms. (kafka.log.Log)
> [2016-06-22 10:34:00,968] INFO Deleting segment 0 from log
> __consumer_offsets-28. (kafka.log.Log)
> [2016-06-22 10:34:00,970] INFO Deleting index
> /kafka/data/__consumer_offsets-28/.index.deleted
> (kafka.log.OffsetIndex)
> [2016-06-22 10:34:00,992] INFO Deleting segment 2148144095 from log
> __consumer_offsets-28. (kafka.log.Log)
> [2016-06-22 10:34:00,994] INFO Deleting index
> /kafka/data/__consumer_offsets-28/002148144095.index.deleted
> (kafka.log.OffsetIndex)
> [2016-06-22 10:34:01,002] INFO Deleting segment 3189277822 from log
> __consumer_offsets-28. (kafka.log.Log)
> [2016-06-22 10:34:01,004] INFO Deleting index
> /kafka/data/__consumer_offsets-28/003189277822.index.deleted
> (kafka.log.OffsetIndex)
> [2016-06-22 10:34:02,019] INFO Deleting segment 3190205744 from log
> __consumer_offsets-28. (kafka.log.Log)
> [2016-06-22 10:34:02,039] INFO Deleting index
> /kafka/data/__consumer_offsets-28/003190205744.index.deleted
> (kafka.log.OffsetIndex)
>
> We see the “kafka-log-cleaner-thread” in the JMX.  It seems to run about
> every 50 seconds.  From the log-cleaner.log file, we see plenty of this
> output regarding the partition that’s hogging the FDs:
>
> [2016-06-22 10:44:31,845] INFO Cleaner 0: Beginning cleaning of log
> __consumer_offsets-28. (kafka.log.LogCleaner)
> [2016-06-22 10:44:31,846] INFO Cleaner 0: Building offset map for
> __consumer_offsets-28... (kafka.log.LogCleaner)
> [2016-06-22 10:44:31,878] INFO Cleaner 0: Building offset map for log
> __consumer_offsets-28 for 1 segments in offset range [3204124461,
> 3205052375). (kafka.log.LogCleaner)
> [2016-06-22 10:44:32,870] INFO Cleaner 0: Offset map for log
> __consumer_offsets-28 complete. (kafka.log.LogCleaner)
> [2016-06-22 10:44:32,871] INFO Cleaner 0: Cleaning log
> __consumer_offsets-28 (discarding tombstones prior to Tue Jun 21 10:43:19
> PDT 2016)... (kafka.log.LogCleaner)
> [2016-06-22 10:44:32,871] INFO Cleaner 0: Cleaning segment 0 in log
> __consumer_offsets-28 (last modified Tue Jun 21 22:39:18 PDT 2016) into 0,
> retaining deletes. (kafka.log.LogCleaner)
> [2016-06-22 10:44:32,888] INFO Cleaner 0: Swapping in cleaned segment 0
> for segment(s) 0 in log __consumer_offsets-28. (kafka.log.LogCleaner)
> [2016-06-22 10:44:32,889] INFO Cleaner 0: Cleaning segment 2148144095 in
> log __consumer_offsets-28 (last modified Wed Jun 22 10:42:31 PDT 2016) into
> 2148144095, retaining deletes. (kafka.log.LogCleaner)
> [2016-06-22 10:44:32,889] INFO Cleaner 0: Cleaning segment 3203196540 in
> log __consumer_offsets-28 (last modified Wed Jun 22 10:43:19 PDT 2016) into
> 2148144095, retaining deletes. (kafka.log.LogCleaner)
> [2016-06-22 10:44:32,905] INFO Cleaner 0: Swapping in cleaned segment
> 2148144095 for segment(s) 2148144095,3203196540 in log
> __consumer_offsets-28. (kafka.log.LogCleaner)
> [2016-06-22 10:44:32,905] INFO Cleaner 0: Cleaning segment 3204124461 in
> log __consumer_offsets-28 (last modified Wed Jun 22 10:44:21 PDT 2016) into
> 3204124461, retaining deletes. (kafka.log.LogCleaner)
> [2016-06-22 10:44:33,834] INFO Cleaner 0: Swapping in cleaned segment
> 3204124461 for segment(s) 3204124461 in log __consumer_offsets-28.
> (kafka.log.LogCleaner)
> [2016-06-22 10:44:33,836] INFO [kafka-log-cleaner-thread-0],
> Log cleaner thread 0 cleaned log __consumer_offsets-28 (dirty
> section = [3204124461, 3205052375])
> 100.0 MB of log processed in 2.0 seconds (50.3 MB/sec).
> Indexed 100.0 MB in 1.0 seconds (97.6 Mb/sec, 51.5% of total time)
> Buffer utilization: 0.0%
> Cleaned 100.0 MB in 1.0 seconds (103.6 Mb/sec, 48.5% of total time)
> Start size: 100.0 MB (928,011 messages)
> End size: 0.0 MB (97 messages)
> 100.0% size reduction (100.0% fewer messages)
>  (kafka.log.LogCleaner)
>
> But no actual delete messages like a properly-working broker is showing of
> a different partition.
>
> Lawrence Weikum
>
>
> On 6/22/16, 11:28 AM, "Tom Crayford"  wrote:
>
> kafka-log-cleaner-thread-0
>
>


Re: Log Compaction not Deleting on Kafka 0.9.0.1

2016-06-22 Thread Lawrence Weikum
Fascinating.  

We are seeing no errors or warnings in the logs after restart.  It appears on 
this broker that the compaction thread is working:

[2016-06-22 10:33:49,179] INFO Rolled new log segment for 
'__consumer_offsets-28' in 1 ms. (kafka.log.Log)
[2016-06-22 10:34:00,968] INFO Deleting segment 0 from log 
__consumer_offsets-28. (kafka.log.Log)
[2016-06-22 10:34:00,970] INFO Deleting index 
/kafka/data/__consumer_offsets-28/.index.deleted 
(kafka.log.OffsetIndex)
[2016-06-22 10:34:00,992] INFO Deleting segment 2148144095 from log 
__consumer_offsets-28. (kafka.log.Log)
[2016-06-22 10:34:00,994] INFO Deleting index 
/kafka/data/__consumer_offsets-28/002148144095.index.deleted 
(kafka.log.OffsetIndex)
[2016-06-22 10:34:01,002] INFO Deleting segment 3189277822 from log 
__consumer_offsets-28. (kafka.log.Log)
[2016-06-22 10:34:01,004] INFO Deleting index 
/kafka/data/__consumer_offsets-28/003189277822.index.deleted 
(kafka.log.OffsetIndex)
[2016-06-22 10:34:02,019] INFO Deleting segment 3190205744 from log 
__consumer_offsets-28. (kafka.log.Log)
[2016-06-22 10:34:02,039] INFO Deleting index 
/kafka/data/__consumer_offsets-28/003190205744.index.deleted 
(kafka.log.OffsetIndex)

We see the “kafka-log-cleaner-thread” in the JMX.  It seems to run about every 
50 seconds.  From the log-cleaner.log file, we see plenty of this  output 
regarding the partition that’s hogging the FDs:

[2016-06-22 10:44:31,845] INFO Cleaner 0: Beginning cleaning of log 
__consumer_offsets-28. (kafka.log.LogCleaner)
[2016-06-22 10:44:31,846] INFO Cleaner 0: Building offset map for 
__consumer_offsets-28... (kafka.log.LogCleaner)
[2016-06-22 10:44:31,878] INFO Cleaner 0: Building offset map for log 
__consumer_offsets-28 for 1 segments in offset range [3204124461, 3205052375). 
(kafka.log.LogCleaner)
[2016-06-22 10:44:32,870] INFO Cleaner 0: Offset map for log 
__consumer_offsets-28 complete. (kafka.log.LogCleaner)
[2016-06-22 10:44:32,871] INFO Cleaner 0: Cleaning log __consumer_offsets-28 
(discarding tombstones prior to Tue Jun 21 10:43:19 PDT 2016)... 
(kafka.log.LogCleaner)
[2016-06-22 10:44:32,871] INFO Cleaner 0: Cleaning segment 0 in log 
__consumer_offsets-28 (last modified Tue Jun 21 22:39:18 PDT 2016) into 0, 
retaining deletes. (kafka.log.LogCleaner)
[2016-06-22 10:44:32,888] INFO Cleaner 0: Swapping in cleaned segment 0 for 
segment(s) 0 in log __consumer_offsets-28. (kafka.log.LogCleaner)
[2016-06-22 10:44:32,889] INFO Cleaner 0: Cleaning segment 2148144095 in log 
__consumer_offsets-28 (last modified Wed Jun 22 10:42:31 PDT 2016) into 
2148144095, retaining deletes. (kafka.log.LogCleaner)
[2016-06-22 10:44:32,889] INFO Cleaner 0: Cleaning segment 3203196540 in log 
__consumer_offsets-28 (last modified Wed Jun 22 10:43:19 PDT 2016) into 
2148144095, retaining deletes. (kafka.log.LogCleaner)
[2016-06-22 10:44:32,905] INFO Cleaner 0: Swapping in cleaned segment 
2148144095 for segment(s) 2148144095,3203196540 in log __consumer_offsets-28. 
(kafka.log.LogCleaner)
[2016-06-22 10:44:32,905] INFO Cleaner 0: Cleaning segment 3204124461 in log 
__consumer_offsets-28 (last modified Wed Jun 22 10:44:21 PDT 2016) into 
3204124461, retaining deletes. (kafka.log.LogCleaner)
[2016-06-22 10:44:33,834] INFO Cleaner 0: Swapping in cleaned segment 
3204124461 for segment(s) 3204124461 in log __consumer_offsets-28. 
(kafka.log.LogCleaner)
[2016-06-22 10:44:33,836] INFO [kafka-log-cleaner-thread-0],
Log cleaner thread 0 cleaned log __consumer_offsets-28 (dirty section = 
[3204124461, 3205052375])
100.0 MB of log processed in 2.0 seconds (50.3 MB/sec).
Indexed 100.0 MB in 1.0 seconds (97.6 Mb/sec, 51.5% of total time)
Buffer utilization: 0.0%
Cleaned 100.0 MB in 1.0 seconds (103.6 Mb/sec, 48.5% of total time)
Start size: 100.0 MB (928,011 messages)
End size: 0.0 MB (97 messages)
100.0% size reduction (100.0% fewer messages)
 (kafka.log.LogCleaner)

But there are no actual delete messages, unlike what a properly-working broker shows 
for a different partition. 

Lawrence Weikum 


On 6/22/16, 11:28 AM, "Tom Crayford"  wrote:

kafka-log-cleaner-thread-0



Re: Log Compaction not Deleting on Kafka 0.9.0.1

2016-06-22 Thread Tom Crayford
Is the log cleaner thread running? We've seen issues where the log cleaner
thread dies after too much logged data. You'll see a message like this:

[kafka-log-cleaner-thread-0], Error due to
java.lang.IllegalArgumentException: requirement failed: 9750860 messages in
segment MY_FAVORITE_TOPIC_IS_SORBET-2/47580165.log but offset
map can fit only 5033164. You can increase log.cleaner.dedupe.buffer.size
or decrease log.cleaner.threads
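
As a minimal sketch (the size below is only illustrative, not a recommendation),
that means raising the buffer in each broker's server.properties and restarting:

# server.properties
log.cleaner.dedupe.buffer.size=268435456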

You can check if it's running by dumping threads using JMX and looking for
the thread name containing `kafka-log-cleaner-thread`
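
A quick way to do that from a shell, assuming you can run jstack against the
broker's JVM, is:

jstack <broker-pid> | grep -i kafka-log-cleaner-thread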

If this happens, there's not too much remediation you *can* do right now.
One potential fix (assuming significant replication and enough other cluster
stability) is to delete the data on the broker and bring it up again, and
ensure the log cleaner is turned on the whole time. *hopefully* compaction
will keep up whilst kafka catches up with replication, but that's not
guaranteed.

We're going to be upstreaming a ticket shortly based on this and other
issues we've seen with log compaction.

On Wed, Jun 22, 2016 at 6:03 PM, Lawrence Weikum 
wrote:

> We seem to be having a strange issue with a cluster of ours; specifically
> with the __consumer_offsets topic.
>
> When we brought the cluster online, log compaction was turned off.
> Realizing our mistake, we turned it on, but only after the topic had over
> 31,018,699,972 offsets committed to it.  Log compaction seems to have
> worked and be working properly.  The logs are showing that every partition
> has been compacted, and may pieces have been marked for deletion.
>
> The problem is that not all partitions are having their older logs
> deleted.  Some partitions will grow to having 19 log files, but a few
> seconds later will have only 13.  One partition in particular, though,
> still has all of its log files, all 19,000 of them, and this never seems to
> change, only grow as new offsets come in.
>
> Restarting that broker doesn’t seem to help.
>
>
> We’ve checked the broker settings on everything as well.
>
> log.cleaner.enable = true
> log.cleanup.policy = delete
> cleanup.policy = compact
>
>
> Has anyone encountered this issue before?
>
> Thank you all for the help!
>
> Lawrence Weikum
>
>


Log Compaction not Deleting on Kafka 0.9.0.1

2016-06-22 Thread Lawrence Weikum
We seem to be having a strange issue with a cluster of ours; specifically with 
the __consumer_offsets topic.

When we brought the cluster online, log compaction was turned off.  Realizing 
our mistake, we turned it on, but only after the topic had over 31,018,699,972 
offsets committed to it.  Log compaction seems to have worked and be working 
properly.  The logs are showing that every partition has been compacted, and 
many pieces have been marked for deletion.

The problem is that not all partitions are having their older logs deleted.  
Some partitions will grow to having 19 log files, but a few seconds later will 
have only 13.  One partition in particular, though, still has all of its log 
files, all 19,000 of them, and this never seems to change, only grow as new 
offsets come in.

Restarting that broker doesn’t seem to help.


We’ve checked the broker settings on everything as well.

log.cleaner.enable = true
log.cleanup.policy = delete
cleanup.policy = compact


Has anyone encountered this issue before?

Thank you all for the help!

Lawrence Weikum



Re: Regarding Kafka Log compaction Features

2016-05-06 Thread Spico Florin
Hi!
Please have a look at this article. It helped me to use the log compaction
feature mechanism.

I hope that it helps.
Regards,
Florin

http://www.shayne.me/blog/2015/2015-06-25-everything-about-kafka-part-2/
On Thursday, May 5, 2016, Behera, Himansu (Contractor) <
himansu_beh...@cable.comcast.com> wrote:

> Hi Team,
>
>
>
> I am working on implementing the kafka log compaction feature in my
> project.
>
>
>
> Please find the server. Properties. I have made all the config changes
> needed/suggested in the kafka log compaction  forum.But was not to able to
> resolve  the issue.
>
>
>
> My use as follows:
>
>
>
> Step 1:.We send a keyed message(String,string) from one of the producer
>  to the topic.
>
> Step 2:Then we send around 10 million  keyed messages(with unique key) to
> the above said topic.
>
> Step 3:Then  we try to send update to  the key in step 1 with some other
> value other than  in step 1 after 1800 secs
>
>
>
> Expected Result:  The key should be updated with  the recent value.
>
> Actual Result: The updated key contains the old value.
>
>
>
>
>
> Appreciate if someone can help me in implementing the log compaction
>  features POC.
>
>
>
> Please find the server. properties attached for  your reference.
>
>
>
> Regards,
>
> Himansu
>
>
>


Regarding Kafka Log compaction Features

2016-05-05 Thread Behera, Himansu (Contractor)
Hi Team,

I am working on implementing the kafka log compaction feature in my project.

Please find the server.properties attached. I have made all the config changes 
needed/suggested in the Kafka log compaction forum, but was not able to 
resolve the issue.

My use case is as follows:

Step 1: We send a keyed message (String, String) from one of the producers to the 
topic.
Step 2: Then we send around 10 million keyed messages (with unique keys) to the 
above-mentioned topic.
Step 3: Then we try to send an update to the key in step 1, with a value other 
than the one in step 1, after 1800 secs.

Expected Result:  The key should be updated with  the recent value.
Actual Result: The updated key contains the old value.
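
One way to see what is actually retained for the key (the topic name and
connection details below are placeholders) is to re-read the topic from the
beginning with keys printed:

kafka-console-consumer.sh --zookeeper localhost:2181 --topic my-topic \
  --from-beginning --property print.key=true

Note that the active (most recent) segment is never compacted, so the old value
can legitimately still be present until the segment rolls.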


I would appreciate it if someone could help me in implementing the log compaction 
feature POC.

Please find the server.properties attached for your reference.

Regards,
Himansu



Re: How to work around log compaction error (0.8.2.2)

2016-04-28 Thread Rakesh Vidyadharan
Hi Manikumar,

No, we are not using compression on our topics.  I will try out Todd Palino’s 
suggestion regarding the offsets topic.

Thanks
Rakesh




On 27/04/2016 23:34, "Manikumar Reddy"  wrote:

>Hi,
>
> Are you enabling log compaction on a topic with compressed messages?
> If yes, then that might be the reason for the exception.  0.8.2.2 Log
>Compaction does
> not support compressed  messages. This got fixed in 0.9.0.0 (KAFKA-1641,
>KAFKA-1374)
>
>Check below mail thread for some corrective actions
>http://grokbase.com/t/kafka/users/159jbe18en/log-cleaner-thread-stops
>
>
>On Thu, Apr 28, 2016 at 1:44 AM, Rakesh Vidyadharan <
>rvidyadha...@gracenote.com> wrote:
>
>> Hello,
>>
>> We enabled log compaction on a few topics, as we want to preserve
>> permanently the latest versions of messages published to specific topics.
>> After enabling compaction, the log cleaner thread dies with the same error
>> for the topics we tried it on.  It looks like kafka has starting offset
>> that does not exist in the topic (at least that is how I am reading the
>> error).  Any ideas on how we can work around this error?
>>
>> Thanks
>> Rakesh
>>
>> [2016-04-27 15:52:11,306] INFO [kafka-log-cleaner-thread-0], Starting
>> (kafka.log.LogCleaner)
>> [2016-04-27 15:52:11,322] INFO Cleaner 0: Beginning cleaning of log
>> metamorphosis.lineup-0. (kafka.log.LogCleaner)
>> [2016-04-27 15:52:11,323] INFO Cleaner 0: Building offset map for
>> metamorphosis.lineup-0... (kafka.log.LogCleaner)
>> [2016-04-27 15:52:11,415] INFO Cleaner 0: Building offset map for log
>> metamorphosis.lineup-0 for 1 segments in offset range [1553258, 2138466).
>> (kafka.log.LogCleaner)
>> [2016-04-27 15:52:11,435] ERROR [kafka-log-cleaner-thread-0], Error due
>> to  (kafka.log.LogCleaner)
>> java.lang.IllegalArgumentException: requirement failed: Last clean offset
>> is 1553258 but segment base offset is 2125968 for log
>> metamorphosis.lineup-0.
>> at scala.Predef$.require(Predef.scala:233)
>> at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:509)
>> at kafka.log.Cleaner.clean(LogCleaner.scala:307)
>> at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:221)
>> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:199)
>> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
>> [2016-04-27 15:52:11,436] INFO [kafka-log-cleaner-thread-0], Stopped
>> (kafka.log.LogCleaner)
>>
>>


Re: How to work around log compaction error (0.8.2.2)

2016-04-27 Thread Manikumar Reddy
Hi,

 Are you enabling log compaction on a topic with compressed messages?
 If yes, then that might be the reason for the exception.  0.8.2.2 Log
Compaction does
 not support compressed  messages. This got fixed in 0.9.0.0 (KAFKA-1641,
KAFKA-1374)

Check below mail thread for some corrective actions
http://grokbase.com/t/kafka/users/159jbe18en/log-cleaner-thread-stops


On Thu, Apr 28, 2016 at 1:44 AM, Rakesh Vidyadharan <
rvidyadha...@gracenote.com> wrote:

> Hello,
>
> We enabled log compaction on a few topics, as we want to preserve
> permanently the latest versions of messages published to specific topics.
> After enabling compaction, the log cleaner thread dies with the same error
> for the topics we tried it on.  It looks like kafka has starting offset
> that does not exist in the topic (at least that is how I am reading the
> error).  Any ideas on how we can work around this error?
>
> Thanks
> Rakesh
>
> [2016-04-27 15:52:11,306] INFO [kafka-log-cleaner-thread-0], Starting
> (kafka.log.LogCleaner)
> [2016-04-27 15:52:11,322] INFO Cleaner 0: Beginning cleaning of log
> metamorphosis.lineup-0. (kafka.log.LogCleaner)
> [2016-04-27 15:52:11,323] INFO Cleaner 0: Building offset map for
> metamorphosis.lineup-0... (kafka.log.LogCleaner)
> [2016-04-27 15:52:11,415] INFO Cleaner 0: Building offset map for log
> metamorphosis.lineup-0 for 1 segments in offset range [1553258, 2138466).
> (kafka.log.LogCleaner)
> [2016-04-27 15:52:11,435] ERROR [kafka-log-cleaner-thread-0], Error due
> to  (kafka.log.LogCleaner)
> java.lang.IllegalArgumentException: requirement failed: Last clean offset
> is 1553258 but segment base offset is 2125968 for log
> metamorphosis.lineup-0.
> at scala.Predef$.require(Predef.scala:233)
> at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:509)
> at kafka.log.Cleaner.clean(LogCleaner.scala:307)
> at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:221)
> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:199)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> [2016-04-27 15:52:11,436] INFO [kafka-log-cleaner-thread-0], Stopped
> (kafka.log.LogCleaner)
>
>


How to work around log compaction error (0.8.2.2)

2016-04-27 Thread Rakesh Vidyadharan
Hello,

We enabled log compaction on a few topics, as we want to preserve permanently 
the latest versions of messages published to specific topics.  After enabling 
compaction, the log cleaner thread dies with the same error for the topics we 
tried it on.  It looks like Kafka has a starting offset that does not exist in 
the topic (at least that is how I am reading the error).  Any ideas on how we 
can work around this error?

Thanks
Rakesh

[2016-04-27 15:52:11,306] INFO [kafka-log-cleaner-thread-0], Starting  
(kafka.log.LogCleaner)
[2016-04-27 15:52:11,322] INFO Cleaner 0: Beginning cleaning of log 
metamorphosis.lineup-0. (kafka.log.LogCleaner)
[2016-04-27 15:52:11,323] INFO Cleaner 0: Building offset map for 
metamorphosis.lineup-0... (kafka.log.LogCleaner)
[2016-04-27 15:52:11,415] INFO Cleaner 0: Building offset map for log 
metamorphosis.lineup-0 for 1 segments in offset range [1553258, 2138466). 
(kafka.log.LogCleaner)
[2016-04-27 15:52:11,435] ERROR [kafka-log-cleaner-thread-0], Error due to  
(kafka.log.LogCleaner)
java.lang.IllegalArgumentException: requirement failed: Last clean offset is 
1553258 but segment base offset is 2125968 for log metamorphosis.lineup-0.
at scala.Predef$.require(Predef.scala:233)
at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:509)
at kafka.log.Cleaner.clean(LogCleaner.scala:307)
at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:221)
at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:199)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
[2016-04-27 15:52:11,436] INFO [kafka-log-cleaner-thread-0], Stopped  
(kafka.log.LogCleaner)



Re: Metrics for Log Compaction

2016-04-15 Thread Manikumar Reddy
Hi,

log compaction related JMX metric object names are given below.

kafka.log:type=LogCleaner,name=cleaner-recopy-percent
kafka.log:type=LogCleaner,name=max-buffer-utilization-percent
kafka.log:type=LogCleaner,name=max-clean-time-secs
kafka.log:type=LogCleanerManager,name=max-dirty-percent


After every compaction cycle, we also print some useful statistics to
 logs/log-cleaner.log file.
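
If it helps, these can also be sampled from the command line (assuming the broker
exposes JMX, here on port 9999) with something like:

kafka-run-class.sh kafka.tools.JmxTool \
  --jmx-url service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi \
  --object-name kafka.log:type=LogCleaner,name=cleaner-recopy-percent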


On Wed, Apr 13, 2016 at 7:16 PM, Kasim Doctor  wrote:

> Hi everyone,
>
> We are starting to use log compaction for one of our topics and I was
> wondering if there are any specific metrics exposed to monitor how often
> compaction took place and/or how many records (with some metadata related
> to partition) were deleted ?
>
> I looked at JMX metrics that Kafka exposes but could not find specific
> metrics related to compaction.
>
> Any insight or help would be appreciated.
>
> Thanks,
> Kasim
>
>
>


Re: Metrics for Log Compaction

2016-04-15 Thread Manikumar Reddy
Hi,


kafka.log:type=LogCleaner,name=cleaner-recopy-percent
kafka.log:type=LogCleanerManager,name=max-dirty-percent
kafka.log:type=LogCleaner,name=max-clean-time-secs



After every compaction cycle, we also print some useful statistics to
 logs/log-cleaner.log file.


On Wed, Apr 13, 2016 at 7:16 PM, Kasim Doctor  wrote:

> Hi everyone,
>
> We are starting to use log compaction for one of our topics and I was
> wondering if there are any specific metrics exposed to monitor how often
> compaction took place and/or how many records (with some metadata related
> to partition) were deleted ?
>
> I looked at JMX metrics that Kafka exposes but could not find specific
> metrics related to compaction.
>
> Any insight or help would be appreciated.
>
> Thanks,
> Kasim
>
>
>


Metrics for Log Compaction

2016-04-13 Thread Kasim Doctor
Hi everyone,

We are starting to use log compaction for one of our topics and I was wondering 
if there are any specific metrics exposed to monitor how often compaction took 
place and/or how many records (with some metadata related to partition) were 
deleted ?

I looked at JMX metrics that Kafka exposes but could not find specific metrics 
related to compaction.

Any insight or help would be appreciated.

Thanks,
Kasim




Re: Log Compaction v. Retention

2015-11-13 Thread Mayuresh Gharat
I think you can mark a tombstone (null value) for the keys explicitly, that
will eventually delete those messages from the log.

Thanks,

Mayuresh

On Wed, Nov 11, 2015 at 6:18 AM, Morgan Kenyon  wrote:

> I came across an interesting question on StackOverflow dealing with the
> difference between compaction and retention. To sum up the question and my
> incorrect answer, I was unaware that retention and compaction were mutually
> exclusive (based on grokbase link below)
>
> Is it true that when setting a log to be compacted there is no way to
> delete old messages if they're not deleted through compaction? I imagine a
> use case where the user has a limited hard drive capacity and would like to
> compact a log up till a certain size, then delete old messages. While using
> compaction, are unique keyed messages guaranteed to remain in the log
> indefinitely? Or is there any other way to delete them?
>
>
> http://stackoverflow.com/questions/33632362/how-clean-old-segments-from-compacted-log-in-kafka-0-8-2/
> http://grokbase.com/t/kafka/users/14bv6gaz0t/kafka-0-8-2-log-cleaner
>
> --
>
> *Morgan Kenyon*
> Software Engineer
> Lymba Corporation
> Phone: 972-680-0800
> Email: mor...@lymba.com
>
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: log compaction scaling with ~100m messages

2015-10-08 Thread vipul jhawar
Just want to chime in on this question, as this does seem a good option to
avoid a memory-hungry K,V store in case we are ok with some async
processing. There are cases where you want a combination of some near
real-time and some offline processing of the same index, and a Kafka topic
is much more efficient in terms of memory, and you can scoop out the messages
much faster on the basis of the parallelism vs. introducing another component
in your store just to iterate over the keys and produce results.

Considering Joel's feedback, should we really avoid this option at all, or
should we shard it a lot and then be ok to store approx. 500 million or a
billion messages in the topic?

Thanks

On Thu, Oct 8, 2015 at 12:46 PM, Jan Filipiak 
wrote:

> Hi,
>
> just want to pick this up again. You can always use more partitions to
> reduce the number of keys handled by a single broker and parallelize the
> compaction. So with sufficient number of machines and the ability to
> partition I don’t see you running into problems.
>
> Jan
>
>
> On 07.10.2015 05:34, Feroze Daud wrote:
>
>> hi!
>> We have a use case where we want to store ~100m keys in kafka. Is there
>> any problem with this approach?
>> I have heard from some people using kafka, that kafka has a problem when
>> doing log compaction with those many number of keys.
>> Another topic might have around 10 different K/V pairs for each key in
>> the primary topic. The primary topic's keyspace is approx of 100m keys. We
>> would like to store this in kafka because we are doing a lot of stream
>> processing on these messages, and want to avoid writing another process to
>> recompute data from snapshots.
>> So, in summary:
>> primary topic: ~100m keyssecondary topic: ~1B keys
>> Is it feasible to use log compaction at such a scale of data?
>> Thanks
>> feroze.
>>
>
>


Re: log compaction scaling with ~100m messages

2015-10-08 Thread Jan Filipiak

Hi,

just want to pick this up again. You can always use more partitions to 
reduce the number of keys handled by a single broker and parallelize the 
compaction. So with sufficient number of machines and the ability to 
partition I don’t see you running into problems.
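
As a rough sketch (the topic name and counts below are placeholders), that just
means creating the topic with enough partitions up front, e.g.:

kafka-topics.sh --zookeeper localhost:2181 --create --topic big-keyed-topic \
  --partitions 64 --replication-factor 3 --config cleanup.policy=compact

and, if needed, raising log.cleaner.threads on the brokers so several partitions
can be compacted in parallel.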


Jan

On 07.10.2015 05:34, Feroze Daud wrote:

hi!
We have a use case where we want to store ~100m keys in kafka. Is there any 
problem with this approach?
I have heard from some people using kafka, that kafka has a problem when doing 
log compaction with those many number of keys.
Another topic might have around 10 different K/V pairs for each key in the 
primary topic. The primary topic's keyspace is approx of 100m keys. We would 
like to store this in kafka because we are doing a lot of stream processing on 
these messages, and want to avoid writing another process to recompute data 
from snapshots.
So, in summary:
primary topic: ~100m keyssecondary topic: ~1B keys
Is it feasible to use log compaction at such a scale of data?
Thanks
feroze.




Re: log compaction scaling with ~100m messages

2015-10-08 Thread Feroze Daud
Thank you for your response!
Our use case is more similar to a traditional k/v store. We are doing a new 
process that is going to spit out huge amounts of data. We are using Kafka as a 
broker so that downstream clients can all consume from the Kafka topic. What we 
would like to avoid is writing two systems - a stream processing system that 
runs on Kafka and another one that runs on snapshots. So for example, when we ship, 
we will have a process running on Kafka doing stream processing. If we change 
any business logic in our process we would like to reset our queue level back 
to zero and reprocess the whole queue.
But if I understand you, it seems that given our key space, we can't do that?
Our update rate is as follows:
we expect ~150m unique K,V pairs to be created in the initial ship. After that, 
we expect about 3 updates to each key per year. Updates for all keys will not 
happen at the same time. So, what do you think? Do you still advise that using 
the topic as a K/V store with log compaction (not time-based retention) will 
work?
If not, is there any other processing paradigm we can look into where we can 
use the same code for stream processing as well as reprocessing the entire dataset? 


 On Wednesday, October 7, 2015 11:16 AM, Joel Koshy  
wrote:
   

 Using log compaction is well-suited for applications that use Kafka directly 
and need to persist some state associated with its processing. So something 
like offset management for consumers is a good fit. Another good use-case is 
for storing schemas associated with your Kafka topics. These are both very 
specific to maintaining metadata around your stream processing. Although it can 
be used for more general K-V storage it is not always a good fit. This is 
especially true if your key-space is bound to grow significantly over time or 
has an high update rate. The other aspect is the need to do some sort of 
caching of your key-value pairs (since otherwise lookups would require scanning 
the log). So for application-level general K-V storage, you could certainly use 
Kafka as a persistence mechanism for recording recent updates (with traditional 
time-based retention), but you would probably want a more suitable K-V store 
separate from Kafka. I'm not sure this (i.e., traditional db storage) is your 
use case since you mention "a lot of stream processing on these messages" - so 
it sounds more like repetitive processing over the entire key space. For that 
it may be more reasonable. The alternative is to use snapshots and read more 
recent updates from the updates stream in Kafka. Samza folks may want to weigh 
in here as well.
That said, to answer your question: sure it is feasible to use log compaction 
with 1B keys, especially if you have enough brokers, partitions, and log 
cleaner threads but I'm not sure it is the best approach to take. We did hit 
various issues (bugs/feature gaps) with log compaction while using it for 
consumer offset management: e.g., support for compressed messages, various 
other bugs, but most of these have been resolved.
Hope that helps,

Joel

On Tue, Oct 6, 2015 at 8:34 PM, Feroze Daud  wrote:
> hi!
> We have a use case where we want to store ~100m keys in kafka. Is there any 
> problem with this approach?
> I have heard from some people using kafka, that kafka has a problem when 
> doing log compaction with those many number of keys.
> Another topic might have around 10 different K/V pairs for each key in the 
> primary topic. The primary topic's keyspace is approx of 100m keys. We would 
> like to store this in kafka because we are doing a lot of stream processing 
> on these messages, and want to avoid writing another process to recompute 
> data from snapshots.
> So, in summary:
> primary topic: ~100m keyssecondary topic: ~1B keys
> Is it feasible to use log compaction at such a scale of data?
> Thanks
> feroze.



  

Re: log compaction scaling with ~100m messages

2015-10-07 Thread Joel Koshy
Using log compaction is well-suited for applications that use Kafka
directly and need to persist some state associated with its processing. So
something like offset management for consumers
<http://www.slideshare.net/jjkoshy/offset-management-in-kafka> is a good
fit. Another good use-case is for storing schemas
<https://github.com/confluentinc/schema-registry/blob/master/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaStore.java>
associated with your Kafka topics. These are both very specific to
maintaining metadata around your stream processing. Although it can be used
for more general K-V storage it is not *always* a good fit. This is
especially true if your key-space is bound to grow significantly over time
or has an high update rate. The other aspect is the need to do some sort of
caching of your key-value pairs (since otherwise lookups would require
scanning the log). So for application-level general K-V storage, you could
certainly use Kafka as a persistence mechanism for recording recent updates
(with traditional time-based retention), but you would probably want a more
suitable K-V store separate from Kafka. I'm not sure this (i.e.,
traditional db storage) is your use case since you mention "a lot of stream
processing on these messages" - so it sounds more like repetitive
processing over the entire key space. For that it may be more reasonable.
The alternative is to use snapshots and read more recent updates from the
updates stream in Kafka. Samza folks may want to weigh in here as well.

That said, to answer your question: sure it is feasible to use log
compaction with 1B keys, especially if you have enough brokers, partitions,
and log cleaner threads but I'm not sure it is the best approach to take.
We did hit various issues (bugs/feature gaps) with log compaction while
using it for consumer offset management: e.g., support for compressed
messages, various other bugs, but most of these have been resolved.

Hope that helps,

Joel

On Tue, Oct 6, 2015 at 8:34 PM, Feroze Daud 
wrote:
> hi!
> We have a use case where we want to store ~100m keys in kafka. Is there
any problem with this approach?
> I have heard from some people using kafka, that kafka has a problem when
doing log compaction with those many number of keys.
> Another topic might have around 10 different K/V pairs for each key in
the primary topic. The primary topic's keyspace is approx of 100m keys. We
would like to store this in kafka because we are doing a lot of stream
processing on these messages, and want to avoid writing another process to
recompute data from snapshots.
> So, in summary:
> primary topic: ~100m keyssecondary topic: ~1B keys
> Is it feasible to use log compaction at such a scale of data?
> Thanks
> feroze.


log compaction scaling with ~100m messages

2015-10-06 Thread Feroze Daud
hi!
We have a use case where we want to store ~100m keys in kafka. Is there any 
problem with this approach?
I have heard from some people using Kafka that Kafka has a problem when doing 
log compaction with that many keys.
Another topic might have around 10 different K/V pairs for each key in the 
primary topic. The primary topic's keyspace is approx of 100m keys. We would 
like to store this in kafka because we are doing a lot of stream processing on 
these messages, and want to avoid writing another process to recompute data 
from snapshots.
So, in summary:
primary topic: ~100m keys
secondary topic: ~1B keys
Is it feasible to use log compaction at such a scale of data?
Thanks
feroze.

Re: configuring log compaction

2015-08-09 Thread David Judd
Ok. I think that actually works fine for my use case. Thanks!

On Sun, Aug 9, 2015 at 4:26 PM, Jiangjie Qin 
wrote:

> Actually Kafka only support two mutually exclusive log cleanup policy: 1)
> delete logs after retention period passed. 2) compact the log to only keep
> the last updated value of a key.
>
> log.retention.hours is only used by (1). For log compaction, currently it
> is not compacting the logs by its age, but by the dirty ratio in bytes
> (uncompacted log size / total log size). The config is
> log.cleaner.min.cleanable.ratio.
>
> So it might be a little bit hard to enforce the policy of "compact log
> after 7 days".
>
> Jiangjie (Becket) Qin
>
> ​
>


Re: configuring log compaction

2015-08-09 Thread Jiangjie Qin
Actually Kafka only supports two mutually exclusive log cleanup policies: 1)
delete logs after the retention period has passed; 2) compact the log to only keep
the last updated value of a key.

log.retention.hours is only used by (1). Log compaction currently does not
compact the logs by their age, but by the dirty ratio in bytes
(uncompacted log size / total log size). The config is
log.cleaner.min.cleanable.ratio.

So it might be a little bit hard to enforce the policy of "compact log
after 7 days".
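
If the goal is simply "keep the latest value per key indefinitely", a per-topic
override (the topic name below is a placeholder) is usually enough, e.g.:

kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic \
  --config cleanup.policy=compact --config min.cleanable.dirty.ratio=0.5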

Jiangjie (Becket) Qin



configuring log compaction

2015-08-09 Thread David Judd
Hi,

I have a beginner's question. I'd like to confirm that I'm interpreting the
docs correctly.

My goal is a setup where, by default, logs are compacted after 7 days and
the compacted records are retained indefinitely. Does the following
combination of settings achieve this?

log.retention.hours=168
log.cleaner.enable=true
log.cleanup.policy=compact

An alternative interpretation of the docs, which I can't quite rule out
from my reading so far, would be that this causes logs to be compacted when
the cleaner does a periodic run and then the compacted logs are deleted
when the retention period expires (after 7 days). I think that's not right,
though either way, it would be useful if more of the high-level
documentation articles included concrete configuration setting examples.

Thanks for your help,
David


Re: Log compaction not working as expected

2015-06-17 Thread Jan Filipiak

Ah misread that sorry!

On 17.06.2015 14:26, Shayne S wrote:

Right, you can see I've got segment.ms set.  The trick is that they don't
actually roll over until something new arrives. If your topic is idle (not
receiving messages), it won't ever roll over to a new segment, and thus the
last segment will never be compacted.

Thanks!
Shayne

On Wed, Jun 17, 2015 at 5:58 AM, Jan Filipiak 
wrote:


Hi,

you might want to have a look here:
http://kafka.apache.org/documentation.html#topic-config
_segment.ms_ and _segment.bytes _ should allow you to control the
time/size when segments are rolled.

Best
Jan


On 16.06.2015 14:05, Shayne S wrote:


Some further information, and is this a bug?  I'm using 0.8.2.1.

Log compaction will only occur on the non active segments.  Intentional or
not, it seems that the last segment is always the active segment.  In
other
words, an expired segment will not be cleaned until a new segment has been
created.

As a result, a log won't be compacted until new data comes in (per
partition). Does this mean I need to send the equivalent of a pig (
https://en.wikipedia.org/wiki/Pigging) through each partition in order to
force compaction?  Or can I force the cleaning somehow?

Here are the steps to recreate:

1. Create a new topic with a 5 minute segment.ms:

kafka-topics.sh --zookeeper localhost:2181 --create --topic TEST_TOPIC
--replication-factor 1 --partitions 1 --config cleanup.policy=compact
--config min.cleanable.dirty.ratio=0.01 --config segment.ms=30

2. Repeatedly add messages with identical keys (3x):

echo "ABC123,{\"test\": 1}" | kafka-console-producer.sh --broker-list
localhost:9092 --topic TEST_TOPIC --property parse.key=true --property
key.separator=, --new-producer

3. Wait 5+ minutes and confirm no log compaction.
4. Once satisfied, send a new message:

echo "DEF456,{\"test\": 1}" | kafka-console-producer.sh --broker-list
localhost:9092 --topic TEST_TOPIC --property parse.key=true --property
key.separator=, --new-producer

5. Log compaction will occur quickly soon after.

Is my use case of infrequent logs not supported? Is this intentional
behavior? It's unnecessarily challenging to target each partition with a
dummy message to trigger compaction.

Also, I believe there is another issue with logs originally configured
without a segment timeout that lead to my original issue.  I still cannot
get those logs to compact.

Thanks!
Shayne






Re: Log compaction not working as expected

2015-06-17 Thread Shayne S
Right, you can see I've got segment.ms set.  The trick is that they don't
actually roll over until something new arrives. If your topic is idle (not
receiving messages), it won't ever roll over to a new segment, and thus the
last segment will never be compacted.

Thanks!
Shayne

On Wed, Jun 17, 2015 at 5:58 AM, Jan Filipiak 
wrote:

> Hi,
>
> you might want to have a look here:
> http://kafka.apache.org/documentation.html#topic-config
> _segment.ms_ and _segment.bytes _ should allow you to control the
> time/size when segments are rolled.
>
> Best
> Jan
>
>
> On 16.06.2015 14:05, Shayne S wrote:
>
>> Some further information, and is this a bug?  I'm using 0.8.2.1.
>>
>> Log compaction will only occur on the non active segments.  Intentional or
>> not, it seems that the last segment is always the active segment.  In
>> other
>> words, an expired segment will not be cleaned until a new segment has been
>> created.
>>
>> As a result, a log won't be compacted until new data comes in (per
>> partition). Does this mean I need to send the equivalent of a pig (
>> https://en.wikipedia.org/wiki/Pigging) through each partition in order to
>> force compaction?  Or can I force the cleaning somehow?
>>
>> Here are the steps to recreate:
>>
>> 1. Create a new topic with a 5 minute segment.ms:
>>
>> kafka-topics.sh --zookeeper localhost:2181 --create --topic TEST_TOPIC
>> --replication-factor 1 --partitions 1 --config cleanup.policy=compact
>> --config min.cleanable.dirty.ratio=0.01 --config segment.ms=30
>>
>> 2. Repeatedly add messages with identical keys (3x):
>>
>> echo "ABC123,{\"test\": 1}" | kafka-console-producer.sh --broker-list
>> localhost:9092 --topic TEST_TOPIC --property parse.key=true --property
>> key.separator=, --new-producer
>>
>> 3. Wait 5+ minutes and confirm no log compaction.
>> 4. Once satisfied, send a new message:
>>
>> echo "DEF456,{\"test\": 1}" | kafka-console-producer.sh --broker-list
>> localhost:9092 --topic TEST_TOPIC --property parse.key=true --property
>> key.separator=, --new-producer
>>
>> 5. Log compaction will occur quickly soon after.
>>
>> Is my use case of infrequent logs not supported? Is this intentional
>> behavior? It's unnecessarily challenging to target each partition with a
>> dummy message to trigger compaction.
>>
>> Also, I believe there is another issue with logs originally configured
>> without a segment timeout that lead to my original issue.  I still cannot
>> get those logs to compact.
>>
>> Thanks!
>> Shayne
>>
>>
>


Re: Log compaction not working as expected

2015-06-17 Thread Jan Filipiak

Hi,

you might want to have a look here: 
http://kafka.apache.org/documentation.html#topic-config
_segment.ms_ and _segment.bytes_ should allow you to control the 
time/size when segments are rolled.


Best
Jan

On 16.06.2015 14:05, Shayne S wrote:

Some further information, and is this a bug?  I'm using 0.8.2.1.

Log compaction will only occur on the non active segments.  Intentional or
not, it seems that the last segment is always the active segment.  In other
words, an expired segment will not be cleaned until a new segment has been
created.

As a result, a log won't be compacted until new data comes in (per
partition). Does this mean I need to send the equivalent of a pig (
https://en.wikipedia.org/wiki/Pigging) through each partition in order to
force compaction?  Or can I force the cleaning somehow?

Here are the steps to recreate:

1. Create a new topic with a 5 minute segment.ms:

kafka-topics.sh --zookeeper localhost:2181 --create --topic TEST_TOPIC
--replication-factor 1 --partitions 1 --config cleanup.policy=compact
--config min.cleanable.dirty.ratio=0.01 --config segment.ms=30

2. Repeatedly add messages with identical keys (3x):

echo "ABC123,{\"test\": 1}" | kafka-console-producer.sh --broker-list
localhost:9092 --topic TEST_TOPIC --property parse.key=true --property
key.separator=, --new-producer

3. Wait 5+ minutes and confirm no log compaction.
4. Once satisfied, send a new message:

echo "DEF456,{\"test\": 1}" | kafka-console-producer.sh --broker-list
localhost:9092 --topic TEST_TOPIC --property parse.key=true --property
key.separator=, --new-producer

5. Log compaction will occur quickly soon after.

Is my use case of infrequent logs not supported? Is this intentional
behavior? It's unnecessarily challenging to target each partition with a
dummy message to trigger compaction.

Also, I believe there is another issue with logs originally configured
without a segment timeout that lead to my original issue.  I still cannot
get those logs to compact.

Thanks!
Shayne





Re: Log compaction not working as expected

2015-06-16 Thread Manikumar Reddy
Ok, I got your point. Currently we check the log segment constraints
(segment.bytes, segment.ms) only before appending new messages, so we will
not create a new log segment until new data comes.

In your case, your approach (sending a periodic dummy/ping message) should be
fine.
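
As a rough sketch of that dummy/ping approach with the Java producer (the
topic name, the "__ping__" key and the broker address are made up for
illustration), something like the following writes one small keyed record to
every partition so the active segment can roll and become eligible for
cleaning:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.StringSerializer;

public class CompactionPig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // One dummy record per partition; reusing a fixed key means at most one
            // extra record per partition survives compaction.
            for (PartitionInfo p : producer.partitionsFor("TEST_TOPIC")) {
                producer.send(new ProducerRecord<>("TEST_TOPIC", p.partition(), "__ping__", ""));
            }
            producer.flush();
        }
    }
}

Sending a null value (a tombstone) for the ping key instead would let the ping
record itself be cleaned up after delete.retention.ms.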



On Tue, Jun 16, 2015 at 7:19 PM, Shayne S  wrote:

> Thank you for the response!
>
> Unfortunately, those improvements would not help.  It is the lack of
> activity resulting in a new segment that prevents compaction.
>
> I was confused by what qualifies as the active segment. The active segment
> is the last segment as opposed to the segment that would be written to if
> something were received right now.
>
> On Tue, Jun 16, 2015 at 8:38 AM, Manikumar Reddy 
> wrote:
>
> > Hi,
> >
> >   Your observation is correct.  we never compact the active segment.
> >   Some improvements are proposed here,
> >   https://issues.apache.org/jira/browse/KAFKA-1981
> >
> >
> > Manikumar
> >
> > On Tue, Jun 16, 2015 at 5:35 PM, Shayne S  wrote:
> >
> > > Some further information, and is this a bug?  I'm using 0.8.2.1.
> > >
> > > Log compaction will only occur on the non active segments.  Intentional
> > or
> > > not, it seems that the last segment is always the active segment.  In
> > other
> > > words, an expired segment will not be cleaned until a new segment has
> > been
> > > created.
> > >
> > > As a result, a log won't be compacted until new data comes in (per
> > > partition). Does this mean I need to send the equivalent of a pig (
> > > https://en.wikipedia.org/wiki/Pigging) through each partition in order
> > to
> > > force compaction?  Or can I force the cleaning somehow?
> > >
> > > Here are the steps to recreate:
> > >
> > > 1. Create a new topic with a 5 minute segment.ms:
> > >
> > > kafka-topics.sh --zookeeper localhost:2181 --create --topic TEST_TOPIC
> > > --replication-factor 1 --partitions 1 --config cleanup.policy=compact
> > > --config min.cleanable.dirty.ratio=0.01 --config segment.ms=30
> > >
> > > 2. Repeatedly add messages with identical keys (3x):
> > >
> > > echo "ABC123,{\"test\": 1}" | kafka-console-producer.sh --broker-list
> > > localhost:9092 --topic TEST_TOPIC --property parse.key=true --property
> > > key.separator=, --new-producer
> > >
> > > 3. Wait 5+ minutes and confirm no log compaction.
> > > 4. Once satisfied, send a new message:
> > >
> > > echo "DEF456,{\"test\": 1}" | kafka-console-producer.sh --broker-list
> > > localhost:9092 --topic TEST_TOPIC --property parse.key=true --property
> > > key.separator=, --new-producer
> > >
> > > 5. Log compaction will occur quickly soon after.
> > >
> > > Is my use case of infrequent logs not supported? Is this intentional
> > > behavior? It's unnecessarily challenging to target each partition with
> a
> > > dummy message to trigger compaction.
> > >
> > > Also, I believe there is another issue with logs originally configured
> > > without a segment timeout that lead to my original issue.  I still
> cannot
> > > get those logs to compact.
> > >
> > > Thanks!
> > > Shayne
> > >
> >
>


Re: Log compaction not working as expected

2015-06-16 Thread Shayne S
Thank you for the response!

Unfortunately, those improvements would not help.  It is the lack of
activity (and therefore of a new segment being rolled) that prevents compaction.

I was confused by what qualifies as the active segment. The active segment
is the last segment as opposed to the segment that would be written to if
something were received right now.

On Tue, Jun 16, 2015 at 8:38 AM, Manikumar Reddy 
wrote:

> Hi,
>
>   Your observation is correct.  we never compact the active segment.
>   Some improvements are proposed here,
>   https://issues.apache.org/jira/browse/KAFKA-1981
>
>
> Manikumar
>
> On Tue, Jun 16, 2015 at 5:35 PM, Shayne S  wrote:
>
> > Some further information, and is this a bug?  I'm using 0.8.2.1.
> >
> > Log compaction will only occur on the non active segments.  Intentional
> or
> > not, it seems that the last segment is always the active segment.  In
> other
> > words, an expired segment will not be cleaned until a new segment has
> been
> > created.
> >
> > As a result, a log won't be compacted until new data comes in (per
> > partition). Does this mean I need to send the equivalent of a pig (
> > https://en.wikipedia.org/wiki/Pigging) through each partition in order
> to
> > force compaction?  Or can I force the cleaning somehow?
> >
> > Here are the steps to recreate:
> >
> > 1. Create a new topic with a 5 minute segment.ms:
> >
> > kafka-topics.sh --zookeeper localhost:2181 --create --topic TEST_TOPIC
> > --replication-factor 1 --partitions 1 --config cleanup.policy=compact
> > --config min.cleanable.dirty.ratio=0.01 --config segment.ms=30
> >
> > 2. Repeatedly add messages with identical keys (3x):
> >
> > echo "ABC123,{\"test\": 1}" | kafka-console-producer.sh --broker-list
> > localhost:9092 --topic TEST_TOPIC --property parse.key=true --property
> > key.separator=, --new-producer
> >
> > 3. Wait 5+ minutes and confirm no log compaction.
> > 4. Once satisfied, send a new message:
> >
> > echo "DEF456,{\"test\": 1}" | kafka-console-producer.sh --broker-list
> > localhost:9092 --topic TEST_TOPIC --property parse.key=true --property
> > key.separator=, --new-producer
> >
> > 5. Log compaction will occur quickly soon after.
> >
> > Is my use case of infrequent logs not supported? Is this intentional
> > behavior? It's unnecessarily challenging to target each partition with a
> > dummy message to trigger compaction.
> >
> > Also, I believe there is another issue with logs originally configured
> > without a segment timeout that lead to my original issue.  I still cannot
> > get those logs to compact.
> >
> > Thanks!
> > Shayne
> >
>


Re: Log compaction not working as expected

2015-06-16 Thread Manikumar Reddy
Hi,

  Your observation is correct.  We never compact the active segment.
  Some improvements are proposed here,
  https://issues.apache.org/jira/browse/KAFKA-1981


Manikumar

On Tue, Jun 16, 2015 at 5:35 PM, Shayne S  wrote:

> Some further information, and is this a bug?  I'm using 0.8.2.1.
>
> Log compaction will only occur on the non active segments.  Intentional or
> not, it seems that the last segment is always the active segment.  In other
> words, an expired segment will not be cleaned until a new segment has been
> created.
>
> As a result, a log won't be compacted until new data comes in (per
> partition). Does this mean I need to send the equivalent of a pig (
> https://en.wikipedia.org/wiki/Pigging) through each partition in order to
> force compaction?  Or can I force the cleaning somehow?
>
> Here are the steps to recreate:
>
> 1. Create a new topic with a 5 minute segment.ms:
>
> kafka-topics.sh --zookeeper localhost:2181 --create --topic TEST_TOPIC
> --replication-factor 1 --partitions 1 --config cleanup.policy=compact
> --config min.cleanable.dirty.ratio=0.01 --config segment.ms=30
>
> 2. Repeatedly add messages with identical keys (3x):
>
> echo "ABC123,{\"test\": 1}" | kafka-console-producer.sh --broker-list
> localhost:9092 --topic TEST_TOPIC --property parse.key=true --property
> key.separator=, --new-producer
>
> 3. Wait 5+ minutes and confirm no log compaction.
> 4. Once satisfied, send a new message:
>
> echo "DEF456,{\"test\": 1}" | kafka-console-producer.sh --broker-list
> localhost:9092 --topic TEST_TOPIC --property parse.key=true --property
> key.separator=, --new-producer
>
> 5. Log compaction will occur quickly soon after.
>
> Is my use case of infrequent logs not supported? Is this intentional
> behavior? It's unnecessarily challenging to target each partition with a
> dummy message to trigger compaction.
>
> Also, I believe there is another issue with logs originally configured
> without a segment timeout that lead to my original issue.  I still cannot
> get those logs to compact.
>
> Thanks!
> Shayne
>


Re: Log compaction not working as expected

2015-06-16 Thread Shayne S
Some further information, and is this a bug?  I'm using 0.8.2.1.

Log compaction will only occur on the non active segments.  Intentional or
not, it seems that the last segment is always the active segment.  In other
words, an expired segment will not be cleaned until a new segment has been
created.

As a result, a log won't be compacted until new data comes in (per
partition). Does this mean I need to send the equivalent of a pig (
https://en.wikipedia.org/wiki/Pigging) through each partition in order to
force compaction?  Or can I force the cleaning somehow?

Here are the steps to recreate:

1. Create a new topic with a 5 minute segment.ms:

kafka-topics.sh --zookeeper localhost:2181 --create --topic TEST_TOPIC
--replication-factor 1 --partitions 1 --config cleanup.policy=compact
--config min.cleanable.dirty.ratio=0.01 --config segment.ms=30

2. Repeatedly add messages with identical keys (3x):

echo "ABC123,{\"test\": 1}" | kafka-console-producer.sh --broker-list
localhost:9092 --topic TEST_TOPIC --property parse.key=true --property
key.separator=, --new-producer

3. Wait 5+ minutes and confirm no log compaction.
4. Once satisfied, send a new message:

echo "DEF456,{\"test\": 1}" | kafka-console-producer.sh --broker-list
localhost:9092 --topic TEST_TOPIC --property parse.key=true --property
key.separator=, --new-producer

5. Log compaction will occur soon after.

Is my use case of infrequent logs not supported? Is this intentional
behavior? It's unnecessarily challenging to target each partition with a
dummy message to trigger compaction.

Also, I believe there is another issue with logs originally configured
without a segment timeout that led to my original issue.  I still cannot
get those logs to compact.

Thanks!
Shayne


Log compaction not working as expected

2015-06-12 Thread Shayne S
Hi, I'm new to Kafka and having trouble with log compaction.

I'm attempting to set up topics that will aggressively compact, but so far
I'm having trouble getting complete compaction at all.  The topic is
configured like so:

Topic:beer_archive PartitionCount:20 ReplicationFactor:1
Configs:min.cleanable.dirty.ratio=0.01,delete.retention.ms=6,segment.ms=180,cleanup.policy=compact

The dirty ratio and segment.ms have been changed after duplicated records
have shown up in an attempt to get compaction to work. My test for success
is a dump of keys, comparing the total count to the unique count. This list
is produced like so:

kafka-console-consumer.sh . --from-beginning --property print.key=true
| cut -f1 > id_file

This gives me 535,480 unique keys, and a total of 2,230,784 entries. After
making tweaks to the segment.ms to make the last segment eligible for
compaction, SOME compaction did occur a couple times.  A sample compaction
from the log:

[2015-06-12 15:51:31,440] INFO Cleaner 0: Beginning cleaning of log
beer_archive-5. (kafka.log.LogCleaner)
[2015-06-12 15:51:31,441] INFO Cleaner 0: Building offset map for
beer_archive-5... (kafka.log.LogCleaner)
[2015-06-12 15:51:31,580] INFO Cleaner 0: Building offset map for log
beer_archive-5 for 1 segments in offset range [123847, 126857).
(kafka.log.LogCleaner)
[2015-06-12 15:51:31,583] INFO Cleaner 0: Offset map for log beer_archive-5
complete. (kafka.log.LogCleaner)
[2015-06-12 15:51:31,583] INFO Cleaner 0: Cleaning log beer_archive-5
(discarding tombstones prior to Fri Jun 12 14:41:42 UTC 2015)...
(kafka.log.LogCleaner)
[2015-06-12 15:51:31,583] INFO Cleaner 0: Cleaning segment 0 in log
beer_archive-5 (last modified Fri Jun 12 14:42:42 UTC 2015) into 0,
retaining deletes. (kafka.log.LogCleaner)
[2015-06-12 15:51:32,319] INFO Cleaner 0: Cleaning segment 123847 in log
beer_archive-5 (last modified Fri Jun 12 15:26:00 UTC 2015) into 0,
retaining deletes. (kafka.log.LogCleaner)
[2015-06-12 15:51:35,094] INFO Cleaner 0: Swapping in cleaned segment 0 for
segment(s) 0,123847 in log beer_archive-5. (kafka.log.LogCleaner)
[2015-06-12 15:51:35,095] INFO [kafka-log-cleaner-thread-0],
Log cleaner thread 0 cleaned log beer_archive-5 (dirty section =
[123847, 126857])
116.5 MB of log processed in 3.7 seconds (31.9 MB/sec).
Indexed 2.5 MB in 0.1 seconds (17.2 Mb/sec, 3.9% of total time)
Buffer utilization: 0.0%
Cleaned 116.5 MB in 3.5 seconds (33.2 Mb/sec, 96.1% of total time)
Start size: 116.5 MB (111,662 messages)
End size: 115.0 MB (109,893 messages)
1.2% size reduction (1.6% fewer messages)
 (kafka.log.LogCleaner)

Any ideas where I'm going wrong?

Thanks!
Shayne


Re: Pulling Snapshots from Kafka, Log compaction last compact offset

2015-05-13 Thread Jonathan Hodges
Very good points, Gwen.  I hadn't thought of the Oracle Streams case of
dependencies.  I wonder if GoldenGate handles this better?

The tradeoff of these approaches is that each RDBMS is proprietary in how it
exposes this CDC information.  I guess GoldenGate can be a standard
interface for RDBMSs, but there really isn't anything covering NoSQL stores
like HBase, Cassandra, and Mongo.

I wish us poor analytics guys had more say in what OLTP stores the
application teams use :)  LinkedIn solved this pretty well by having
their teams use Espresso, which has a nice CDC pattern with a MySQL engine
under the covers.


On Sun, May 10, 2015 at 12:48 AM, Gwen Shapira 
wrote:

> Hi Jonathan,
>
> I agree we can have topic-per-table, but some transactions may span
> multiple tables and therefore will get applied partially out-of-order. I
> suspect this can be a consistency issue and create a state that is
> different than the state in the original database, but I don't have good
> proof of it.
>
> I know that Oracle Streams has "Parallel Apply" feature where they figure
> out whether transactions have dependencies and apply in parallel only if
> they don't. So it sounds like dependencies may be an issue.
>
> Planning to give this more thought :)
>
> Gwen
>
> On Fri, May 1, 2015 at 7:56 PM, Jonathan Hodges  wrote:
>
> > Hi Gwen,
> >
> > As you said I see Bottled Water and Sqoop managing slightly different use
> > cases so I don't see this feature as a Sqoop killer.  However I did have
> a
> > question on your comment that the transaction log or CDC approach will
> have
> > problems with very large, very active databases.
> >
> > I get that you need to have a single producer that transmits the
> > transaction log changes to Kafka in order.  However on the consumer side
> > you can have a topic per table and then partition these topics by primary
> > key to achieve nice parallelism.  So it seems the producer is the
> potential
> > bottleneck, but I imagine you can scale that appropriately vertically and
> > put the proper HA.
> >
> > Would love to hear your thoughts on this.
> >
> > Jonathan
> >
> >
> >
> > On Thu, Apr 30, 2015 at 5:09 PM, Gwen Shapira 
> > wrote:
> >
> > > I feel a need to respond to the Sqoop-killer comment :)
> > >
> > > 1) Note that most databases have a single transaction log per db and in
> > > order to get the correct view of the DB, you need to read it in order
> > > (otherwise transactions will get messed up). This means you are limited
> > to
> > > a single producer reading data from the log, writing it to a single
> > > partition and getting it read from a single consumer. If the database
> is
> > > very large and very active, you may run into some issues there...
> > >
> > > Because Sqoop doesn't try to catch up with all the changes, but takes a
> > > snapshot (from multiple mappers in parallel), we can very rapidly Sqoop
> > > 10TB databases.
> > >
> > > 2) If HDFS is the target of getting data from Postgres, then postgresql
> > ->
> > > kafka -> HDFS seems less optimal than postgresql -> HDFS directly (in
> > > parallel). There are good reasons to get Postgres data to Kafka, but if
> > the
> > > eventual goal is HDFS (or HBase), I suspect Sqoop still has a place.
> > >
> > > 3) Due to its parallelism and general purpose JDBC connector, I suspect
> > > that Sqoop is even a very viable way of getting data into Kafka.
> > >
> > > Gwen
> > >
> > >
> > > On Thu, Apr 30, 2015 at 2:27 PM, Jan Filipiak <
> jan.filip...@trivago.com>
> > > wrote:
> > >
> > > > Hello Everyone,
> > > >
> > > > I am quite exited about the recent example of replicating PostgresSQL
> > > > Changes to Kafka. My view on the log compaction feature always had
> > been a
> > > > very sceptical one, but now with its great potential exposed to the
> > wide
> > > > public, I think its an awesome feature. Especially when pulling this
> > data
> > > > into HDFS as a Snapshot, it is (IMO) a sqoop killer. So I want to
> thank
> > > > everyone who had the vision of building these kind of systems during
> a
> > > time
> > > > I could not imagine those.
> > > >
> > > > There is one open question that I would like people to help me with.
> > When
> > > > pulling a snapshot of a partition into HDFS using a camus-like
> > > application
> > &

Re: Pulling Snapshots from Kafka, Log compaction last compact offset

2015-05-10 Thread Hisham Mardam-Bey
With mypipe (MySQL -> Kafka) we've had a similar discussion re: topic names
and preserving transactions.

At this point:

- Kafka topic names are configurable allowing for per db or per table topics
- transactions maintain a transaction ID for each event when published into
Kafka

https://github.com/mardambey/mypipe/issues/7

Cheers (=

Hisham.


On Sun, May 10, 2015 at 2:48 AM, Gwen Shapira  wrote:

> Hi Jonathan,
>
> I agree we can have topic-per-table, but some transactions may span
> multiple tables and therefore will get applied partially out-of-order. I
> suspect this can be a consistency issue and create a state that is
> different than the state in the original database, but I don't have good
> proof of it.
>
> I know that Oracle Streams has "Parallel Apply" feature where they figure
> out whether transactions have dependencies and apply in parallel only if
> they don't. So it sounds like dependencies may be an issue.
>
> Planning to give this more thought :)
>
> Gwen
>
> On Fri, May 1, 2015 at 7:56 PM, Jonathan Hodges  wrote:
>
> > Hi Gwen,
> >
> > As you said I see Bottled Water and Sqoop managing slightly different use
> > cases so I don't see this feature as a Sqoop killer.  However I did have
> a
> > question on your comment that the transaction log or CDC approach will
> have
> > problems with very large, very active databases.
> >
> > I get that you need to have a single producer that transmits the
> > transaction log changes to Kafka in order.  However on the consumer side
> > you can have a topic per table and then partition these topics by primary
> > key to achieve nice parallelism.  So it seems the producer is the
> potential
> > bottleneck, but I imagine you can scale that appropriately vertically and
> > put the proper HA.
> >
> > Would love to hear your thoughts on this.
> >
> > Jonathan
> >
> >
> >
> > On Thu, Apr 30, 2015 at 5:09 PM, Gwen Shapira 
> > wrote:
> >
> > > I feel a need to respond to the Sqoop-killer comment :)
> > >
> > > 1) Note that most databases have a single transaction log per db and in
> > > order to get the correct view of the DB, you need to read it in order
> > > (otherwise transactions will get messed up). This means you are limited
> > to
> > > a single producer reading data from the log, writing it to a single
> > > partition and getting it read from a single consumer. If the database
> is
> > > very large and very active, you may run into some issues there...
> > >
> > > Because Sqoop doesn't try to catch up with all the changes, but takes a
> > > snapshot (from multiple mappers in parallel), we can very rapidly Sqoop
> > > 10TB databases.
> > >
> > > 2) If HDFS is the target of getting data from Postgres, then postgresql
> > ->
> > > kafka -> HDFS seems less optimal than postgresql -> HDFS directly (in
> > > parallel). There are good reasons to get Postgres data to Kafka, but if
> > the
> > > eventual goal is HDFS (or HBase), I suspect Sqoop still has a place.
> > >
> > > 3) Due to its parallelism and general purpose JDBC connector, I suspect
> > > that Sqoop is even a very viable way of getting data into Kafka.
> > >
> > > Gwen
> > >
> > >
> > > On Thu, Apr 30, 2015 at 2:27 PM, Jan Filipiak <
> jan.filip...@trivago.com>
> > > wrote:
> > >
> > > > Hello Everyone,
> > > >
> > > > I am quite exited about the recent example of replicating PostgresSQL
> > > > Changes to Kafka. My view on the log compaction feature always had
> > been a
> > > > very sceptical one, but now with its great potential exposed to the
> > wide
> > > > public, I think its an awesome feature. Especially when pulling this
> > data
> > > > into HDFS as a Snapshot, it is (IMO) a sqoop killer. So I want to
> thank
> > > > everyone who had the vision of building these kind of systems during
> a
> > > time
> > > > I could not imagine those.
> > > >
> > > > There is one open question that I would like people to help me with.
> > When
> > > > pulling a snapshot of a partition into HDFS using a camus-like
> > > application
> > > > I feel the need of keeping a Set of all keys read so far and stop as
> > soon
> > > > as I find a key beeing already in my set. I use this as an indicator
> of
> > > how
> > > > far the log compaction has happened already and only pull up to this
> > > point.
> > > > This works quite well as I do not need to keep the messages but only
> > > their
> > > > keys in memory.
> > > >
> > > > The question I want to raise with the community is:
> > > >
> > > > How do you prevent pulling the same record twice (in different
> > versions)
> > > > and would it be beneficial if the "OffsetResponse" would also return
> > the
> > > > last offset that got compacted so far and the application would just
> > pull
> > > > up to this point?
> > > >
> > > > Looking forward for some recommendations and comments.
> > > >
> > > > Best
> > > > Jan
> > > >
> > > >
> > >
> >
>



-- 
Hisham Mardam-Bey
http://hisham.cc/


Re: Pulling Snapshots from Kafka, Log compaction last compact offset

2015-05-09 Thread Gwen Shapira
Hi Jonathan,

I agree we can have topic-per-table, but some transactions may span
multiple tables and therefore will get applied partially out-of-order. I
suspect this can be a consistency issue and create a state that is
different than the state in the original database, but I don't have good
proof of it.

I know that Oracle Streams has "Parallel Apply" feature where they figure
out whether transactions have dependencies and apply in parallel only if
they don't. So it sounds like dependencies may be an issue.

Planning to give this more thought :)

Gwen

On Fri, May 1, 2015 at 7:56 PM, Jonathan Hodges  wrote:

> Hi Gwen,
>
> As you said I see Bottled Water and Sqoop managing slightly different use
> cases so I don't see this feature as a Sqoop killer.  However I did have a
> question on your comment that the transaction log or CDC approach will have
> problems with very large, very active databases.
>
> I get that you need to have a single producer that transmits the
> transaction log changes to Kafka in order.  However on the consumer side
> you can have a topic per table and then partition these topics by primary
> key to achieve nice parallelism.  So it seems the producer is the potential
> bottleneck, but I imagine you can scale that appropriately vertically and
> put the proper HA.
>
> Would love to hear your thoughts on this.
>
> Jonathan
>
>
>
> On Thu, Apr 30, 2015 at 5:09 PM, Gwen Shapira 
> wrote:
>
> > I feel a need to respond to the Sqoop-killer comment :)
> >
> > 1) Note that most databases have a single transaction log per db and in
> > order to get the correct view of the DB, you need to read it in order
> > (otherwise transactions will get messed up). This means you are limited
> to
> > a single producer reading data from the log, writing it to a single
> > partition and getting it read from a single consumer. If the database is
> > very large and very active, you may run into some issues there...
> >
> > Because Sqoop doesn't try to catch up with all the changes, but takes a
> > snapshot (from multiple mappers in parallel), we can very rapidly Sqoop
> > 10TB databases.
> >
> > 2) If HDFS is the target of getting data from Postgres, then postgresql
> ->
> > kafka -> HDFS seems less optimal than postgresql -> HDFS directly (in
> > parallel). There are good reasons to get Postgres data to Kafka, but if
> the
> > eventual goal is HDFS (or HBase), I suspect Sqoop still has a place.
> >
> > 3) Due to its parallelism and general purpose JDBC connector, I suspect
> > that Sqoop is even a very viable way of getting data into Kafka.
> >
> > Gwen
> >
> >
> > On Thu, Apr 30, 2015 at 2:27 PM, Jan Filipiak 
> > wrote:
> >
> > > Hello Everyone,
> > >
> > > I am quite exited about the recent example of replicating PostgresSQL
> > > Changes to Kafka. My view on the log compaction feature always had
> been a
> > > very sceptical one, but now with its great potential exposed to the
> wide
> > > public, I think its an awesome feature. Especially when pulling this
> data
> > > into HDFS as a Snapshot, it is (IMO) a sqoop killer. So I want to thank
> > > everyone who had the vision of building these kind of systems during a
> > time
> > > I could not imagine those.
> > >
> > > There is one open question that I would like people to help me with.
> When
> > > pulling a snapshot of a partition into HDFS using a camus-like
> > application
> > > I feel the need of keeping a Set of all keys read so far and stop as
> soon
> > > as I find a key beeing already in my set. I use this as an indicator of
> > how
> > > far the log compaction has happened already and only pull up to this
> > point.
> > > This works quite well as I do not need to keep the messages but only
> > their
> > > keys in memory.
> > >
> > > The question I want to raise with the community is:
> > >
> > > How do you prevent pulling the same record twice (in different
> versions)
> > > and would it be beneficial if the "OffsetResponse" would also return
> the
> > > last offset that got compacted so far and the application would just
> pull
> > > up to this point?
> > >
> > > Looking forward for some recommendations and comments.
> > >
> > > Best
> > > Jan
> > >
> > >
> >
>


Re: Pulling Snapshots from Kafka, Log compaction last compact offset

2015-05-02 Thread Jonathan Hodges
Hi Gwen,

As you said I see Bottled Water and Sqoop managing slightly different use
cases so I don't see this feature as a Sqoop killer.  However I did have a
question on your comment that the transaction log or CDC approach will have
problems with very large, very active databases.

I get that you need to have a single producer that transmits the
transaction log changes to Kafka in order.  However on the consumer side
you can have a topic per table and then partition these topics by primary
key to achieve nice parallelism.  So it seems the producer is the potential
bottleneck, but I imagine you can scale that appropriately vertically and
put proper HA in place.

Would love to hear your thoughts on this.

Jonathan
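
As a sketch of that per-table layout with the Java producer (the topic naming
scheme, primary key and change payload below are made up), keying each change
event by the row's primary key lets the default partitioner spread a per-table
topic across many partitions while still keeping all changes to a given row in
order on one partition:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class CdcPublisher {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // One topic per table; the message key is the row's primary key, so the
            // default partitioner sends every change for a given row to the same partition.
            String table = "customers";
            String primaryKey = "42";
            String changeJson = "{\"op\":\"update\",\"name\":\"Alice\"}";
            producer.send(new ProducerRecord<>("cdc." + table, primaryKey, changeJson));
            producer.flush();
        }
    }
}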



On Thu, Apr 30, 2015 at 5:09 PM, Gwen Shapira  wrote:

> I feel a need to respond to the Sqoop-killer comment :)
>
> 1) Note that most databases have a single transaction log per db and in
> order to get the correct view of the DB, you need to read it in order
> (otherwise transactions will get messed up). This means you are limited to
> a single producer reading data from the log, writing it to a single
> partition and getting it read from a single consumer. If the database is
> very large and very active, you may run into some issues there...
>
> Because Sqoop doesn't try to catch up with all the changes, but takes a
> snapshot (from multiple mappers in parallel), we can very rapidly Sqoop
> 10TB databases.
>
> 2) If HDFS is the target of getting data from Postgres, then postgresql ->
> kafka -> HDFS seems less optimal than postgresql -> HDFS directly (in
> parallel). There are good reasons to get Postgres data to Kafka, but if the
> eventual goal is HDFS (or HBase), I suspect Sqoop still has a place.
>
> 3) Due to its parallelism and general purpose JDBC connector, I suspect
> that Sqoop is even a very viable way of getting data into Kafka.
>
> Gwen
>
>
> On Thu, Apr 30, 2015 at 2:27 PM, Jan Filipiak 
> wrote:
>
> > Hello Everyone,
> >
> > I am quite exited about the recent example of replicating PostgresSQL
> > Changes to Kafka. My view on the log compaction feature always had been a
> > very sceptical one, but now with its great potential exposed to the wide
> > public, I think its an awesome feature. Especially when pulling this data
> > into HDFS as a Snapshot, it is (IMO) a sqoop killer. So I want to thank
> > everyone who had the vision of building these kind of systems during a
> time
> > I could not imagine those.
> >
> > There is one open question that I would like people to help me with. When
> > pulling a snapshot of a partition into HDFS using a camus-like
> application
> > I feel the need of keeping a Set of all keys read so far and stop as soon
> > as I find a key beeing already in my set. I use this as an indicator of
> how
> > far the log compaction has happened already and only pull up to this
> point.
> > This works quite well as I do not need to keep the messages but only
> their
> > keys in memory.
> >
> > The question I want to raise with the community is:
> >
> > How do you prevent pulling the same record twice (in different versions)
> > and would it be beneficial if the "OffsetResponse" would also return the
> > last offset that got compacted so far and the application would just pull
> > up to this point?
> >
> > Looking forward for some recommendations and comments.
> >
> > Best
> > Jan
> >
> >
>


Re: Pulling Snapshots from Kafka, Log compaction last compact offset

2015-04-30 Thread Gwen Shapira
I feel a need to respond to the Sqoop-killer comment :)

1) Note that most databases have a single transaction log per db and in
order to get the correct view of the DB, you need to read it in order
(otherwise transactions will get messed up). This means you are limited to
a single producer reading data from the log, writing it to a single
partition and getting it read from a single consumer. If the database is
very large and very active, you may run into some issues there...

Because Sqoop doesn't try to catch up with all the changes, but takes a
snapshot (from multiple mappers in parallel), we can very rapidly Sqoop
10TB databases.

2) If HDFS is the target of getting data from Postgres, then postgresql ->
kafka -> HDFS seems less optimal than postgresql -> HDFS directly (in
parallel). There are good reasons to get Postgres data to Kafka, but if the
eventual goal is HDFS (or HBase), I suspect Sqoop still has a place.

3) Due to its parallelism and general purpose JDBC connector, I suspect
that Sqoop is even a very viable way of getting data into Kafka.

Gwen


On Thu, Apr 30, 2015 at 2:27 PM, Jan Filipiak 
wrote:

> Hello Everyone,
>
> I am quite exited about the recent example of replicating PostgresSQL
> Changes to Kafka. My view on the log compaction feature always had been a
> very sceptical one, but now with its great potential exposed to the wide
> public, I think its an awesome feature. Especially when pulling this data
> into HDFS as a Snapshot, it is (IMO) a sqoop killer. So I want to thank
> everyone who had the vision of building these kind of systems during a time
> I could not imagine those.
>
> There is one open question that I would like people to help me with. When
> pulling a snapshot of a partition into HDFS using a camus-like application
> I feel the need of keeping a Set of all keys read so far and stop as soon
> as I find a key beeing already in my set. I use this as an indicator of how
> far the log compaction has happened already and only pull up to this point.
> This works quite well as I do not need to keep the messages but only their
> keys in memory.
>
> The question I want to raise with the community is:
>
> How do you prevent pulling the same record twice (in different versions)
> and would it be beneficial if the "OffsetResponse" would also return the
> last offset that got compacted so far and the application would just pull
> up to this point?
>
> Looking forward for some recommendations and comments.
>
> Best
> Jan
>
>


Pulling Snapshots from Kafka, Log compaction last compact offset

2015-04-30 Thread Jan Filipiak

Hello Everyone,

I am quite excited about the recent example of replicating PostgreSQL 
changes to Kafka. My view on the log compaction feature had always been 
a very sceptical one, but now with its great potential exposed to the 
wide public, I think it's an awesome feature. Especially when pulling 
this data into HDFS as a snapshot, it is (IMO) a Sqoop killer. So I want 
to thank everyone who had the vision of building these kinds of systems 
during a time when I could not imagine them.


There is one open question that I would like people to help me with. 
When pulling a snapshot of a partition into HDFS using a camus-like 
application, I feel the need to keep a Set of all keys read so far and 
stop as soon as I find a key that is already in my set. I use this as an 
indicator of how far the log compaction has already happened and only 
pull up to this point. This works quite well, as I do not need to keep 
the messages but only their keys in memory.
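
A minimal sketch of that approach with the Java consumer (the topic name,
broker address and the emitToSnapshot sink are placeholders, and plain string
keys are assumed): read one partition from the beginning, remember every key
seen so far, and stop at the first key that repeats:

import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SnapshotUntilFirstDuplicateKey {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("my-compacted-topic", 0);
            consumer.assign(Collections.singleton(tp));
            consumer.seekToBeginning(Collections.singleton(tp));

            Set<String> seenKeys = new HashSet<>();
            boolean duplicateFound = false;
            // Note: if the partition never contains a repeated key, this keeps
            // polling until new data arrives.
            while (!duplicateFound) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    if (!seenKeys.add(record.key())) {
                        // A key we have already emitted: treat this as the end of the
                        // fully compacted prefix and stop the snapshot here.
                        duplicateFound = true;
                        break;
                    }
                    emitToSnapshot(record);   // hypothetical sink, e.g. an HDFS writer
                }
            }
        }
    }

    static void emitToSnapshot(ConsumerRecord<String, String> record) {
        System.out.printf("%s -> %s%n", record.key(), record.value());
    }
}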


The question I want to raise with the community is:

How do you prevent pulling the same record twice (in different versions), 
and would it be beneficial if the "OffsetResponse" also returned the last 
offset that has been compacted so far, so that the application would just 
pull up to this point?


Looking forward to some recommendations and comments.

Best
Jan



Re: Log compaction recover strategy

2015-03-10 Thread Pierre-Yves Ritschard


On 03/10/2015 05:48 PM, Mayuresh Gharat wrote:
> How do you typically handle workers starting, always start at offset 0
> to make sure the view is correctly recreated ?
> ---> You will have to reset the offsets to 0 and the offset reset policy to
> earliest in consumer.

Yup, as expected.
> 
> How do you handle topology changes in consumers, which lead to a
> redistribution of key across them ?
> ---> Can you explain what exactly do you want to handle here?
>
This is a non-issue, sorry about that.

> Is there a valid mechanism to know the log is being reconsumed and to
> let the client layer know of this ?
> ---> I suppose you will have to maintain this in your application by
> checking keeping track of offset that was consumed in the past and the
> offset currently being consumed.

Thanks Mayuresh!

  - pyr


Re: Log compaction recover strategy

2015-03-10 Thread Mayuresh Gharat
How do you typically handle workers starting, always start at offset 0
to make sure the view is correctly recreated ?
---> You will have to reset the offsets to 0 and set the offset reset policy
to earliest in the consumer.

How do you handle topology changes in consumers, which lead to a
redistribution of key across them ?
---> Can you explain what exactly you want to handle here?

Is there a valid mechanism to know the log is being reconsumed and to
let the client layer know of this ?
---> I suppose you will have to maintain this in your application by keeping
track of the offsets that were consumed in the past and the offset currently
being consumed.
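
A small sketch of the first and last points with the Java consumer (the topic,
partition and the applyToView sink are placeholders): seek back to the
beginning and remember the end offset at startup, so the worker knows it is
replaying history until it passes that point:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class RebuildView {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("auto.offset.reset", "earliest");          // offset reset policy, as suggested above
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("entity-mutations", 0);   // assumed topic
            consumer.assign(Collections.singleton(tp));
            consumer.seekToBeginning(Collections.singleton(tp));             // force a full re-read

            // End offset at startup: everything below this is reconsumed history.
            long endAtStart = consumer.endOffsets(Collections.singleton(tp)).get(tp);
            boolean rebuilding = endAtStart > 0;

            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                    applyToView(record);   // hypothetical: write the mutation into the view
                    if (rebuilding && record.offset() >= endAtStart - 1) {
                        rebuilding = false;   // caught up; from here on these are live mutations
                    }
                }
            }
        }
    }

    static void applyToView(ConsumerRecord<String, String> record) { /* ... */ }
}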


Thanks,

Mayuresh

On Tue, Mar 10, 2015 at 5:51 AM, Pierre-Yves Ritschard 
wrote:

> Hi kafka,
>
> I've started implementing simple materialized views with the log
> compaction feature to test it out, and it works great. I'll share the
> code and an accompanying article shortly but first wanted to discuss
> some of the production implications my sandbox has.
>
> I've separated the project in two components:
>
> - An HTTP API which reads off of a memory cache (in this case: redis)
> and produces mutations on a kafka topic
> - A worker which consumes the stream and materializes the view in redis.
>
> I have a single entity so, the materialization is a very simple process,
> which maintains a set of all entity keys and store entity content in
> keys. In redis, a create or update maps to a SADD and SET, a delete maps
> to a SREM and a DEL.
>
> I'm now considering the production implications this has and have a few
> questions:
>
> - How do you typically handle workers starting, always start at offset 0
> to make sure the view is correctly recreated ?
> - How do you handle topology changes in consumers, which lead to a
> redistribution of key across them ?
> - Is there a valid mechanism to know the log is being reconsumed and to
> let the client layer know of this ?
>
> Congrats on getting log compaction in, this feature opens up a ton of
> reliability improvements for us :-)
>
>   - pyr
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Log compaction recover strategy

2015-03-10 Thread Pierre-Yves Ritschard
Hi kafka,

I've started implementing simple materialized views with the log
compaction feature to test it out, and it works great. I'll share the
code and an accompanying article shortly but first wanted to discuss
some of the production implications my sandbox has.

I've separated the project in two components:

- An HTTP API which reads off of a memory cache (in this case: redis)
and produces mutations on a kafka topic
- A worker which consumes the stream and materializes the view in redis.

I have a single entity, so the materialization is a very simple process:
it maintains a set of all entity keys and stores each entity's content under
its key. In redis, a create or update maps to a SADD and SET, and a delete
maps to a SREM and a DEL.
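
A minimal sketch of such a worker, assuming the Jedis client, string-serialized
messages, and that deletes arrive as tombstones (null values); the topic,
group id and the "entity:keys" set name are placeholders:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import redis.clients.jedis.Jedis;

public class RedisMaterializer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("group.id", "view-materializer");          // assumed group id
        props.put("auto.offset.reset", "earliest");           // rebuild the view from offset 0 on first start
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
             Jedis redis = new Jedis("localhost", 6379)) {
            consumer.subscribe(Collections.singletonList("entity-mutations"));   // assumed topic name
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                    if (record.value() == null) {
                        // Tombstone: drop the entity from the key set and delete its content.
                        redis.srem("entity:keys", record.key());
                        redis.del(record.key());
                    } else {
                        // Create or update: track the key and store the latest content.
                        redis.sadd("entity:keys", record.key());
                        redis.set(record.key(), record.value());
                    }
                }
            }
        }
    }
}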

I'm now considering the production implications this has and have a few
questions:

- How do you typically handle workers starting, always start at offset 0
to make sure the view is correctly recreated ?
- How do you handle topology changes in consumers, which lead to a
redistribution of key across them ?
- Is there a valid mechanism to know the log is being reconsumed and to
let the client layer know of this ?

Congrats on getting log compaction in, this feature opens up a ton of
reliability improvements for us :-)

  - pyr


  1   2   >