Hi,

>From our perspective (running thousands of Kafka clusters), the main issues
we see with compacted topics *aren't* disk space usage, or IO utilization
of the log cleaner.

Size matters a *lot* to the usability of consumers bootstrapping from the
beginning - in fact we've been debating tuning out the log segment size for
compacted topics to 100MB, because right now leaving 1GB of uncompacted log
makes some bootstrapping take way too long (especially for non JVM clients,
even in fast languages like Go they're not as capable of high throughput as
the JVM clients). I'm wondering if that should be a default in Kafka itself
as well, and would be happy to contribute that kind of change upstream.
Kafka already tunes the __consumer_offsets topic down to 100MB per segment
for this exact reason.

Secondly, the docs don't make it clear (and this has confused dozens of
well intentioned, smart folk that we've talked to, and likely thousands of
Kafka users across the board) that compaction is an *alternative* to time
based retention. Lots of folk used compaction assuming "it's like time
based retention, but with even less space usage". Switching between the two
is thankfully easy, but it's been a very confusing thing to understand. I'd
like to contribute back clearer docs to Kafka about this. Should I send a
PR? Would that be welcome?

Thirdly, most users *don't* want to tune Kafka's settings at all, or even
know how or when they should. Whilst some amount of tuning is inevitable,
the drive Gwen has towards "less tuning" is very positive from our
perspective. Most users of most software (including technical users of data
storage and messaging systems) want to "just use it" and not worry about
"do I need to monitor a thousand things and then tune another thousand
based on my metrics". Whilst some of that is unavoidable (for sure), it
feels like compaction tuning should be something the project provides
*great* general purpose defaults for most users, which cover most of the
cases, which leave tuning just to the 1% of folk who really really care.
The current defaults seem to be doing well here (barring the above note
about log compaction size), and any future changes here should keep this up.

Thanks

Tom Crayford
Heroku Kafka

On Fri, May 20, 2016 at 4:48 AM, Jay Kreps <j...@confluent.io> wrote:

> Hey Gwen,
>
> Yeah specifying in bytes versus the utilization percent would have been
> easier to implement. The argument against that is that basically users are
> super terrible at predicting and updating data sizes as stuff grows and
> you'd have to really set this then for each individual log perhaps?
> Currently I think that the utilization number of 50% is pretty reasonable
> for most people and you only need to tune it if you really want to
> optimize. But if you set a fixed size compaction threshold in bytes then
> how aggressive this is and the resulting utilization totally depends on the
> compacted size of the data in the topic. i.e. if it defaults to 20GB then
> that becomes the minimum size of the log, so if you end up with a bunch of
> topics with 100mb of compacted data they all end up growing to 20GB. As a
> user if you think you've written 100*100mb worth of compacted partitions
> but Kafka has 100*20GB of data I think you'd be a bit shocked.
>
> Ben--I think your proposal attempts to minimize total I/O by waiting until
> the compaction buffer will be maxed out. Each unique key in the uncompacted
> log uses 24 bytes of compaction buffer iirc but since you don't know the
> number of unique keys it's a bit hard to guess this. You could assume they
> are all unique and only compact when you have N/24 messages in the
> uncompacted log where N is the compaction buffer size in bytes. The issue
> as with Gwen's proposal is that by doing this you really lose control of
> disk utilization which might be a bit unintuitive. Your idea of just using
> the free disk space might fix this though it might be somewhat complex in
> the mixed setting with both compacted and non-compacted topics.
>
> One other thing worth noting is that compaction isn't just for disk space.
> A consumer that bootstraps from the beginning (a la state restore in Kafka
> Streams) has to fully read and process the whole log so I think you want to
> compact even when you still have free space.
>
> -Jay
>
>
>
> On Wed, May 18, 2016 at 10:29 PM, Gwen Shapira <g...@confluent.io> wrote:
>
> > Oops :)
> >
> > The docs are definitely not doing the feature any favors, but I didn't
> mean
> > to imply the feature is thoughtless.
> >
> > Here's the thing I'm not getting: You are trading off disk space for IO
> > efficiency. Thats reasonable. But why not allow users to specify space in
> > bytes?
> >
> > Basically tell the LogCompacter: Once I have X bytes of dirty data (or
> post
> > KIP-58, X bytes of data that needs cleaning), please compact it to the
> best
> > of your ability (which in steady state will be into almost nothing).
> >
> > Since we know how big the compaction buffer is and how Kafka uses it, we
> > can exactly calculate how much space we are wasting vs. how much IO we
> are
> > going to do per unit of time. The size of a single segment or compaction
> > buffer (whichever is bigger) can be a good default value for
> > min.dirty.bytes. We can even evaluate and re-evaluate it based on the
> > amount of free space on the disk. Heck, we can automate those tunings
> > (lower min.dirty.bytes to trigger compaction and free space if we are
> close
> > to running out of space).
> >
> > We can do the same capacity planning with percentages but it requires
> more
> > information to know the results, information that can only be acquired
> > after you reach steady state.
> >
> > It is a bit obvious, so I'm guessing the idea was considered and
> dismissed.
> > I just can't see why.
> > If only there were KIPs back then, so I could look at rejected
> > alternatives...
> >
> > Gwen
> >
> >
> >
> > On Wed, May 18, 2016 at 9:54 PM, Jay Kreps <j...@confluent.io> wrote:
> >
> > > So in summary we never considered this a mechanism to give the consumer
> > > time to consume prior to compaction, just a mechanism to control space
> > > wastage. It sort of accidentally gives you that but it's super hard to
> > > reason about it as an SLA since it is relative to the log size rather
> > than
> > > absolute.
> > >
> > > -Jay
> > >
> > > On Wed, May 18, 2016 at 9:50 PM, Jay Kreps <j...@confluent.io> wrote:
> > >
> > > > The sad part is I actually did think pretty hard about how to
> configure
> > > > that stuff so I guess *I* think the config makes sense! Clearly
> trying
> > to
> > > > prevent my being shot :-)
> > > >
> > > > I agree the name could be improved and the documentation is quite
> > > > spartan--no guidance at all on how to set it or what it trades off. A
> > bit
> > > > shameful.
> > > >
> > > > The thinking was this. One approach to cleaning would be to just do
> it
> > > > continually with the idea that, hey, you can't take that I/O with
> > > you--once
> > > > you've budgeted N MB/sec of background I/O for compaction some of the
> > > time,
> > > > you might as well just use that budget all the time. But this leads
> to
> > > > seemingly silly behavior where you are doing big ass compactions all
> > the
> > > > time to free up just a few bytes and we thought it would freak people
> > > out.
> > > > Plus arguably Kafka usage isn't all in steady state so this wastage
> > would
> > > > come out of the budget for other bursty stuff.
> > > >
> > > >  So when should compaction kick in? Well what are you trading off?
> The
> > > > tradeoff here is how much space to waste on disk versus how much I/O
> to
> > > use
> > > > in cleaning. In general we can't say exactly how much space a
> > compaction
> > > > will free up--during a phase of all "inserts" compaction may free up
> no
> > > > space at all. You just have to do the compaction and hope for the
> best.
> > > But
> > > > in general for most compacted topics they should soon reach a "steady
> > > > state" where they aren't growing or growing very slowly, so most
> writes
> > > are
> > > > updates (if they keep growing rapidly indefinitely then you are going
> > to
> > > > run out of space--so safe to assume they do reach steady state). In
> > this
> > > > steady state the ratio of uncompacted log to total log is effectively
> > the
> > > > utilization (wasted space percentage). So if you set it to 50% your
> > data
> > > is
> > > > about half duplicates. By tolerating more uncleaned log you get more
> > bang
> > > > for your compaction I/O buck but more space wastage. This seemed
> like a
> > > > reasonable way to think about it because maybe you know your
> compacted
> > > data
> > > > size (roughly) so you can reason about whether using, say, twice that
> > > space
> > > > is okay.
> > > >
> > > > Maybe we should just change the name to something about target
> > > utilization
> > > > even though that isn't strictly true except in steady state?
> > > >
> > > > -Jay
> > > >
> > > >
> > > > On Wed, May 18, 2016 at 7:59 PM, Gwen Shapira <g...@confluent.io>
> > wrote:
> > > >
> > > >> Interesting!
> > > >>
> > > >> This needs to be double checked by someone with more experience, but
> > > >> reading the code, it looks like "log.cleaner.min.cleanable.ratio"
> > > >> controls *just* the second property, and I'm not even convinced
> about
> > > >> that.
> > > >>
> > > >> Few facts:
> > > >>
> > > >> 1. Each cleaner thread cleans one log at a time. It always goes for
> > > >> the log with the largest percentage of non-compacted bytes. If you
> > > >> just created a new partition, wrote 1G and switched to a new
> segment,
> > > >> it is very likely that this will be the next log to compact.
> > > >> Explaining the behavior Eric and Jay complained about. I expected it
> > > >> to be rare.
> > > >>
> > > >> 2. If the dirtiest log has less than 50% dirty bytes (or whatever
> > > >> min.cleanable is), it will be skipped, knowing that others have even
> > > >> lower ditry ratio.
> > > >>
> > > >> 3. If we do decide to clean a log, we will clean the whole damn
> thing,
> > > >> leaving only the active segment. Contrary to my expectations, it
> does
> > > >> not leave any dirty byte behind. So *at most* you will have a single
> > > >> clean segment. Again, explaining why Jay, James and Eric are
> unhappy.
> > > >>
> > > >> 4. What is does guarantee (kinda? at least I think it tries?) is to
> > > >> always clean a large "chunk" of data at once, hopefully minimizing
> > > >> churn (cleaning small bits off the same log over and over) and
> > > >> minimizing IO. It does have the nice mathematical property of
> > > >> guaranteeing double the amount of time between cleanings (except it
> > > >> doesn't really, because who knows the size of the compacted region).
> > > >>
> > > >> 5. Whoever wrote the docs should be shot :)
> > > >>
> > > >> so, in conclusion:
> > > >> In my mind, min.cleanable.dirty.ratio is terrible, it is misleading,
> > > >> difficult to understand, and IMO doesn't even do what it should do.
> > > >> I would like to consider the possibility of
> > > >> min.cleanable.dirty.bytes, which should give good control over # of
> IO
> > > >> operations (since the size of compaction buffer is known).
> > > >>
> > > >> In the context of this KIP, the interaction with cleanable ratio and
> > > >> cleanable bytes will be similar, and it looks like it was already
> done
> > > >> correctly in the PR, so no worries ("the ratio's definition will be
> > > >> expanded to become the ratio of "compactable" to compactable plus
> > > >> compacted message sizes. Where compactable includes log segments
> that
> > > >> are neither the active segment nor those prohibited from being
> > > >> compacted because they contain messages that do not satisfy all the
> > > >> new lag constraints"
> > > >>
> > > >> I may open a new KIP to handle the cleanable ratio. Please don't let
> > > >> my confusion detract from this KIP.
> > > >>
> > > >> Gwen
> > > >>
> > > >> On Wed, May 18, 2016 at 3:41 PM, Ben Stopford <b...@confluent.io>
> > wrote:
> > > >> > Generally, this seems like a sensible proposal to me.
> > > >> >
> > > >> > Regarding (1): time and message count seem sensible. I can’t think
> > of
> > > a
> > > >> specific use case for bytes but it seems like there could be one.
> > > >> >
> > > >> > Regarding (2):
> > > >> > The setting log.cleaner.min.cleanable.ratio currently seems to
> have
> > > two
> > > >> uses. It controls which messages will not be compacted, but it also
> > > >> provides a fractional bound on how many logs are cleaned (and hence
> > work
> > > >> done) in each round. This new proposal seems aimed at the first use,
> > but
> > > >> not the second.
> > > >> >
> > > >> > The second case better suits a fractional setting like the one we
> > have
> > > >> now. Using a fractional value means the amount of data cleaned
> scales
> > in
> > > >> proportion to the data stored in the log. If we were to replace this
> > > with
> > > >> an absolute value it would create proportionally more cleaning work
> as
> > > the
> > > >> log grew in size.
> > > >> >
> > > >> > So, if I understand this correctly, I think there is an argument
> for
> > > >> having both.
> > > >> >
> > > >> >
> > > >> >> On 17 May 2016, at 19:43, Gwen Shapira <g...@confluent.io>
> wrote:
> > > >> >>
> > > >> >> .... and Spark's implementation is another good reason to allow
> > > >> compaction lag.
> > > >> >>
> > > >> >> I'm convinced :)
> > > >> >>
> > > >> >> We need to decide:
> > > >> >>
> > > >> >> 1) Do we need just .ms config, or anything else? consumer lag is
> > > >> >> measured (and monitored) in messages, so if we need this feature
> to
> > > >> >> somehow work in tandem with consumer lag monitoring, I think we
> > need
> > > >> >> .messages too.
> > > >> >>
> > > >> >> 2) Does this new configuration allows us to get rid of
> > cleaner.ratio
> > > >> config?
> > > >> >>
> > > >> >> Gwen
> > > >> >>
> > > >> >>
> > > >> >> On Tue, May 17, 2016 at 9:43 AM, Eric Wasserman
> > > >> >> <eric.wasser...@gmail.com> wrote:
> > > >> >>> James,
> > > >> >>>
> > > >> >>> Your pictures do an excellent job of illustrating my point.
> > > >> >>>
> > > >> >>> My mention of the additional "10's of minutes to hours" refers
> to
> > > how
> > > >> far after the original target checkpoint (T1 in your diagram) on may
> > > need
> > > >> to go to get to a checkpoint where all partitions of all topics are
> in
> > > the
> > > >> uncompacted region of their respective logs. In terms of your
> diagram:
> > > the
> > > >> T3 transaction could have been written 10's of minutes to hours
> after
> > > T1 as
> > > >> that was how much time it took all readers to get to T1.
> > > >> >>>
> > > >> >>>> You would not have to start over from the beginning in order to
> > > read
> > > >> to T3.
> > > >> >>>
> > > >> >>> While I agree this is technically true, in practice it could be
> > very
> > > >> onerous to actually do it. For example, we use the Kafka consumer
> that
> > > is
> > > >> part of the Spark Streaming library to read table topics. It
> accepts a
> > > >> range of offsets to read for each partition. Say we originally
> target
> > > >> ranges from offset 0 to the offset of T1 for each topic+partition.
> > There
> > > >> really is no way to have the library arrive at T1 an then "keep
> going"
> > > to
> > > >> T3. What is worse, given Spark's design, if you lost a worker during
> > > your
> > > >> calculations you would be in a rather sticky position. Spark
> achieves
> > > >> resiliency not by data redundancy but by keeping track of how to
> > > reproduce
> > > >> the transformations leading to a state. In the face of a lost
> worker,
> > > Spark
> > > >> would try to re-read that portion of the data on the lost worker
> from
> > > >> Kafka. However, in the interim compaction may have moved past the
> > > >> reproducible checkpoint (T3) rendering the data inconsistent. At
> best
> > > the
> > > >> entire calculation would need to start over targeting some later
> > > >> transaction checkpoint.
> > > >> >>>
> > > >> >>> Needless to say with the proposed feature everything is quite
> > > simple.
> > > >> As long as we set the compaction lag large enough we can be assured
> > > that T1
> > > >> will remain in the uncompacted region an thereby be reproducible.
> Thus
> > > >> reading from 0 to the offsets in T1 will be sufficient for the
> > duration
> > > of
> > > >> the calculation.
> > > >> >>>
> > > >> >>> Eric
> > > >> >>>
> > > >> >>>
> > > >> >
> > > >>
> > > >
> > > >
> > >
> >
>

Reply via email to