Re: [DISCUSS] KIP-58 - Make Log Compaction Point Configurable

2016-05-20 Thread Gwen Shapira
Tom,

Documentation improvements are always welcome. The docs are in /docs under
the main repository; just send a PR for trunk and we are good :)

Segment sizes - I have some objections, but this can be discussed in its
own thread. I feel like I did enough hijacking and Eric may get annoyed at
some point.

Gwen

On Fri, May 20, 2016 at 5:19 AM, Tom Crayford  wrote:

> Hi,
>
> From our perspective (running thousands of Kafka clusters), the main issues
> we see with compacted topics *aren't* disk space usage, or IO utilization
> of the log cleaner.
>
> Size matters a *lot* to the usability of consumers bootstrapping from the
> beginning - in fact we've been debating tuning out the log segment size for
> compacted topics to 100MB, because right now leaving 1GB of uncompacted log
> makes some bootstrapping take way too long (especially for non JVM clients,
> even in fast languages like Go they're not as capable of high throughput as
> the JVM clients). I'm wondering if that should be a default in Kafka itself
> as well, and would be happy to contribute that kind of change upstream.
> Kafka already tunes the __consumer_offsets topic down to 100MB per segment
> for this exact reason.
>
> Secondly, the docs don't make it clear (and this has confused dozens of
> well intentioned, smart folk that we've talked to, and likely thousands of
> Kafka users across the board) that compaction is an *alternative* to time
> based retention. Lots of folk used compaction assuming "it's like time
> based retention, but with even less space usage". Switching between the two
> is thankfully easy, but it's been a very confusing thing to understand. I'd
> like to contribute back clearer docs to Kafka about this. Should I send a
> PR? Would that be welcome?
>
> Thirdly, most users *don't* want to tune Kafka's settings at all, or even
> know how or when they should. Whilst some amount of tuning is inevitable,
> the drive Gwen has towards "less tuning" is very positive from our
> perspective. Most users of most software (including technical users of data
> storage and messaging systems) want to "just use it" and not worry about
> "do I need to monitor a thousand things and then tune another thousand
> based on my metrics". Whilst some of that is unavoidable (for sure), it
> feels like compaction tuning should be something the project provides
> *great* general purpose defaults for most users, which cover most of the
> cases, which leave tuning just to the 1% of folk who really really care.
> The current defaults seem to be doing well here (barring the above note
> about log compaction size), and any future changes here should keep this
> up.
>
> Thanks
>
> Tom Crayford
> Heroku Kafka
>
> On Fri, May 20, 2016 at 4:48 AM, Jay Kreps  wrote:
>
> > Hey Gwen,
> >
> > Yeah specifying in bytes versus the utilization percent would have been
> > easier to implement. The argument against that is that basically users
> are
> > super terrible at predicting and updating data sizes as stuff grows and
> > you'd have to really set this then for each individual log perhaps?
> > Currently I think that the utilization number of 50% is pretty reasonable
> > for most people and you only need to tune it if you really want to
> > optimize. But if you set a fixed size compaction threshold in bytes then
> > how aggressive this is and the resulting utilization totally depends on
> the
> > compacted size of the data in the topic. i.e. if it defaults to 20GB then
> > that becomes the minimum size of the log, so if you end up with a bunch
> of
> > topics with 100mb of compacted data they all end up growing to 20GB. As a
> > user if you think you've written 100*100mb worth of compacted partitions
> > but Kafka has 100*20GB of data I think you'd be a bit shocked.
> >
> > Ben--I think your proposal attempts to minimize total I/O by waiting
> until
> > the compaction buffer will be maxed out. Each unique key in the
> uncompacted
> > log uses 24 bytes of compaction buffer iirc but since you don't know the
> > number of unique keys it's a bit hard to guess this. You could assume
> they
> > are all unique and only compact when you have N/24 messages in the
> > uncompacted log where N is the compaction buffer size in bytes. The issue
> > as with Gwen's proposal is that by doing this you really lose control of
> > disk utilization which might be a bit unintuitive. Your idea of just
> using
> > the free disk space might fix this though it might be somewhat complex in
> > the mixed setting with both compacted and non-compacted topics.
> >
> > One other thing worth noting is that compaction isn't just for disk
> space.
> > A consumer that bootstraps from the beginning (a la state restore in
> Kafka
> > Streams) has to fully read and process the whole log so I think you want
> to
> > compact even when you still have free space.
> >
> > -Jay
> >
> >
> >
> > On Wed, May 18, 2016 at 10:29 PM, Gwen Shapira 
> wrote:
> >
> > > Oops :)
> > >
> > > The docs are definitely not doing th

Re: [DISCUSS] KIP-58 - Make Log Compaction Point Configurable

2016-05-20 Thread Tom Crayford
Hi,

From our perspective (running thousands of Kafka clusters), the main issues
we see with compacted topics *aren't* disk space usage, or IO utilization
of the log cleaner.

Size matters a *lot* to the usability of consumers bootstrapping from the
beginning - in fact we've been debating tuning down the log segment size for
compacted topics to 100MB, because right now leaving 1GB of uncompacted log
makes some bootstrapping take way too long (especially for non JVM clients,
even in fast languages like Go they're not as capable of high throughput as
the JVM clients). I'm wondering if that should be a default in Kafka itself
as well, and would be happy to contribute that kind of change upstream.
Kafka already tunes the __consumer_offsets topic down to 100MB per segment
for this exact reason.
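
To make that concrete, here's a rough sketch of creating a compacted topic with
100MB segments - the topic name, partition count and replication factor are
invented, and the Java AdminClient is used purely for illustration (the same
per-topic configs can be set with the topic tooling):

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.common.config.TopicConfig;

    public class CreateCompactedTopic {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // illustrative broker address

            Map<String, String> configs = new HashMap<>();
            // Compacted rather than time-retained.
            configs.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
            // 100MB segments, so less of the log sits in the uncleanable active segment.
            configs.put(TopicConfig.SEGMENT_BYTES_CONFIG, String.valueOf(100 * 1024 * 1024));

            NewTopic topic = new NewTopic("example-table-topic", 8, (short) 3).configs(configs);
            try (AdminClient admin = AdminClient.create(props)) {
                admin.createTopics(Collections.singleton(topic)).all().get();
            }
        }
    }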

Secondly, the docs don't make it clear (and this has confused dozens of
well intentioned, smart folk that we've talked to, and likely thousands of
Kafka users across the board) that compaction is an *alternative* to time
based retention. Lots of folk used compaction assuming "it's like time
based retention, but with even less space usage". Switching between the two
is thankfully easy, but it's been a very confusing thing to understand. I'd
like to contribute back clearer docs to Kafka about this. Should I send a
PR? Would that be welcome?

Thirdly, most users *don't* want to tune Kafka's settings at all, or even
know how or when they should. Whilst some amount of tuning is inevitable,
the drive Gwen has towards "less tuning" is very positive from our
perspective. Most users of most software (including technical users of data
storage and messaging systems) want to "just use it" and not worry about
"do I need to monitor a thousand things and then tune another thousand
based on my metrics". Whilst some of that is unavoidable (for sure), it
feels like compaction tuning should be something the project provides
*great* general purpose defaults for most users, which cover most of the
cases, and leave tuning just to the 1% of folk who really, really care.
The current defaults seem to be doing well here (barring the above note
about log compaction size), and any future changes here should keep this up.

Thanks

Tom Crayford
Heroku Kafka

On Fri, May 20, 2016 at 4:48 AM, Jay Kreps  wrote:

> Hey Gwen,
>
> Yeah specifying in bytes versus the utilization percent would have been
> easier to implement. The argument against that is that basically users are
> super terrible at predicting and updating data sizes as stuff grows and
> you'd have to really set this then for each individual log perhaps?
> Currently I think that the utilization number of 50% is pretty reasonable
> for most people and you only need to tune it if you really want to
> optimize. But if you set a fixed size compaction threshold in bytes then
> how aggressive this is and the resulting utilization totally depends on the
> compacted size of the data in the topic. i.e. if it defaults to 20GB then
> that becomes the minimum size of the log, so if you end up with a bunch of
> topics with 100mb of compacted data they all end up growing to 20GB. As a
> user if you think you've written 100*100mb worth of compacted partitions
> but Kafka has 100*20GB of data I think you'd be a bit shocked.
>
> Ben--I think your proposal attempts to minimize total I/O by waiting until
> the compaction buffer will be maxed out. Each unique key in the uncompacted
> log uses 24 bytes of compaction buffer iirc but since you don't know the
> number of unique keys it's a bit hard to guess this. You could assume they
> are all unique and only compact when you have N/24 messages in the
> uncompacted log where N is the compaction buffer size in bytes. The issue
> as with Gwen's proposal is that by doing this you really lose control of
> disk utilization which might be a bit unintuitive. Your idea of just using
> the free disk space might fix this though it might be somewhat complex in
> the mixed setting with both compacted and non-compacted topics.
>
> One other thing worth noting is that compaction isn't just for disk space.
> A consumer that bootstraps from the beginning (a la state restore in Kafka
> Streams) has to fully read and process the whole log so I think you want to
> compact even when you still have free space.
>
> -Jay
>
>
>
> On Wed, May 18, 2016 at 10:29 PM, Gwen Shapira  wrote:
>
> > Oops :)
> >
> > The docs are definitely not doing the feature any favors, but I didn't
> mean
> > to imply the feature is thoughtless.
> >
> > Here's the thing I'm not getting: You are trading off disk space for IO
> > efficiency. Thats reasonable. But why not allow users to specify space in
> > bytes?
> >
> > Basically tell the LogCompacter: Once I have X bytes of dirty data (or
> post
> > KIP-58, X bytes of data that needs cleaning), please compact it to the
> best
> > of your ability (which in steady state will be into almost nothing).
> >
> > Since we know how big the compaction buffer is and how 

Re: [DISCUSS] KIP-58 - Make Log Compaction Point Configurable

2016-05-19 Thread Jay Kreps
Hey Gwen,

Yeah specifying in bytes versus the utilization percent would have been
easier to implement. The argument against that is that basically users are
super terrible at predicting and updating data sizes as stuff grows and
you'd have to really set this then for each individual log perhaps?
Currently I think that the utilization number of 50% is pretty reasonable
for most people and you only need to tune it if you really want to
optimize. But if you set a fixed size compaction threshold in bytes then
how aggressive this is and the resulting utilization totally depends on the
compacted size of the data in the topic. i.e. if it defaults to 20GB then
that becomes the minimum size of the log, so if you end up with a bunch of
topics with 100mb of compacted data they all end up growing to 20GB. As a
user if you think you've written 100*100mb worth of compacted partitions
but Kafka has 100*20GB of data I think you'd be a bit shocked.
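
Just to put rough numbers on that (all sizes are invented, and I'm comparing a
hypothetical fixed-bytes threshold against today's 50% ratio):

    public class DirtyBytesVsRatio {
        public static void main(String[] args) {
            long compactedBytes = 100L * 1024 * 1024;       // 100MB of "real" compacted data
            long minDirtyBytes = 20L * 1024 * 1024 * 1024;  // hypothetical fixed 20GB threshold
            double minDirtyRatio = 0.5;                     // today's default cleanable ratio

            // With a fixed byte threshold, every partition grows to roughly
            // compacted size + threshold before it is even eligible for cleaning.
            long maxSizeWithBytes = compactedBytes + minDirtyBytes;

            // With a ratio, the dirty tail is bounded relative to the partition itself:
            // dirty / (dirty + clean) <= ratio  =>  dirty <= clean * ratio / (1 - ratio).
            long maxDirty = (long) (compactedBytes * minDirtyRatio / (1 - minDirtyRatio));
            long maxSizeWithRatio = compactedBytes + maxDirty;

            System.out.printf("20GB threshold: partition can reach ~%,d MB%n", maxSizeWithBytes >> 20);
            System.out.printf("50%% ratio:      partition can reach ~%,d MB%n", maxSizeWithRatio >> 20);
        }
    }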

Ben--I think your proposal attempts to minimize total I/O by waiting until
the compaction buffer will be maxed out. Each unique key in the uncompacted
log uses 24 bytes of compaction buffer iirc but since you don't know the
number of unique keys it's a bit hard to guess this. You could assume they
are all unique and only compact when you have N/24 messages in the
uncompacted log where N is the compaction buffer size in bytes. The issue
as with Gwen's proposal is that by doing this you really lose control of
disk utilization which might be a bit unintuitive. Your idea of just using
the free disk space might fix this though it might be somewhat complex in
the mixed setting with both compacted and non-compacted topics.
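
For a sense of scale - treating both the buffer size and the 24-byte figure as
ballpark assumptions:

    public class CompactionBufferCapacity {
        public static void main(String[] args) {
            long dedupeBufferBytes = 128L * 1024 * 1024;  // assumed log.cleaner.dedupe.buffer.size
            int bytesPerKey = 24;                         // rough per-key cost in the offset map

            // Worst case: every dirty message has a distinct key, so this is the most
            // dirty messages a single cleaning pass can absorb.
            long maxKeysPerPass = dedupeBufferBytes / bytesPerKey;
            System.out.printf("~%,d unique keys per cleaning pass%n", maxKeysPerPass);
            // ~5.6 million for a 128MB buffer, so "wait for N/24 dirty messages" is the
            // point at which a pass gets the maximum possible yield.
        }
    }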

One other thing worth noting is that compaction isn't just for disk space.
A consumer that bootstraps from the beginning (a la state restore in Kafka
Streams) has to fully read and process the whole log so I think you want to
compact even when you still have free space.

-Jay



On Wed, May 18, 2016 at 10:29 PM, Gwen Shapira  wrote:

> Oops :)
>
> The docs are definitely not doing the feature any favors, but I didn't mean
> to imply the feature is thoughtless.
>
> Here's the thing I'm not getting: You are trading off disk space for IO
> efficiency. Thats reasonable. But why not allow users to specify space in
> bytes?
>
> Basically tell the LogCompacter: Once I have X bytes of dirty data (or post
> KIP-58, X bytes of data that needs cleaning), please compact it to the best
> of your ability (which in steady state will be into almost nothing).
>
> Since we know how big the compaction buffer is and how Kafka uses it, we
> can exactly calculate how much space we are wasting vs. how much IO we are
> going to do per unit of time. The size of a single segment or compaction
> buffer (whichever is bigger) can be a good default value for
> min.dirty.bytes. We can even evaluate and re-evaluate it based on the
> amount of free space on the disk. Heck, we can automate those tunings
> (lower min.dirty.bytes to trigger compaction and free space if we are close
> to running out of space).
>
> We can do the same capacity planning with percentages but it requires more
> information to know the results, information that can only be acquired
> after you reach steady state.
>
> It is a bit obvious, so I'm guessing the idea was considered and dismissed.
> I just can't see why.
> If only there were KIPs back then, so I could look at rejected
> alternatives...
>
> Gwen
>
>
>
> On Wed, May 18, 2016 at 9:54 PM, Jay Kreps  wrote:
>
> > So in summary we never considered this a mechanism to give the consumer
> > time to consume prior to compaction, just a mechanism to control space
> > wastage. It sort of accidentally gives you that but it's super hard to
> > reason about it as an SLA since it is relative to the log size rather
> than
> > absolute.
> >
> > -Jay
> >
> > On Wed, May 18, 2016 at 9:50 PM, Jay Kreps  wrote:
> >
> > > The sad part is I actually did think pretty hard about how to configure
> > > that stuff so I guess *I* think the config makes sense! Clearly trying
> to
> > > prevent my being shot :-)
> > >
> > > I agree the name could be improved and the documentation is quite
> > > spartan--no guidance at all on how to set it or what it trades off. A
> bit
> > > shameful.
> > >
> > > The thinking was this. One approach to cleaning would be to just do it
> > > continually with the idea that, hey, you can't take that I/O with
> > you--once
> > > you've budgeted N MB/sec of background I/O for compaction some of the
> > time,
> > > you might as well just use that budget all the time. But this leads to
> > > seemingly silly behavior where you are doing big ass compactions all
> the
> > > time to free up just a few bytes and we thought it would freak people
> > out.
> > > Plus arguably Kafka usage isn't all in steady state so this wastage
> would
> > > come out of the budget for other bursty stuff.
> > >
> > >  So when should compaction kick in? Well what are you trading off? The
> > >

Re: [DISCUSS] KIP-58 - Make Log Compaction Point Configurable

2016-05-19 Thread Gwen Shapira
No, you are right that mapping dirty-bytes to dirty-map sizes is
non-trivial. I think it would be good to discuss an alternative approach,
but this is probably the wrong thread :)

On Thu, May 19, 2016 at 4:36 AM, Ben Stopford  wrote:

> Hmm. Suffice to say, this isn’t an easy thing to tune, so I would agree
> that a more holistic solution, which tuned itself to total disk
> availability, might be quite useful :)
>
> If we took the min.dirty.bytes route, and defaulted it to the segment
> size, that would work well for distributions where the dirty-map
> (compaction buffer) will be filled by a single dirty segment, but this
> would depend a bit on the message size. If messages were large the
> dirty-map might not fill, which would reduce the yield from the scan. In
> fact there seems a general incentive to defer scanning to ensure the
> dirty-map always fills. For this reason, the ratio approach still seems a
> little more general to me as it applies equally to large and small
> partitions.
>
> Let me know if I’m missing something here.
>
> B
>
>
> > On 19 May 2016, at 06:29, Gwen Shapira  wrote:
> >
> > Oops :)
> >
> > The docs are definitely not doing the feature any favors, but I didn't
> mean
> > to imply the feature is thoughtless.
> >
> > Here's the thing I'm not getting: You are trading off disk space for IO
> > efficiency. Thats reasonable. But why not allow users to specify space in
> > bytes?
> >
> > Basically tell the LogCompacter: Once I have X bytes of dirty data (or
> post
> > KIP-58, X bytes of data that needs cleaning), please compact it to the
> best
> > of your ability (which in steady state will be into almost nothing).
> >
> > Since we know how big the compaction buffer is and how Kafka uses it, we
> > can exactly calculate how much space we are wasting vs. how much IO we
> are
> > going to do per unit of time. The size of a single segment or compaction
> > buffer (whichever is bigger) can be a good default value for
> > min.dirty.bytes. We can even evaluate and re-evaluate it based on the
> > amount of free space on the disk. Heck, we can automate those tunings
> > (lower min.dirty.bytes to trigger compaction and free space if we are
> close
> > to running out of space).
> >
> > We can do the same capacity planning with percentages but it requires
> more
> > information to know the results, information that can only be acquired
> > after you reach steady state.
> >
> > It is a bit obvious, so I'm guessing the idea was considered and
> dismissed.
> > I just can't see why.
> > If only there were KIPs back then, so I could look at rejected
> > alternatives...
> >
> > Gwen
> >
> >
> >
> > On Wed, May 18, 2016 at 9:54 PM, Jay Kreps  wrote:
> >
> >> So in summary we never considered this a mechanism to give the consumer
> >> time to consume prior to compaction, just a mechanism to control space
> >> wastage. It sort of accidentally gives you that but it's super hard to
> >> reason about it as an SLA since it is relative to the log size rather
> than
> >> absolute.
> >>
> >> -Jay
> >>
> >> On Wed, May 18, 2016 at 9:50 PM, Jay Kreps  wrote:
> >>
> >>> The sad part is I actually did think pretty hard about how to configure
> >>> that stuff so I guess *I* think the config makes sense! Clearly trying
> to
> >>> prevent my being shot :-)
> >>>
> >>> I agree the name could be improved and the documentation is quite
> >>> spartan--no guidance at all on how to set it or what it trades off. A
> bit
> >>> shameful.
> >>>
> >>> The thinking was this. One approach to cleaning would be to just do it
> >>> continually with the idea that, hey, you can't take that I/O with
> >> you--once
> >>> you've budgeted N MB/sec of background I/O for compaction some of the
> >> time,
> >>> you might as well just use that budget all the time. But this leads to
> >>> seemingly silly behavior where you are doing big ass compactions all
> the
> >>> time to free up just a few bytes and we thought it would freak people
> >> out.
> >>> Plus arguably Kafka usage isn't all in steady state so this wastage
> would
> >>> come out of the budget for other bursty stuff.
> >>>
> >>> So when should compaction kick in? Well what are you trading off? The
> >>> tradeoff here is how much space to waste on disk versus how much I/O to
> >> use
> >>> in cleaning. In general we can't say exactly how much space a
> compaction
> >>> will free up--during a phase of all "inserts" compaction may free up no
> >>> space at all. You just have to do the compaction and hope for the best.
> >> But
> >>> in general for most compacted topics they should soon reach a "steady
> >>> state" where they aren't growing or growing very slowly, so most writes
> >> are
> >>> updates (if they keep growing rapidly indefinitely then you are going
> to
> >>> run out of space--so safe to assume they do reach steady state). In
> this
> >>> steady state the ratio of uncompacted log to total log is effectively
> the
> >>> utilization (wasted space percentage). So if you set 

Re: [DISCUSS] KIP-58 - Make Log Compaction Point Configurable

2016-05-19 Thread Ben Stopford
Hmm. Suffice to say, this isn’t an easy thing to tune, so I would agree that a 
more holistic solution, which tuned itself to total disk availability, might be 
quite useful :)

If we took the min.dirty.bytes route, and defaulted it to the segment size, 
that would work well for distributions where the dirty-map (compaction buffer) 
will be filled by a single dirty segment, but this would depend a bit on the 
message size. If messages were large the dirty-map might not fill, which would 
reduce the yield from the scan. In fact there seems a general incentive to 
defer scanning to ensure the dirty-map always fills. For this reason, the ratio 
approach still seems a little more general to me as it applies equally to large 
and small partitions. 
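
Here's the message-size sensitivity I mean, with all of the numbers invented:

    public class DirtyMapFillEstimate {
        public static void main(String[] args) {
            long segmentBytes = 100L * 1024 * 1024;       // assumed dirty segment size
            long dedupeBufferBytes = 128L * 1024 * 1024;  // assumed dirty-map (dedupe buffer) size
            int bytesPerKey = 24;                         // rough per-key cost in the dirty map
            long keysMapCanHold = dedupeBufferBytes / bytesPerKey;

            for (int avgMessageBytes : new int[] {100, 1_000, 10_000}) {
                long messagesInSegment = segmentBytes / avgMessageBytes;
                double fill = Math.min(1.0, (double) messagesInSegment / keysMapCanHold);
                System.out.printf("avg message %,6d bytes -> dirty map ~%.0f%% full after one segment%n",
                        avgMessageBytes, fill * 100);
            }
            // Smaller messages fill far more of the map per segment; with large messages a
            // single-segment scan uses only a tiny fraction of the buffer's capacity.
        }
    }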

Let me know if I’m missing something here. 

B


> On 19 May 2016, at 06:29, Gwen Shapira  wrote:
> 
> Oops :)
> 
> The docs are definitely not doing the feature any favors, but I didn't mean
> to imply the feature is thoughtless.
> 
> Here's the thing I'm not getting: You are trading off disk space for IO
> efficiency. Thats reasonable. But why not allow users to specify space in
> bytes?
> 
> Basically tell the LogCompacter: Once I have X bytes of dirty data (or post
> KIP-58, X bytes of data that needs cleaning), please compact it to the best
> of your ability (which in steady state will be into almost nothing).
> 
> Since we know how big the compaction buffer is and how Kafka uses it, we
> can exactly calculate how much space we are wasting vs. how much IO we are
> going to do per unit of time. The size of a single segment or compaction
> buffer (whichever is bigger) can be a good default value for
> min.dirty.bytes. We can even evaluate and re-evaluate it based on the
> amount of free space on the disk. Heck, we can automate those tunings
> (lower min.dirty.bytes to trigger compaction and free space if we are close
> to running out of space).
> 
> We can do the same capacity planning with percentages but it requires more
> information to know the results, information that can only be acquired
> after you reach steady state.
> 
> It is a bit obvious, so I'm guessing the idea was considered and dismissed.
> I just can't see why.
> If only there were KIPs back then, so I could look at rejected
> alternatives...
> 
> Gwen
> 
> 
> 
> On Wed, May 18, 2016 at 9:54 PM, Jay Kreps  wrote:
> 
>> So in summary we never considered this a mechanism to give the consumer
>> time to consume prior to compaction, just a mechanism to control space
>> wastage. It sort of accidentally gives you that but it's super hard to
>> reason about it as an SLA since it is relative to the log size rather than
>> absolute.
>> 
>> -Jay
>> 
>> On Wed, May 18, 2016 at 9:50 PM, Jay Kreps  wrote:
>> 
>>> The sad part is I actually did think pretty hard about how to configure
>>> that stuff so I guess *I* think the config makes sense! Clearly trying to
>>> prevent my being shot :-)
>>> 
>>> I agree the name could be improved and the documentation is quite
>>> spartan--no guidance at all on how to set it or what it trades off. A bit
>>> shameful.
>>> 
>>> The thinking was this. One approach to cleaning would be to just do it
>>> continually with the idea that, hey, you can't take that I/O with
>> you--once
>>> you've budgeted N MB/sec of background I/O for compaction some of the
>> time,
>>> you might as well just use that budget all the time. But this leads to
>>> seemingly silly behavior where you are doing big ass compactions all the
>>> time to free up just a few bytes and we thought it would freak people
>> out.
>>> Plus arguably Kafka usage isn't all in steady state so this wastage would
>>> come out of the budget for other bursty stuff.
>>> 
>>> So when should compaction kick in? Well what are you trading off? The
>>> tradeoff here is how much space to waste on disk versus how much I/O to
>> use
>>> in cleaning. In general we can't say exactly how much space a compaction
>>> will free up--during a phase of all "inserts" compaction may free up no
>>> space at all. You just have to do the compaction and hope for the best.
>> But
>>> in general for most compacted topics they should soon reach a "steady
>>> state" where they aren't growing or growing very slowly, so most writes
>> are
>>> updates (if they keep growing rapidly indefinitely then you are going to
>>> run out of space--so safe to assume they do reach steady state). In this
>>> steady state the ratio of uncompacted log to total log is effectively the
>>> utilization (wasted space percentage). So if you set it to 50% your data
>> is
>>> about half duplicates. By tolerating more uncleaned log you get more bang
>>> for your compaction I/O buck but more space wastage. This seemed like a
>>> reasonable way to think about it because maybe you know your compacted
>> data
>>> size (roughly) so you can reason about whether using, say, twice that
>> space
>>> is okay.
>>> 
>>> Maybe we should just change the name to something about target
>> util

Re: [DISCUSS] KIP-58 - Make Log Compaction Point Configurable

2016-05-18 Thread Gwen Shapira
Oops :)

The docs are definitely not doing the feature any favors, but I didn't mean
to imply the feature is thoughtless.

Here's the thing I'm not getting: You are trading off disk space for IO
efficiency. That's reasonable. But why not allow users to specify space in
bytes?

Basically tell the LogCompacter: Once I have X bytes of dirty data (or post
KIP-58, X bytes of data that needs cleaning), please compact it to the best
of your ability (which in steady state will be into almost nothing).

Since we know how big the compaction buffer is and how Kafka uses it, we
can exactly calculate how much space we are wasting vs. how much IO we are
going to do per unit of time. The size of a single segment or compaction
buffer (whichever is bigger) can be a good default value for
min.dirty.bytes. We can even evaluate and re-evaluate it based on the
amount of free space on the disk. Heck, we can automate those tunings
(lower min.dirty.bytes to trigger compaction and free space if we are close
to running out of space).
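
Roughly the capacity math I have in mind, with invented numbers and the
hypothetical min.dirty.bytes setting:

    public class MinDirtyBytesPlanning {
        public static void main(String[] args) {
            long minDirtyBytes = 1L * 1024 * 1024 * 1024;   // hypothetical min.dirty.bytes = 1GB
            long compactedBytes = 4L * 1024 * 1024 * 1024;  // steady-state compacted size of the log
            long writeBytesPerSec = 2L * 1024 * 1024;       // incoming (mostly-update) write rate

            // Worst-case wasted space is bounded by the trigger itself.
            long maxWastedBytes = minDirtyBytes;

            // How often a pass fires and how much it reads each time
            // (the dirty tail plus a full re-read of the compacted head).
            double secondsBetweenCleanings = (double) minDirtyBytes / writeBytesPerSec;
            double cleanerReadRate = (minDirtyBytes + compactedBytes) / secondsBetweenCleanings;

            System.out.printf("clean every ~%.0f s, waste at most %,d MB, ~%.1f MB/s of cleaner reads%n",
                    secondsBetweenCleanings, maxWastedBytes >> 20, cleanerReadRate / (1024 * 1024));
        }
    }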

We can do the same capacity planning with percentages but it requires more
information to know the results, information that can only be acquired
after you reach steady state.

It is a bit obvious, so I'm guessing the idea was considered and dismissed.
I just can't see why.
If only there were KIPs back then, so I could look at rejected
alternatives...

Gwen



On Wed, May 18, 2016 at 9:54 PM, Jay Kreps  wrote:

> So in summary we never considered this a mechanism to give the consumer
> time to consume prior to compaction, just a mechanism to control space
> wastage. It sort of accidentally gives you that but it's super hard to
> reason about it as an SLA since it is relative to the log size rather than
> absolute.
>
> -Jay
>
> On Wed, May 18, 2016 at 9:50 PM, Jay Kreps  wrote:
>
> > The sad part is I actually did think pretty hard about how to configure
> > that stuff so I guess *I* think the config makes sense! Clearly trying to
> > prevent my being shot :-)
> >
> > I agree the name could be improved and the documentation is quite
> > spartan--no guidance at all on how to set it or what it trades off. A bit
> > shameful.
> >
> > The thinking was this. One approach to cleaning would be to just do it
> > continually with the idea that, hey, you can't take that I/O with
> you--once
> > you've budgeted N MB/sec of background I/O for compaction some of the
> time,
> > you might as well just use that budget all the time. But this leads to
> > seemingly silly behavior where you are doing big ass compactions all the
> > time to free up just a few bytes and we thought it would freak people
> out.
> > Plus arguably Kafka usage isn't all in steady state so this wastage would
> > come out of the budget for other bursty stuff.
> >
> >  So when should compaction kick in? Well what are you trading off? The
> > tradeoff here is how much space to waste on disk versus how much I/O to
> use
> > in cleaning. In general we can't say exactly how much space a compaction
> > will free up--during a phase of all "inserts" compaction may free up no
> > space at all. You just have to do the compaction and hope for the best.
> But
> > in general for most compacted topics they should soon reach a "steady
> > state" where they aren't growing or growing very slowly, so most writes
> are
> > updates (if they keep growing rapidly indefinitely then you are going to
> > run out of space--so safe to assume they do reach steady state). In this
> > steady state the ratio of uncompacted log to total log is effectively the
> > utilization (wasted space percentage). So if you set it to 50% your data
> is
> > about half duplicates. By tolerating more uncleaned log you get more bang
> > for your compaction I/O buck but more space wastage. This seemed like a
> > reasonable way to think about it because maybe you know your compacted
> data
> > size (roughly) so you can reason about whether using, say, twice that
> space
> > is okay.
> >
> > Maybe we should just change the name to something about target
> utilization
> > even though that isn't strictly true except in steady state?
> >
> > -Jay
> >
> >
> > On Wed, May 18, 2016 at 7:59 PM, Gwen Shapira  wrote:
> >
> >> Interesting!
> >>
> >> This needs to be double checked by someone with more experience, but
> >> reading the code, it looks like "log.cleaner.min.cleanable.ratio"
> >> controls *just* the second property, and I'm not even convinced about
> >> that.
> >>
> >> Few facts:
> >>
> >> 1. Each cleaner thread cleans one log at a time. It always goes for
> >> the log with the largest percentage of non-compacted bytes. If you
> >> just created a new partition, wrote 1G and switched to a new segment,
> >> it is very likely that this will be the next log to compact.
> >> Explaining the behavior Eric and Jay complained about. I expected it
> >> to be rare.
> >>
> >> 2. If the dirtiest log has less than 50% dirty bytes (or whatever
> >> min.cleanable is), it will be skipped, knowing that others hav

Re: [DISCUSS] KIP-58 - Make Log Compaction Point Configurable

2016-05-18 Thread Jay Kreps
So in summary we never considered this a mechanism to give the consumer
time to consume prior to compaction, just a mechanism to control space
wastage. It sort of accidentally gives you that but it's super hard to
reason about it as an SLA since it is relative to the log size rather than
absolute.

-Jay

On Wed, May 18, 2016 at 9:50 PM, Jay Kreps  wrote:

> The sad part is I actually did think pretty hard about how to configure
> that stuff so I guess *I* think the config makes sense! Clearly trying to
> prevent my being shot :-)
>
> I agree the name could be improved and the documentation is quite
> spartan--no guidance at all on how to set it or what it trades off. A bit
> shameful.
>
> The thinking was this. One approach to cleaning would be to just do it
> continually with the idea that, hey, you can't take that I/O with you--once
> you've budgeted N MB/sec of background I/O for compaction some of the time,
> you might as well just use that budget all the time. But this leads to
> seemingly silly behavior where you are doing big ass compactions all the
> time to free up just a few bytes and we thought it would freak people out.
> Plus arguably Kafka usage isn't all in steady state so this wastage would
> come out of the budget for other bursty stuff.
>
>  So when should compaction kick in? Well what are you trading off? The
> tradeoff here is how much space to waste on disk versus how much I/O to use
> in cleaning. In general we can't say exactly how much space a compaction
> will free up--during a phase of all "inserts" compaction may free up no
> space at all. You just have to do the compaction and hope for the best. But
> in general for most compacted topics they should soon reach a "steady
> state" where they aren't growing or growing very slowly, so most writes are
> updates (if they keep growing rapidly indefinitely then you are going to
> run out of space--so safe to assume they do reach steady state). In this
> steady state the ratio of uncompacted log to total log is effectively the
> utilization (wasted space percentage). So if you set it to 50% your data is
> about half duplicates. By tolerating more uncleaned log you get more bang
> for your compaction I/O buck but more space wastage. This seemed like a
> reasonable way to think about it because maybe you know your compacted data
> size (roughly) so you can reason about whether using, say, twice that space
> is okay.
>
> Maybe we should just change the name to something about target utilization
> even though that isn't strictly true except in steady state?
>
> -Jay
>
>
> On Wed, May 18, 2016 at 7:59 PM, Gwen Shapira  wrote:
>
>> Interesting!
>>
>> This needs to be double checked by someone with more experience, but
>> reading the code, it looks like "log.cleaner.min.cleanable.ratio"
>> controls *just* the second property, and I'm not even convinced about
>> that.
>>
>> Few facts:
>>
>> 1. Each cleaner thread cleans one log at a time. It always goes for
>> the log with the largest percentage of non-compacted bytes. If you
>> just created a new partition, wrote 1G and switched to a new segment,
>> it is very likely that this will be the next log to compact.
>> Explaining the behavior Eric and Jay complained about. I expected it
>> to be rare.
>>
>> 2. If the dirtiest log has less than 50% dirty bytes (or whatever
>> min.cleanable is), it will be skipped, knowing that others have even
>> lower dirty ratios.
>>
>> 3. If we do decide to clean a log, we will clean the whole damn thing,
>> leaving only the active segment. Contrary to my expectations, it does
>> not leave any dirty byte behind. So *at most* you will have a single
>> clean segment. Again, explaining why Jay, James and Eric are unhappy.
>>
>> 4. What it does guarantee (kinda? at least I think it tries?) is to
>> always clean a large "chunk" of data at once, hopefully minimizing
>> churn (cleaning small bits off the same log over and over) and
>> minimizing IO. It does have the nice mathematical property of
>> guaranteeing double the amount of time between cleanings (except it
>> doesn't really, because who knows the size of the compacted region).
>>
>> 5. Whoever wrote the docs should be shot :)
>>
>> so, in conclusion:
>> In my mind, min.cleanable.dirty.ratio is terrible, it is misleading,
>> difficult to understand, and IMO doesn't even do what it should do.
>> I would like to consider the possibility of
>> min.cleanable.dirty.bytes, which should give good control over # of IO
>> operations (since the size of compaction buffer is known).
>>
>> In the context of this KIP, the interaction with cleanable ratio and
>> cleanable bytes will be similar, and it looks like it was already done
>> correctly in the PR, so no worries ("the ratio's definition will be
>> expanded to become the ratio of "compactable" to compactable plus
>> compacted message sizes. Where compactable includes log segments that
>> are neither the active segment nor those prohibited from being
>> compacted because they

Re: [DISCUSS] KIP-58 - Make Log Compaction Point Configurable

2016-05-18 Thread Jay Kreps
The sad part is I actually did think pretty hard about how to configure
that stuff so I guess *I* think the config makes sense! Clearly trying to
prevent my being shot :-)

I agree the name could be improved and the documentation is quite
spartan--no guidance at all on how to set it or what it trades off. A bit
shameful.

The thinking was this. One approach to cleaning would be to just do it
continually with the idea that, hey, you can't take that I/O with you--once
you've budgeted N MB/sec of background I/O for compaction some of the time,
you might as well just use that budget all the time. But this leads to
seemingly silly behavior where you are doing big ass compactions all the
time to free up just a few bytes and we thought it would freak people out.
Plus arguably Kafka usage isn't all in steady state so this wastage would
come out of the budget for other bursty stuff.

 So when should compaction kick in? Well what are you trading off? The
tradeoff here is how much space to waste on disk versus how much I/O to use
in cleaning. In general we can't say exactly how much space a compaction
will free up--during a phase of all "inserts" compaction may free up no
space at all. You just have to do the compaction and hope for the best. But
in general for most compacted topics they should soon reach a "steady
state" where they aren't growing or growing very slowly, so most writes are
updates (if they keep growing rapidly indefinitely then you are going to
run out of space--so safe to assume they do reach steady state). In this
steady state the ratio of uncompacted log to total log is effectively the
utilization (wasted space percentage). So if you set it to 50% your data is
about half duplicates. By tolerating more uncleaned log you get more bang
for your compaction I/O buck but more space wastage. This seemed like a
reasonable way to think about it because maybe you know your compacted data
size (roughly) so you can reason about whether using, say, twice that space
is okay.
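
Spelling out that steady-state arithmetic (assuming cleaning fires exactly at
the configured ratio, which is the best case):

    public class RatioToUtilization {
        public static void main(String[] args) {
            // At the moment cleaning triggers: dirty / (dirty + compacted) == ratio,
            // so total log size == compacted size / (1 - ratio).
            for (double ratio : new double[] {0.1, 0.25, 0.5, 0.75}) {
                double spaceMultiple = 1.0 / (1.0 - ratio);
                System.out.printf("ratio %.2f -> log grows to ~%.1fx its compacted size (~%.0f%% duplicates)%n",
                        ratio, spaceMultiple, ratio * 100);
            }
        }
    }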

Maybe we should just change the name to something about target utilization
even though that isn't strictly true except in steady state?

-Jay


On Wed, May 18, 2016 at 7:59 PM, Gwen Shapira  wrote:

> Interesting!
>
> This needs to be double checked by someone with more experience, but
> reading the code, it looks like "log.cleaner.min.cleanable.ratio"
> controls *just* the second property, and I'm not even convinced about
> that.
>
> Few facts:
>
> 1. Each cleaner thread cleans one log at a time. It always goes for
> the log with the largest percentage of non-compacted bytes. If you
> just created a new partition, wrote 1G and switched to a new segment,
> it is very likely that this will be the next log to compact.
> Explaining the behavior Eric and Jay complained about. I expected it
> to be rare.
>
> 2. If the dirtiest log has less than 50% dirty bytes (or whatever
> min.cleanable is), it will be skipped, knowing that others have even
> lower dirty ratios.
>
> 3. If we do decide to clean a log, we will clean the whole damn thing,
> leaving only the active segment. Contrary to my expectations, it does
> not leave any dirty byte behind. So *at most* you will have a single
> clean segment. Again, explaining why Jay, James and Eric are unhappy.
>
> 4. What it does guarantee (kinda? at least I think it tries?) is to
> always clean a large "chunk" of data at once, hopefully minimizing
> churn (cleaning small bits off the same log over and over) and
> minimizing IO. It does have the nice mathematical property of
> guaranteeing double the amount of time between cleanings (except it
> doesn't really, because who knows the size of the compacted region).
>
> 5. Whoever wrote the docs should be shot :)
>
> so, in conclusion:
> In my mind, min.cleanable.dirty.ratio is terrible, it is misleading,
> difficult to understand, and IMO doesn't even do what it should do.
> I would like to consider the possibility of
> min.cleanable.dirty.bytes, which should give good control over # of IO
> operations (since the size of compaction buffer is known).
>
> In the context of this KIP, the interaction with cleanable ratio and
> cleanable bytes will be similar, and it looks like it was already done
> correctly in the PR, so no worries ("the ratio's definition will be
> expanded to become the ratio of "compactable" to compactable plus
> compacted message sizes. Where compactable includes log segments that
> are neither the active segment nor those prohibited from being
> compacted because they contain messages that do not satisfy all the
> new lag constraints"
>
> I may open a new KIP to handle the cleanable ratio. Please don't let
> my confusion detract from this KIP.
>
> Gwen
>
> On Wed, May 18, 2016 at 3:41 PM, Ben Stopford  wrote:
> > Generally, this seems like a sensible proposal to me.
> >
> > Regarding (1): time and message count seem sensible. I can’t think of a
> specific use case for bytes but it seems like there could be one.
> >
> > Regarding (2):
> > Th

Re: [DISCUSS] KIP-58 - Make Log Compaction Point Configurable

2016-05-18 Thread Gwen Shapira
Interesting!

This needs to be double checked by someone with more experience, but
reading the code, it looks like "log.cleaner.min.cleanable.ratio"
controls *just* the second property, and I'm not even convinced about
that.

Few facts:

1. Each cleaner thread cleans one log at a time. It always goes for
the log with the largest percentage of non-compacted bytes. If you
just created a new partition, wrote 1G and switched to a new segment,
it is very likely that this will be the next log to compact.
Explaining the behavior Eric and Jay complained about. I expected it
to be rare.

2. If the dirtiest log has less than 50% dirty bytes (or whatever
min.cleanable is), it will be skipped, knowing that others have even
lower dirty ratios.

3. If we do decide to clean a log, we will clean the whole damn thing,
leaving only the active segment. Contrary to my expectations, it does
not leave any dirty byte behind. So *at most* you will have a single
clean segment. Again, explaining why Jay, James and Eric are unhappy.

4. What it does guarantee (kinda? at least I think it tries?) is to
always clean a large "chunk" of data at once, hopefully minimizing
churn (cleaning small bits off the same log over and over) and
minimizing IO. It does have the nice mathematical property of
guaranteeing double the amount of time between cleanings (except it
doesn't really, because who knows the size of the compacted region).

5. Whoever wrote the docs should be shot :)

so, in conclusion:
In my mind, min.cleanable.dirty.ratio is terrible, it is misleading,
difficult to understand, and IMO doesn't even do what it should do.
I would like to consider the possibility of
min.cleanable.dirty.bytes, which should give good control over # of IO
operations (since the size of compaction buffer is known).
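
To spell out what I mean, here's a toy sketch - not the real LogCleaner code,
and the per-log numbers and the min.cleanable.dirty.bytes config are
hypothetical - of how the two triggers would differ:

    public class CleanerTriggerSketch {
        static final class LogState {
            final String name;
            final long cleanBytes;  // already-compacted head
            final long dirtyBytes;  // uncompacted tail (excluding the active segment)
            LogState(String name, long cleanBytes, long dirtyBytes) {
                this.name = name; this.cleanBytes = cleanBytes; this.dirtyBytes = dirtyBytes;
            }
            double dirtyRatio() { return (double) dirtyBytes / (cleanBytes + dirtyBytes); }
        }

        public static void main(String[] args) {
            LogState[] logs = {
                new LogState("new-partition", 0L, 1_000_000_000L),  // freshly written 1GB
                new LogState("big-steady", 50_000_000_000L, 10_000_000_000L),
                new LogState("small-steady", 100_000_000L, 30_000_000L),
            };
            double minCleanableRatio = 0.5;             // today's config
            long minCleanableDirtyBytes = 64_000_000L;  // hypothetical bytes-based trigger

            for (LogState log : logs) {
                System.out.printf("%-14s ratio=%.2f ratio-trigger=%b bytes-trigger=%b%n",
                        log.name, log.dirtyRatio(),
                        log.dirtyRatio() >= minCleanableRatio,
                        log.dirtyBytes >= minCleanableDirtyBytes);
            }
            // Today the cleaner then picks the eligible log with the highest dirty ratio,
            // which is why a brand new 1GB partition (ratio 1.0) jumps the queue.
        }
    }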

In the context of this KIP, the interaction with cleanable ratio and
cleanable bytes will be similar, and it looks like it was already done
correctly in the PR, so no worries ("the ratio's definition will be
expanded to become the ratio of "compactable" to compactable plus
compacted message sizes. Where compactable includes log segments that
are neither the active segment nor those prohibited from being
compacted because they contain messages that do not satisfy all the
new lag constraints"

I may open a new KIP to handle the cleanable ratio. Please don't let
my confusion detract from this KIP.

Gwen

On Wed, May 18, 2016 at 3:41 PM, Ben Stopford  wrote:
> Generally, this seems like a sensible proposal to me.
>
> Regarding (1): time and message count seem sensible. I can’t think of a 
> specific use case for bytes but it seems like there could be one.
>
> Regarding (2):
> The setting log.cleaner.min.cleanable.ratio currently seems to have two uses. 
> It controls which messages will not be compacted, but it also provides a 
> fractional bound on how many logs are cleaned (and hence work done) in each 
> round. This new proposal seems aimed at the first use, but not the second.
>
> The second case better suits a fractional setting like the one we have now. 
> Using a fractional value means the amount of data cleaned scales in 
> proportion to the data stored in the log. If we were to replace this with an 
> absolute value it would create proportionally more cleaning work as the log 
> grew in size.
>
> So, if I understand this correctly, I think there is an argument for having 
> both.
>
>
>> On 17 May 2016, at 19:43, Gwen Shapira  wrote:
>>
>>  and Spark's implementation is another good reason to allow compaction 
>> lag.
>>
>> I'm convinced :)
>>
>> We need to decide:
>>
>> 1) Do we need just .ms config, or anything else? consumer lag is
>> measured (and monitored) in messages, so if we need this feature to
>> somehow work in tandem with consumer lag monitoring, I think we need
>> .messages too.
>>
>> 2) Does this new configuration allows us to get rid of cleaner.ratio config?
>>
>> Gwen
>>
>>
>> On Tue, May 17, 2016 at 9:43 AM, Eric Wasserman
>>  wrote:
>>> James,
>>>
>>> Your pictures do an excellent job of illustrating my point.
>>>
>>> My mention of the additional "10's of minutes to hours" refers to how far 
>>> after the original target checkpoint (T1 in your diagram) on may need to go 
>>> to get to a checkpoint where all partitions of all topics are in the 
>>> uncompacted region of their respective logs. In terms of your diagram: the 
>>> T3 transaction could have been written 10's of minutes to hours after T1 as 
>>> that was how much time it took all readers to get to T1.
>>>
 You would not have to start over from the beginning in order to read to T3.
>>>
>>> While I agree this is technically true, in practice it could be very 
>>> onerous to actually do it. For example, we use the Kafka consumer that is 
>>> part of the Spark Streaming library to read table topics. It accepts a 
>>> range of offsets to read for each partition. Say we originally target 
>>> ranges from offset 0 to the offset of T1 for each topic+partition. Th

Re: [DISCUSS] KIP-58 - Make Log Compaction Point Configurable

2016-05-18 Thread Ben Stopford
Generally, this seems like a sensible proposal to me. 

Regarding (1): time and message count seem sensible. I can’t think of a 
specific use case for bytes but it seems like there could be one.  

Regarding (2): 
The setting log.cleaner.min.cleanable.ratio currently seems to have two uses. 
It controls which messages will not be compacted, but it also provides a 
fractional bound on how many logs are cleaned (and hence work done) in each 
round. This new proposal seems aimed at the first use, but not the second. 

The second case better suits a fractional setting like the one we have now. 
Using a fractional value means the amount of data cleaned scales in proportion 
to the data stored in the log. If we were to replace this with an absolute 
value it would create proportionally more cleaning work as the log grew in 
size. 

So, if I understand this correctly, I think there is an argument for having 
both. 
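
A quick sketch of why I think the fractional setting scales better (the
absolute trigger and the log sizes below are invented):

    public class RatioVsAbsoluteWork {
        public static void main(String[] args) {
            double ratio = 0.5;                 // current fractional trigger
            long absoluteDirtyBytes = 1L << 30; // hypothetical fixed 1GB trigger

            for (long compactedBytes : new long[] {1L << 30, 10L << 30, 100L << 30}) {
                // A cleaning pass re-reads the compacted head plus the dirty tail it removes.
                long dirtyAtRatio = (long) (compactedBytes * ratio / (1 - ratio));
                double readPerDirtyByteRatio = (double) (compactedBytes + dirtyAtRatio) / dirtyAtRatio;
                double readPerDirtyByteAbs = (double) (compactedBytes + absoluteDirtyBytes) / absoluteDirtyBytes;

                System.out.printf("compacted %3d GB: ~%.1f bytes read per dirty byte (ratio) vs ~%.1f (absolute)%n",
                        compactedBytes >> 30, readPerDirtyByteRatio, readPerDirtyByteAbs);
            }
            // The ratio keeps cleaner I/O a constant multiple of the write rate; a fixed byte
            // trigger makes that multiple grow with the size of the log.
        }
    }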


> On 17 May 2016, at 19:43, Gwen Shapira  wrote:
> 
>  and Spark's implementation is another good reason to allow compaction 
> lag.
> 
> I'm convinced :)
> 
> We need to decide:
> 
> 1) Do we need just .ms config, or anything else? consumer lag is
> measured (and monitored) in messages, so if we need this feature to
> somehow work in tandem with consumer lag monitoring, I think we need
> .messages too.
> 
> 2) Does this new configuration allows us to get rid of cleaner.ratio config?
> 
> Gwen
> 
> 
> On Tue, May 17, 2016 at 9:43 AM, Eric Wasserman
>  wrote:
>> James,
>> 
>> Your pictures do an excellent job of illustrating my point.
>> 
>> My mention of the additional "10's of minutes to hours" refers to how far 
>> after the original target checkpoint (T1 in your diagram) on may need to go 
>> to get to a checkpoint where all partitions of all topics are in the 
>> uncompacted region of their respective logs. In terms of your diagram: the 
>> T3 transaction could have been written 10's of minutes to hours after T1 as 
>> that was how much time it took all readers to get to T1.
>> 
>>> You would not have to start over from the beginning in order to read to T3.
>> 
>> While I agree this is technically true, in practice it could be very onerous 
>> to actually do it. For example, we use the Kafka consumer that is part of 
>> the Spark Streaming library to read table topics. It accepts a range of 
>> offsets to read for each partition. Say we originally target ranges from 
>> offset 0 to the offset of T1 for each topic+partition. There really is no 
>> way to have the library arrive at T1 an then "keep going" to T3. What is 
>> worse, given Spark's design, if you lost a worker during your calculations 
>> you would be in a rather sticky position. Spark achieves resiliency not by 
>> data redundancy but by keeping track of how to reproduce the transformations 
>> leading to a state. In the face of a lost worker, Spark would try to re-read 
>> that portion of the data on the lost worker from Kafka. However, in the 
>> interim compaction may have moved past the reproducible checkpoint (T3) 
>> rendering the data inconsistent. At best the entire calculation would need 
>> to start over targeting some later transaction checkpoint.
>> 
>> Needless to say with the proposed feature everything is quite simple. As 
>> long as we set the compaction lag large enough we can be assured that T1 
>> will remain in the uncompacted region an thereby be reproducible. Thus 
>> reading from 0 to the offsets in T1 will be sufficient for the duration of 
>> the calculation.
>> 
>> Eric
>> 
>> 



Re: [DISCUSS] KIP-58 - Make Log Compaction Point Configurable

2016-05-17 Thread Gwen Shapira
 and Spark's implementation is another good reason to allow compaction lag.

I'm convinced :)

We need to decide:

1) Do we need just .ms config, or anything else? consumer lag is
measured (and monitored) in messages, so if we need this feature to
somehow work in tandem with consumer lag monitoring, I think we need
.messages too.

2) Does this new configuration allow us to get rid of the cleaner.ratio config?

Gwen


On Tue, May 17, 2016 at 9:43 AM, Eric Wasserman
 wrote:
> James,
>
> Your pictures do an excellent job of illustrating my point.
>
> My mention of the additional "10's of minutes to hours" refers to how far 
> after the original target checkpoint (T1 in your diagram) on may need to go 
> to get to a checkpoint where all partitions of all topics are in the 
> uncompacted region of their respective logs. In terms of your diagram: the T3 
> transaction could have been written 10's of minutes to hours after T1 as that 
> was how much time it took all readers to get to T1.
>
>> You would not have to start over from the beginning in order to read to T3.
>
> While I agree this is technically true, in practice it could be very onerous 
> to actually do it. For example, we use the Kafka consumer that is part of the 
> Spark Streaming library to read table topics. It accepts a range of offsets 
> to read for each partition. Say we originally target ranges from offset 0 to 
> the offset of T1 for each topic+partition. There really is no way to have the 
> library arrive at T1 an then "keep going" to T3. What is worse, given Spark's 
> design, if you lost a worker during your calculations you would be in a 
> rather sticky position. Spark achieves resiliency not by data redundancy but 
> by keeping track of how to reproduce the transformations leading to a state. 
> In the face of a lost worker, Spark would try to re-read that portion of the 
> data on the lost worker from Kafka. However, in the interim compaction may 
> have moved past the reproducible checkpoint (T3) rendering the data 
> inconsistent. At best the entire calculation would need to start over 
> targeting some later transaction checkpoint.
>
> Needless to say with the proposed feature everything is quite simple. As long 
> as we set the compaction lag large enough we can be assured that T1 will 
> remain in the uncompacted region an thereby be reproducible. Thus reading 
> from 0 to the offsets in T1 will be sufficient for the duration of the 
> calculation.
>
> Eric
>
>


Re: [DISCUSS] KIP-58 - Make Log Compaction Point Configurable

2016-05-17 Thread Eric Wasserman
James,

Your pictures do an excellent job of illustrating my point. 

My mention of the additional "10's of minutes to hours" refers to how far after 
the original target checkpoint (T1 in your diagram) one may need to go to get to 
a checkpoint where all partitions of all topics are in the uncompacted region 
of their respective logs. In terms of your diagram: the T3 transaction could 
have been written 10's of minutes to hours after T1 as that was how much time 
it took all readers to get to T1.

> You would not have to start over from the beginning in order to read to T3.

While I agree this is technically true, in practice it could be very onerous to 
actually do it. For example, we use the Kafka consumer that is part of the 
Spark Streaming library to read table topics. It accepts a range of offsets to 
read for each partition. Say we originally target ranges from offset 0 to the 
offset of T1 for each topic+partition. There really is no way to have the 
library arrive at T1 and then "keep going" to T3. What is worse, given Spark's 
design, if you lost a worker during your calculations you would be in a rather 
sticky position. Spark achieves resiliency not by data redundancy but by 
keeping track of how to reproduce the transformations leading to a state. In 
the face of a lost worker, Spark would try to re-read that portion of the data 
on the lost worker from Kafka. However, in the interim compaction may have 
moved past the reproducible checkpoint (T3) rendering the data inconsistent. At 
best the entire calculation would need to start over targeting some later 
transaction checkpoint.
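
For context, the bounded read we do per partition amounts to something like the
following with the plain Java consumer (the topic, offsets and addresses are
invented; the Spark connector does roughly the equivalent internally):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class BoundedCheckpointRead {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // illustrative
            props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            // Per-partition target offsets taken from the checkpoint (T1); values invented.
            Map<TopicPartition, Long> checkpoint = new HashMap<>();
            checkpoint.put(new TopicPartition("orders-table", 0), 1_200_000L);
            checkpoint.put(new TopicPartition("orders-table", 1), 1_150_000L);

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.assign(new ArrayList<>(checkpoint.keySet()));
                consumer.seekToBeginning(new ArrayList<>(checkpoint.keySet()));

                Map<TopicPartition, Long> remaining = new HashMap<>(checkpoint);
                while (!remaining.isEmpty()) {
                    ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
                    for (ConsumerRecord<byte[], byte[]> rec : records) {
                        TopicPartition tp = new TopicPartition(rec.topic(), rec.partition());
                        Long target = remaining.get(tp);
                        if (target == null || rec.offset() >= target) {
                            remaining.remove(tp);  // this partition has reached its checkpoint offset
                            continue;
                        }
                        // rebuild table state from rec here
                    }
                }
            }
        }
    }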

Needless to say with the proposed feature everything is quite simple. As long 
as we set the compaction lag large enough we can be assured that T1 will remain 
in the uncompacted region and thereby be reproducible. Thus reading from 0 to 
the offsets in T1 will be sufficient for the duration of the calculation.

Eric




Re: [DISCUSS] KIP-58 - Make Log Compaction Point Configurable

2016-05-16 Thread James Cheng

> On May 16, 2016, at 9:21 PM, Eric Wasserman  wrote:
> 
> Gwen,
> 
> For simplicity, the example I gave in the gist is for a single table with a 
> single partition. The salient point is that even for a single topic with one 
> partition there is no guarantee without the feature that one will be able to 
> restore some particular checkpoint as the offset indicated by that checkpoint 
> may have been compacted away.
> 
> The practical reality is we are trying to restore the state of a database 
> with nearly 1000 tables each of which has 8 partitions. In this real case 
> there are 8000 offsets indicated in each checkpoint. If even a single one of 
> those 8000 is compacted the checkpointed state cannot be reconstructed.
> 

Eric, I believe you're talking about something like the following: 
http://imgur.com/3K4looF

In the picture, that shows 3 topic-partitions. The blue lines show where 
transactions T1 T2 and T3 fall amongst the 3 topic-partitions. So if your 
consumers read the 3 topic-partitions up to any of the 3 blue lines, they will 
receive a consistent snapshot across all 3 topics.

Now, look at the next image. http://imgur.com/5KuGWNv

This one shows what happens after compaction. The blacked out boxes show items 
that were compacted out. The red lines show the compaction point.

T1 can obviously not be constructed, because its offsets have been compacted 
out. At first glance, it looks like you would be able to read from the 
beginning up to T2 in order to construct a consistent snapshot, but that 
actually is not the case. The black box in the 2nd row has been compacted out 
by, let's say, the 5th box in row 2. Reading up to T2 for the 2nd row means that 
you would be missing that value.

And so the only consistent snapshot you can reconstruct would be T3.

The general rule there is, you must read to the first transaction that occurs 
after the compaction points across all 3 topic-partitions, in order to 
obtain a consistent snapshot.
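
In code form, the rule boils down to something like this (the checkpoint and
compaction-point offsets below are invented):

    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class EarliestSafeCheckpoint {
        // A checkpoint is reconstructible only if, for every partition, its offset is at
        // or beyond that partition's compaction point (the start of the uncompacted region).
        static boolean isSafe(Map<String, Long> checkpoint, Map<String, Long> compactionPoint) {
            return checkpoint.entrySet().stream()
                    .allMatch(e -> e.getValue() >= compactionPoint.get(e.getKey()));
        }

        public static void main(String[] args) {
            Map<String, Long> compactionPoint = new LinkedHashMap<>();
            compactionPoint.put("t-0", 500L);
            compactionPoint.put("t-1", 420L);
            compactionPoint.put("t-2", 610L);

            // T1, T2, T3 expressed as per-partition offsets (invented).
            List<Map<String, Long>> checkpoints = List.of(
                    Map.of("t-0", 450L, "t-1", 400L, "t-2", 580L),  // T1
                    Map.of("t-0", 520L, "t-1", 410L, "t-2", 615L),  // T2 - row 2 already compacted past it
                    Map.of("t-0", 560L, "t-1", 470L, "t-2", 640L)); // T3

            for (int i = 0; i < checkpoints.size(); i++) {
                System.out.printf("T%d safe to rebuild: %b%n", i + 1,
                        isSafe(checkpoints.get(i), compactionPoint));
            }
        }
    }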


> Additionally, we don't really intend to have the consumers of the table 
> topics try to keep current. Rather they will occasionally (say at 1AM each 
> day) try to build the state of the database at a recent checkpoint (say from 
> midnight). Supposing this takes a bit of time (10's of minutes to hours) to 
> read all the partitions of all the table topics up each to its target offset 
> indicated in the midnight checkpoint. By the time all the consumers have 
> arrive at the designated offset perhaps one of them will have had its target 
> offset compacted away. We would then need to select a new target checkpoint 
> with its offsets for each topic and partition that is a bit later. How much 
> later? It might well be around the 10's of minutes to hours it took to read 
> through to the offsets of the original target checkpoint as the compaction 
> that foiled us may have occurred just before we reached the goal.
> 

I'm not sure why you say you need an *additional* 10's of minutes to hours in 
order to reach the next target checkpoint. You have already read until the 
first checkpoint (which is no longer good enough). Wouldn't you just have to 
read an additional couple messages past that checkpoint in order to reach the 
next checkpoint? In my pictures, if you reached T2 and decided it was not good 
enough, you would simply have to read a couple more messages to get from T2 to 
T3. You would not have to start over from the beginning in order to read to T3.

-James

> Really, the issue is that without the feature, while we could eventually 
> restore _some_ consistent state, we couldn't be assured of being able to 
> restore any particular (recent) one. My comment about never being assured of 
> the process terminating is just acknowledging the perhaps small but 
> nonetheless finite possibility that the process of chasing checkpoints, 
> looking for one for which no partition has yet had its target offset 
> compacted away, could continue indefinitely. There is really no condition 
> under which one could be absolutely guaranteed that this process would 
> terminate.
> 
> The feature addresses this by providing a guarantee that _any_ checkpoint can 
> be reconstructed as long as it is within the compaction lag. I would love to 
> be convinced that I am in error but short of that I frankly would never turn 
> on compaction for a CDC use case without it.
> 
> As to reducing the number of parameters. I personally only see the 
> min.compaction.lag.ms as being truly essential. Even the existing ratio 
> setting is secondary in my mind.
> 
> Eric
> 
>> On May 16, 2016, at 6:42 PM, Gwen Shapira  wrote:
>> 
>> Hi Eric,
>> 
>> Thank you for submitting this improvement suggestion.
>> 
>> Do you mind clarifying the use-case for me?
>> 
>> Looking at your gist: https://gist.github.com/ewasserman/f8c892c2e7a9cf26ee46
>> 
>> If my consumer started reading all the CDC topics from the very
>> beginning in which they were created, without ever stopping, it is
>> obviously guarantee

Re: [DISCUSS] KIP-58 - Make Log Compaction Point Configurable

2016-05-16 Thread James Cheng
We would find this KIP very useful. Our particular use case falls into the 
"application mistakes" portion of the KIP.

We are storing source-of-truth data in log-compacted topics, similar to the 
Confluent Schema Registry. One situation we had recently was a misbehaving 
application. It sent data into its log-compacted source-of-truth topic, with 
the same keys but with corrupted message bodies. It took several days for us to 
notice this, and by that time log compaction had occurred. We lost data.

If we had been able to specify that log compaction would not occur for, say, 14 
days, this would give us 2 weeks to notice and recover from this situation. We 
could not have prevented the application from appending bad data after the good 
data, but at least the good data would still be present in the 
not-yet-compacted portion of the log, and we could write some manual recovery 
process to extract the good data out and do with it whatever we needed.

Because we do not have this facility, we currently have a rather complicated 
"backup" procedure whereby we use MirrorMaker to mirror from our log-compacted 
topic to a time-retention-based topic. We have 14-day retention on the 
time-based topic, thus allowing us 14 days to recover from any data corruption 
on our log-compacted topic. There is also an (imho) clever thing we do where we 
restart mirroring from the beginning of the log-compacted topic every 7 days. 
This allows us to make sure that even for very old, non-changing keys, we 
always have a copy of them in the time-based topic.
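
The backup topic itself is nothing special; creating it looks roughly like this 
(names and numbers are made up, 1209600000 ms being 14 days):

    # hypothetical backup target: plain time-based retention, no compaction
    bin/kafka-topics.sh --zookeeper localhost:2181 --create \
      --topic source-of-truth-backup --partitions 8 --replication-factor 3 \
      --config cleanup.policy=delete --config retention.ms=1209600000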

Gwen helped us come up with that backup plan many months ago (Thanks Gwen!) but 
having a configurable "do not compact the last X days" setting would likely be 
enough to satisfy our backup needs.

-James

> On May 16, 2016, at 9:20 PM, Jay Kreps  wrote:
> 
> Yeah I think I gave a scenario but that is not the same as a concrete use
> case. I think the question you have is how common is it that people care
> about this and what concrete things would you build where you had this
> requirement? I think that would be good to figure out.
> 
> I think the issue with the current state is that it really gives no SLA at
> all: the last write to a segment is potentially compacted immediately, so
> even a few seconds of lag (if your segment size is small) would cause this.
> 
> -Jay
> 
> On Mon, May 16, 2016 at 9:05 PM, Gwen Shapira  wrote:
> 
>> I agree that log.cleaner.min.compaction.lag.ms gives slightly more
>> flexibility for potentially-lagging consumers than tuning
>> segment.roll.ms for the exact same scenario.
>> 
>> If more people think that the use-case of "consumer which must see
>> every single record, is running on a compacted topic, and is lagging
>> enough that tuning segment.roll.ms won't help" is important enough
>> that we need to address, I won't object to proceeding with the KIP
>> (i.e. I'm probably -0 on this). It is easy to come up with a scenario
>> in which a feature is helpful (heck, I do it all the time), I'm just
>> not sure there is a real problem that cannot be addressed using
>> Kafka's existing behavior.
>> 
>> I do think that it will be an excellent idea to revisit the log
>> compaction configurations and see whether they make sense to users.
>> For example, if "log.cleaner.min.compaction.lag.X" can replace
>> "log.cleaner.min.cleanable.ratio" as an easier-to-tune alternative,
>> I'll be more excited about the replacement, even without a strong
>> use-case for a specific compaction lag.
>> 
>> Gwen
>> 
>> On Mon, May 16, 2016 at 7:46 PM, Jay Kreps  wrote:
>>> I think it would be good to hammer out some of the practical use cases--I
>>> definitely share your disdain for adding more configs. Here is my sort of
>>> theoretical understanding of why you might want this.
>>> 
>>> As you say a consumer bootstrapping itself in the compacted part of the
>> log
>>> isn't actually traversing through valid states globally. i.e. if you have
>>> written the following:
>>>  offset, key, value
>>>  0, k0, v0
>>>  1, k1, v1
>>>  2, k0, v2
>>> it could be compacted to
>>>  1, k1, v1
>>>  2, k0, v2
>>> Thus at offset 1 in the compacted log, you would have applied k1, but not
>>> k0. So even though k0 and k1 both have valid values they get applied out
>> of
>>> order. This is totally normal, there is obviously no way to both compact
>>> and retain every valid state.
>>> 
>>> For many things this is a non-issue since they treat items only on a
>>> per-key basis without any global notion of consistency.
>>> 
>>> But let's say you want to guarantee you only traverse valid states in a
>>> caught-up real-time consumer, how can you do this? It's actually a bit
>>> tough. Generally speaking since we don't compact the active segment a
>>> real-time consumer should have this property but this doesn't really
>> give a
>>> hard SLA. With a small segment size and a lagging consumer you could
>>> imagine the compactor potentially getting ahead of the consumer.
>>> 
>>> So effectively what 

Re: [DISCUSS] KIP-58 - Make Log Compaction Point Configurable

2016-05-16 Thread Gwen Shapira
I see what you mean, Eric.

I was unclear on the specifics of your architecture. It sounds like
you have a table somewhere that maps checkpoints to lists of
per-topic-partition offsets.
In that case it is indeed useful to know that if the checkpoint was
written N ms ago, you will be able to find the exact offsets by
looking at the log.

Reading ahead won't really help in that case, since it sounds like the
state is too large to maintain in memory while reading ahead to a
future checkpoint.
(Different from Jay's abstract case in that regard).

Gwen


On Mon, May 16, 2016 at 9:21 PM, Eric Wasserman
 wrote:
> Gwen,
>
> For simplicity, the example I gave in the gist is for a single table with a 
> single partition. The salient point is that even for a single topic with one 
> partition there is no guarantee without the feature that one will be able to 
> restore some particular checkpoint as the offset indicated by that checkpoint 
> may have been compacted away.
>
> The practical reality is we are trying to restore the state of a database 
> with nearly 1000 tables each of which has 8 partitions. In this real case 
> there are 8000 offsets indicated in each checkpoint. If even a single one of 
> those 8000 is compacted the checkpointed state cannot be reconstructed.
>
> Additionally, we don't really intend to have the consumers of the table 
> topics try to keep current. Rather they will occasionally (say at 1AM each 
> day) try to build the state of the database at a recent checkpoint (say from 
> midnight). Suppose this takes a bit of time (tens of minutes to hours) to 
> read all the partitions of all the table topics, each up to its target offset 
> indicated in the midnight checkpoint. By the time all the consumers have 
> arrived at their designated offsets, perhaps one of them will have had its 
> target offset compacted away. We would then need to select a new target 
> checkpoint, with its offsets for each topic and partition, that is a bit 
> later. How much later? It might well be around the tens of minutes to hours 
> it took to read through to the offsets of the original target checkpoint, as 
> the compaction that foiled us may have occurred just before we reached the goal.
>
> Really, the issue is that without the feature, while we could eventually 
> restore _some_ consistent state, we couldn't be assured of being able to 
> restore any particular (recent) one. My comment about never being assured of 
> the process terminating is just acknowledging the perhaps small but 
> nonetheless finite possibility that the process of chasing checkpoints, 
> looking for one for which no partition has yet had its target offset 
> compacted away, could continue indefinitely. There is really no condition 
> under which one could be absolutely guaranteed that this process would 
> terminate.
>
> The feature addresses this by providing a guarantee that _any_ checkpoint can 
> be reconstructed as long as it is within the compaction lag. I would love to 
> be convinced that I am in error but short of that I frankly would never turn 
> on compaction for a CDC use case without it.
>
> As to reducing the number of parameters. I personally only see the 
> min.compaction.lag.ms as being truly essential. Even the existing ratio 
> setting is secondary in my mind.
>
> Eric
>
>> On May 16, 2016, at 6:42 PM, Gwen Shapira  wrote:
>>
>> Hi Eric,
>>
>> Thank you for submitting this improvement suggestion.
>>
>> Do you mind clarifying the use-case for me?
>>
>> Looking at your gist: https://gist.github.com/ewasserman/f8c892c2e7a9cf26ee46
>>
>> If my consumer started reading all the CDC topics from the very
>> beginning in which they were created, without ever stopping, it is
>> obviously guaranteed to see every single consistent state of the
>> database.
>> If my consumer joined late (let's say after Tq got clobbered by Tr), it
>> will get a mixed state, but if it continues listening on those
>> topics, always following the logs to their end, it is guaranteed to
>> see a consistent state as soon as a new transaction commits. Am I missing
>> anything?
>>
>> Basically, I do not understand why you claim: "However, to recover all
>> the tables at the same checkpoint, with each independently compacting,
>> one may need to move to an even more recent checkpoint when a
>> different table had the same read issue with the new checkpoint. Thus
>> one could never be assured of this process terminating."
>>
>> I mean, it is true that you need to continuously read forward in order
>> to get to a consistent state, but why can't you be assured of getting
>> there?
>>
>> We are doing something very similar in KafkaConnect, where we need a
>> consistent view of our configuration. We make sure that if the current
>> state is inconsistent (i.e there is data that are not "committed"
>> yet), we continue reading to the log end until we get to a consistent
>> state.
>>
>> I am not convinced the new functionality is necessary, or even helpful.
>>
>> Gwen
>>
>> On Mon, May 16, 2016 at 4:07 PM, Er

Re: [DISCUSS] KIP-58 - Make Log Compaction Point Configurable

2016-05-16 Thread Eric Wasserman
Gwen,

For simplicity, the example I gave in the gist is for a single table with a 
single partition. The salient point is that even for a single topic with one 
partition there is no guarantee without the feature that one will be able to 
restore some particular checkpoint as the offset indicated by that checkpoint 
may have been compacted away.

The practical reality is we are trying to restore the state of a database with 
nearly 1000 tables each of which has 8 partitions. In this real case there are 
8000 offsets indicated in each checkpoint. If even a single one of those 8000 
is compacted the checkpointed state cannot be reconstructed.

Additionally, we don't really intend to have the consumers of the table topics 
try to keep current. Rather they will occasionally (say at 1AM each day) try to 
build the state of the database at a recent checkpoint (say from midnight). 
Suppose this takes a bit of time (tens of minutes to hours) to read all the 
partitions of all the table topics, each up to its target offset indicated in 
the midnight checkpoint. By the time all the consumers have arrived at their 
designated offsets, perhaps one of them will have had its target offset 
compacted away. We would then need to select a new target checkpoint, with its 
offsets for each topic and partition, that is a bit later. How much later? It 
might well be around the tens of minutes to hours it took to read through to 
the offsets of the original target checkpoint, as the compaction that foiled us 
may have occurred just before we reached the goal.

Really, the issue is that without the feature, while we could eventually 
restore _some_ consistent state, we couldn't be assured of being able to 
restore any particular (recent) one. My comment about never being assured of 
the process terminating is just acknowledging the perhaps small but nonetheless 
finite possibility that the process of chasing checkpoints, looking for one for 
which no partition has yet had its target offset compacted away, could continue 
indefinitely. There is really no condition under which one could be absolutely 
guaranteed that this process would terminate.

The feature addresses this by providing a guarantee that _any_ checkpoint can 
be reconstructed as long as it is within the compaction lag. I would love to be 
convinced that I am in error but short of that I frankly would never turn on 
compaction for a CDC use case without it.

As to reducing the number of parameters. I personally only see the 
min.compaction.lag.ms as being truly essential. Even the existing ratio setting 
is secondary in my mind.

Eric

> On May 16, 2016, at 6:42 PM, Gwen Shapira  wrote:
> 
> Hi Eric,
> 
> Thank you for submitting this improvement suggestion.
> 
> Do you mind clarifying the use-case for me?
> 
> Looking at your gist: https://gist.github.com/ewasserman/f8c892c2e7a9cf26ee46
> 
> If my consumer started reading all the CDC topics from the very
> beginning in which they were created, without ever stopping, it is
> obviously guaranteed to see every single consistent state of the
> database.
> If my consumer joined late (let's say after Tq got clobbered by Tr), it
> will get a mixed state, but if it continues listening on those
> topics, always following the logs to their end, it is guaranteed to
> see a consistent state as soon as a new transaction commits. Am I missing
> anything?
> 
> Basically, I do not understand why you claim: "However, to recover all
> the tables at the same checkpoint, with each independently compacting,
> one may need to move to an even more recent checkpoint when a
> different table had the same read issue with the new checkpoint. Thus
> one could never be assured of this process terminating."
> 
> I mean, it is true that you need to continuously read forward in order
> to get to a consistent state, but why can't you be assured of getting
> there?
> 
> We are doing something very similar in KafkaConnect, where we need a
> consistent view of our configuration. We make sure that if the current
> state is inconsistent (i.e there is data that are not "committed"
> yet), we continue reading to the log end until we get to a consistent
> state.
> 
> I am not convinced the new functionality is necessary, or even helpful.
> 
> Gwen
> 
> On Mon, May 16, 2016 at 4:07 PM, Eric Wasserman
>  wrote:
>> I would like to begin discussion on KIP-58
>> 
>> The KIP is here:
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-58+-+Make+Log+Compaction+Point+Configurable
>> 
>> Jira: https://issues.apache.org/jira/browse/KAFKA-1981
>> 
>> Pull Request: https://github.com/apache/kafka/pull/1168
>> 
>> Thanks,
>> 
>> Eric



Re: [DISCUSS] KIP-58 - Make Log Compaction Point Configurable

2016-05-16 Thread Jay Kreps
Yeah I think I gave a scenario but that is not the same as a concrete use
case. I think the question you have is how common is it that people care
about this and what concrete things would you build where you had this
requirement? I think that would be good to figure out.

I think the issue with the current state is that it really gives no SLA at
all: the last write to a segment is potentially compacted immediately, so
even a few seconds of lag (if your segment size is small) would cause this.
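
To put rough numbers on it (purely illustrative; the point is only that the 
window scales with segment size over produce rate):

    public class CompactionWindow {
        public static void main(String[] args) {
            // Only the active segment is protected from the cleaner. A record
            // written just before a segment rolls may be eligible for compaction
            // almost immediately; the first record in a segment is protected for
            // at most roughly (segment size / produce rate).
            long segmentBytes   = 100L * 1024 * 1024;  // 100 MB segments (illustrative)
            long bytesPerSecond =  20L * 1024 * 1024;  // 20 MB/s produce rate (illustrative)
            long maxProtection  = segmentBytes / bytesPerSecond;  // ~5 seconds here
            System.out.println("protection before a record can be compacted: 0.." + maxProtection + "s");
        }
    }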

-Jay

On Mon, May 16, 2016 at 9:05 PM, Gwen Shapira  wrote:

> I agree that log.cleaner.min.compaction.lag.ms gives slightly more
> flexibility for potentially-lagging consumers than tuning
> segment.roll.ms for the exact same scenario.
>
> If more people think that the use-case of "consumer which must see
> every single record, is running on a compacted topic, and is lagging
> enough that tuning segment.roll.ms won't help" is important enough
> that we need to address, I won't object to proceeding with the KIP
> (i.e. I'm probably -0 on this). It is easy to come up with a scenario
> in which a feature is helpful (heck, I do it all the time), I'm just
> not sure there is a real problem that cannot be addressed using
> Kafka's existing behavior.
>
> I do think that it will be an excellent idea to revisit the log
> compaction configurations and see whether they make sense to users.
> For example, if "log.cleaner.min.compaction.lag.X" can replace
> "log.cleaner.min.cleanable.ratio" as an easier-to-tune alternative,
> I'll be more excited about the replacement, even without a strong
> use-case for a specific compaction lag.
>
> Gwen
>
> On Mon, May 16, 2016 at 7:46 PM, Jay Kreps  wrote:
> > I think it would be good to hammer out some of the practical use cases--I
> > definitely share your disdain for adding more configs. Here is my sort of
> > theoretical understanding of why you might want this.
> >
> > As you say a consumer bootstrapping itself in the compacted part of the
> log
> > isn't actually traversing through valid states globally. i.e. if you have
> > written the following:
> >   offset, key, value
> >   0, k0, v0
> >   1, k1, v1
> >   2, k0, v2
> > it could be compacted to
> >   1, k1, v1
> >   2, k0, v2
> > Thus at offset 1 in the compacted log, you would have applied k1, but not
> > k0. So even though k0 and k1 both have valid values they get applied out
> of
> > order. This is totally normal, there is obviously no way to both compact
> > and retain every valid state.
> >
> > For many things this is a non-issue since they treat items only on a
> > per-key basis without any global notion of consistency.
> >
> > But let's say you want to guarantee you only traverse valid states in a
> > caught-up real-time consumer, how can you do this? It's actually a bit
> > tough. Generally speaking since we don't compact the active segment a
> > real-time consumer should have this property but this doesn't really
> give a
> > hard SLA. With a small segment size and a lagging consumer you could
> > imagine the compactor potentially getting ahead of the consumer.
> >
> > So effectively what this config would establish is a guarantee that as
> long
> > as you consume all messages in log.cleaner.min.compaction.lag.ms you
> will
> > get every single produced record.
> >
> > -Jay
> >
> >
> >
> >
> >
> > On Mon, May 16, 2016 at 6:42 PM, Gwen Shapira  wrote:
> >
> >> Hi Eric,
> >>
> >> Thank you for submitting this improvement suggestion.
> >>
> >> Do you mind clarifying the use-case for me?
> >>
> >> Looking at your gist:
> >> https://gist.github.com/ewasserman/f8c892c2e7a9cf26ee46
> >>
> >> If my consumer started reading all the CDC topics from the very
> >> beginning in which they were created, without ever stopping, it is
> >> obviously guaranteed to see every single consistent state of the
> >> database.
> >> If my consumer joined late (let's say after Tq got clobbered by Tr), it
> >> will get a mixed state, but if it continues listening on those
> >> topics, always following the logs to their end, it is guaranteed to
> >> see a consistent state as soon as a new transaction commits. Am I missing
> >> anything?
> >>
> >> Basically, I do not understand why you claim: "However, to recover all
> >> the tables at the same checkpoint, with each independently compacting,
> >> one may need to move to an even more recent checkpoint when a
> >> different table had the same read issue with the new checkpoint. Thus
> >> one could never be assured of this process terminating."
> >>
> >> I mean, it is true that you need to continuously read forward in order
> >> to get to a consistent state, but why can't you be assured of getting
> >> there?
> >>
> >> We are doing something very similar in KafkaConnect, where we need a
> >> consistent view of our configuration. We make sure that if the current
> >> state is inconsistent (i.e there is data that are not "committed"
> >> yet), we continue reading to the log end until we get to a consistent
> 

Re: [DISCUSS] KIP-58 - Make Log Compaction Point Configurable

2016-05-16 Thread Gwen Shapira
I agree that log.cleaner.min.compaction.lag.ms gives slightly more
flexibility for potentially-lagging consumers than tuning
segment.roll.ms for the exact same scenario.

If more people think that the use-case of "consumer which must see
every single record, is running on a compacted topic, and is lagging
enough that tuning segment.roll.ms won't help" is important enough
that we need to address, I won't object to proceeding with the KIP
(i.e. I'm probably -0 on this). It is easy to come up with a scenario
in which a feature is helpful (heck, I do it all the time), I'm just
not sure there is a real problem that cannot be addressed using
Kafka's existing behavior.

I do think that it will be an excellent idea to revisit the log
compaction configurations and see whether they make sense to users.
For example, if "log.cleaner.min.compaction.lag.X" can replace
"log.cleaner.min.cleanable.ratio" as an easier-to-tune alternative,
I'll be more excited about the replacement, even without a strong
use-case for a specific compaction lag.

Gwen

On Mon, May 16, 2016 at 7:46 PM, Jay Kreps  wrote:
> I think it would be good to hammer out some of the practical use cases--I
> definitely share your disdain for adding more configs. Here is my sort of
> theoretical understanding of why you might want this.
>
> As you say a consumer bootstrapping itself in the compacted part of the log
> isn't actually traversing through valid states globally. i.e. if you have
> written the following:
>   offset, key, value
>   0, k0, v0
>   1, k1, v1
>   2, k0, v2
> it could be compacted to
>   1, k1, v1
>   2, k0, v2
> Thus at offset 1 in the compacted log, you would have applied k1, but not
> k0. So even though k0 and k1 both have valid values they get applied out of
> order. This is totally normal, there is obviously no way to both compact
> and retain every valid state.
>
> For many things this is a non-issue since they treat items only on a
> per-key basis without any global notion of consistency.
>
> But let's say you want to guarantee you only traverse valid states in a
> caught-up real-time consumer, how can you do this? It's actually a bit
> tough. Generally speaking since we don't compact the active segment a
> real-time consumer should have this property but this doesn't really give a
> hard SLA. With a small segment size and a lagging consumer you could
> imagine the compactor potentially getting ahead of the consumer.
>
> So effectively what this config would establish is a guarantee that as long
> as you consume all messages in log.cleaner.min.compaction.lag.ms you will
> get every single produced record.
>
> -Jay
>
>
>
>
>
> On Mon, May 16, 2016 at 6:42 PM, Gwen Shapira  wrote:
>
>> Hi Eric,
>>
>> Thank you for submitting this improvement suggestion.
>>
>> Do you mind clarifying the use-case for me?
>>
>> Looking at your gist:
>> https://gist.github.com/ewasserman/f8c892c2e7a9cf26ee46
>>
>> If my consumer started reading all the CDC topics from the very
>> beginning in which they were created, without ever stopping, it is
>> obviously guaranteed to see every single consistent state of the
>> database.
>> If my consumer joined late (let's say after Tq got clobbered by Tr), it
>> will get a mixed state, but if it continues listening on those
>> topics, always following the logs to their end, it is guaranteed to
>> see a consistent state as soon as a new transaction commits. Am I missing
>> anything?
>>
>> Basically, I do not understand why you claim: "However, to recover all
>> the tables at the same checkpoint, with each independently compacting,
>> one may need to move to an even more recent checkpoint when a
>> different table had the same read issue with the new checkpoint. Thus
>> one could never be assured of this process terminating."
>>
>> I mean, it is true that you need to continuously read forward in order
>> to get to a consistent state, but why can't you be assured of getting
>> there?
>>
>> We are doing something very similar in KafkaConnect, where we need a
>> consistent view of our configuration. We make sure that if the current
>> state is inconsistent (i.e there is data that are not "committed"
>> yet), we continue reading to the log end until we get to a consistent
>> state.
>>
>> I am not convinced the new functionality is necessary, or even helpful.
>>
>> Gwen
>>
>> On Mon, May 16, 2016 at 4:07 PM, Eric Wasserman
>>  wrote:
>> > I would like to begin discussion on KIP-58
>> >
>> > The KIP is here:
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-58+-+Make+Log+Compaction+Point+Configurable
>> >
>> > Jira: https://issues.apache.org/jira/browse/KAFKA-1981
>> >
>> > Pull Request: https://github.com/apache/kafka/pull/1168
>> >
>> > Thanks,
>> >
>> > Eric
>>


Re: [DISCUSS] KIP-58 - Make Log Compaction Point Configurable

2016-05-16 Thread Jay Kreps
I think it would be good to hammer out some of the practical use cases--I
definitely share your disdain for adding more configs. Here is my sort of
theoretical understanding of why you might want this.

As you say a consumer bootstrapping itself in the compacted part of the log
isn't actually traversing through valid states globally. i.e. if you have
written the following:
  offset, key, value
  0, k0, v0
  1, k1, v1
  2, k0, v2
it could be compacted to
  1, k1, v1
  2, k0, v2
Thus at offset 1 in the compacted log, you would have applied k1, but not
k0. So even though k0 and k1 both have valid values they get applied out of
order. This is totally normal, there is obviously no way to both compact
and retain every valid state.
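
As a toy model of why this happens (a deliberate simplification; the real 
cleaner works segment by segment and never touches the active segment):

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class CompactionSketch {
        static final class Rec {
            final long offset; final String key; final String value;
            Rec(long offset, String key, String value) {
                this.offset = offset; this.key = key; this.value = value;
            }
        }

        public static void main(String[] args) {
            Rec[] log = { new Rec(0, "k0", "v0"),
                          new Rec(1, "k1", "v1"),
                          new Rec(2, "k0", "v2") };
            // Keep only the most recent record per key; later writes shadow earlier ones.
            Map<String, Rec> latest = new LinkedHashMap<>();
            for (Rec r : log) latest.put(r.key, r);
            // Survivors: (1, k1, v1) and (2, k0, v2). Offset 0 is gone, so a reader
            // stopping at offset 1 has applied k1 but has no value at all for k0.
            for (Rec r : latest.values())
                System.out.println(r.offset + ", " + r.key + ", " + r.value);
        }
    }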

For many things this is a non-issue since they treat items only on a
per-key basis without any global notion of consistency.

But let's say you want to guarantee you only traverse valid states in a
caught-up real-time consumer, how can you do this? It's actually a bit
tough. Generally speaking since we don't compact the active segment a
real-time consumer should have this property but this doesn't really give a
hard SLA. With a small segment size and a lagging consumer you could
imagine the compactor potentially getting ahead of the consumer.

So effectively what this config would establish is a guarantee that as long
as you consume all messages in log.cleaner.min.compaction.lag.ms you will
get every single produced record.

-Jay





On Mon, May 16, 2016 at 6:42 PM, Gwen Shapira  wrote:

> Hi Eric,
>
> Thank you for submitting this improvement suggestion.
>
> Do you mind clarifying the use-case for me?
>
> Looking at your gist:
> https://gist.github.com/ewasserman/f8c892c2e7a9cf26ee46
>
> If my consumer started reading all the CDC topics from the very
> beginning in which they were created, without ever stopping, it is
> obviously guaranteed to see every single consistent state of the
> database.
> If my consumer joined late (let's say after Tq got clobbered by Tr), it
> will get a mixed state, but if it continues listening on those
> topics, always following the logs to their end, it is guaranteed to
> see a consistent state as soon as a new transaction commits. Am I missing
> anything?
>
> Basically, I do not understand why you claim: "However, to recover all
> the tables at the same checkpoint, with each independently compacting,
> one may need to move to an even more recent checkpoint when a
> different table had the same read issue with the new checkpoint. Thus
> one could never be assured of this process terminating."
>
> I mean, it is true that you need to continuously read forward in order
> to get to a consistent state, but why can't you be assured of getting
> there?
>
> We are doing something very similar in KafkaConnect, where we need a
> consistent view of our configuration. We make sure that if the current
> state is inconsistent (i.e there is data that are not "committed"
> yet), we continue reading to the log end until we get to a consistent
> state.
>
> I am not convinced the new functionality is necessary, or even helpful.
>
> Gwen
>
> On Mon, May 16, 2016 at 4:07 PM, Eric Wasserman
>  wrote:
> > I would like to begin discussion on KIP-58
> >
> > The KIP is here:
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-58+-+Make+Log+Compaction+Point+Configurable
> >
> > Jira: https://issues.apache.org/jira/browse/KAFKA-1981
> >
> > Pull Request: https://github.com/apache/kafka/pull/1168
> >
> > Thanks,
> >
> > Eric
>


Re: [DISCUSS] KIP-58 - Make Log Compaction Point Configurable

2016-05-16 Thread Gwen Shapira
Hi Eric,

Thank you for submitting this improvement suggestion.

Do you mind clarifying the use-case for me?

Looking at your gist: https://gist.github.com/ewasserman/f8c892c2e7a9cf26ee46

If my consumer started reading all the CDC topics from the very
beginning in which they were created, without ever stopping, it is
obviously guaranteed to see every single consistent state of the
database.
If my consumer joined late (let's say after Tq got clobbered by Tr), it
will get a mixed state, but if it continues listening on those
topics, always following the logs to their end, it is guaranteed to
see a consistent state as soon as a new transaction commits. Am I missing
anything?

Basically, I do not understand why you claim: "However, to recover all
the tables at the same checkpoint, with each independently compacting,
one may need to move to an even more recent checkpoint when a
different table had the same read issue with the new checkpoint. Thus
one could never be assured of this process terminating."

I mean, it is true that you need to continuously read forward in order
to get to a consistent state, but why can't you be assured of getting
there?

We are doing something very similar in KafkaConnect, where we need a
consistent view of our configuration. We make sure that if the current
state is inconsistent (i.e there is data that are not "committed"
yet), we continue reading to the log end until we get to a consistent
state.

I am not convinced the new functionality is necessary, or even helpful.

Gwen

On Mon, May 16, 2016 at 4:07 PM, Eric Wasserman
 wrote:
> I would like to begin discussion on KIP-58
>
> The KIP is here:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-58+-+Make+Log+Compaction+Point+Configurable
>
> Jira: https://issues.apache.org/jira/browse/KAFKA-1981
>
> Pull Request: https://github.com/apache/kafka/pull/1168
>
> Thanks,
>
> Eric