Tom,

Documentation improvements are always welcome. The docs are in /docs under
the main repository, just sent a PR for trunk and we are good :)

Segment sizes - I have some objections, but this can be discussed in its
own thread. I feel like I did enough hijacking and Eric may get annoyed at
some point.

Gwen

On Fri, May 20, 2016 at 5:19 AM, Tom Crayford <tcrayf...@heroku.com> wrote:

> Hi,
>
> From our perspective (running thousands of Kafka clusters), the main issues
> we see with compacted topics *aren't* disk space usage, or IO utilization
> of the log cleaner.
>
> Size matters a *lot* to the usability of consumers bootstrapping from the
> beginning - in fact we've been debating tuning out the log segment size for
> compacted topics to 100MB, because right now leaving 1GB of uncompacted log
> makes some bootstrapping take way too long (especially for non JVM clients,
> even in fast languages like Go they're not as capable of high throughput as
> the JVM clients). I'm wondering if that should be a default in Kafka itself
> as well, and would be happy to contribute that kind of change upstream.
> Kafka already tunes the __consumer_offsets topic down to 100MB per segment
> for this exact reason.
>
> Secondly, the docs don't make it clear (and this has confused dozens of
> well intentioned, smart folk that we've talked to, and likely thousands of
> Kafka users across the board) that compaction is an *alternative* to time
> based retention. Lots of folk used compaction assuming "it's like time
> based retention, but with even less space usage". Switching between the two
> is thankfully easy, but it's been a very confusing thing to understand. I'd
> like to contribute back clearer docs to Kafka about this. Should I send a
> PR? Would that be welcome?
>
> Thirdly, most users *don't* want to tune Kafka's settings at all, or even
> know how or when they should. Whilst some amount of tuning is inevitable,
> the drive Gwen has towards "less tuning" is very positive from our
> perspective. Most users of most software (including technical users of data
> storage and messaging systems) want to "just use it" and not worry about
> "do I need to monitor a thousand things and then tune another thousand
> based on my metrics". Whilst some of that is unavoidable (for sure), it
> feels like compaction tuning should be something the project provides
> *great* general purpose defaults for most users, which cover most of the
> cases, which leave tuning just to the 1% of folk who really really care.
> The current defaults seem to be doing well here (barring the above note
> about log compaction size), and any future changes here should keep this
> up.
>
> Thanks
>
> Tom Crayford
> Heroku Kafka
>
> On Fri, May 20, 2016 at 4:48 AM, Jay Kreps <j...@confluent.io> wrote:
>
> > Hey Gwen,
> >
> > Yeah specifying in bytes versus the utilization percent would have been
> > easier to implement. The argument against that is that basically users
> are
> > super terrible at predicting and updating data sizes as stuff grows and
> > you'd have to really set this then for each individual log perhaps?
> > Currently I think that the utilization number of 50% is pretty reasonable
> > for most people and you only need to tune it if you really want to
> > optimize. But if you set a fixed size compaction threshold in bytes then
> > how aggressive this is and the resulting utilization totally depends on
> the
> > compacted size of the data in the topic. i.e. if it defaults to 20GB then
> > that becomes the minimum size of the log, so if you end up with a bunch
> of
> > topics with 100mb of compacted data they all end up growing to 20GB. As a
> > user if you think you've written 100*100mb worth of compacted partitions
> > but Kafka has 100*20GB of data I think you'd be a bit shocked.
> >
> > Ben--I think your proposal attempts to minimize total I/O by waiting
> until
> > the compaction buffer will be maxed out. Each unique key in the
> uncompacted
> > log uses 24 bytes of compaction buffer iirc but since you don't know the
> > number of unique keys it's a bit hard to guess this. You could assume
> they
> > are all unique and only compact when you have N/24 messages in the
> > uncompacted log where N is the compaction buffer size in bytes. The issue
> > as with Gwen's proposal is that by doing this you really lose control of
> > disk utilization which might be a bit unintuitive. Your idea of just
> using
> > the free disk space might fix this though it might be somewhat complex in
> > the mixed setting with both compacted and non-compacted topics.
> >
> > One other thing worth noting is that compaction isn't just for disk
> space.
> > A consumer that bootstraps from the beginning (a la state restore in
> Kafka
> > Streams) has to fully read and process the whole log so I think you want
> to
> > compact even when you still have free space.
> >
> > -Jay
> >
> >
> >
> > On Wed, May 18, 2016 at 10:29 PM, Gwen Shapira <g...@confluent.io>
> wrote:
> >
> > > Oops :)
> > >
> > > The docs are definitely not doing the feature any favors, but I didn't
> > mean
> > > to imply the feature is thoughtless.
> > >
> > > Here's the thing I'm not getting: You are trading off disk space for IO
> > > efficiency. Thats reasonable. But why not allow users to specify space
> in
> > > bytes?
> > >
> > > Basically tell the LogCompacter: Once I have X bytes of dirty data (or
> > post
> > > KIP-58, X bytes of data that needs cleaning), please compact it to the
> > best
> > > of your ability (which in steady state will be into almost nothing).
> > >
> > > Since we know how big the compaction buffer is and how Kafka uses it,
> we
> > > can exactly calculate how much space we are wasting vs. how much IO we
> > are
> > > going to do per unit of time. The size of a single segment or
> compaction
> > > buffer (whichever is bigger) can be a good default value for
> > > min.dirty.bytes. We can even evaluate and re-evaluate it based on the
> > > amount of free space on the disk. Heck, we can automate those tunings
> > > (lower min.dirty.bytes to trigger compaction and free space if we are
> > close
> > > to running out of space).
> > >
> > > We can do the same capacity planning with percentages but it requires
> > more
> > > information to know the results, information that can only be acquired
> > > after you reach steady state.
> > >
> > > It is a bit obvious, so I'm guessing the idea was considered and
> > dismissed.
> > > I just can't see why.
> > > If only there were KIPs back then, so I could look at rejected
> > > alternatives...
> > >
> > > Gwen
> > >
> > >
> > >
> > > On Wed, May 18, 2016 at 9:54 PM, Jay Kreps <j...@confluent.io> wrote:
> > >
> > > > So in summary we never considered this a mechanism to give the
> consumer
> > > > time to consume prior to compaction, just a mechanism to control
> space
> > > > wastage. It sort of accidentally gives you that but it's super hard
> to
> > > > reason about it as an SLA since it is relative to the log size rather
> > > than
> > > > absolute.
> > > >
> > > > -Jay
> > > >
> > > > On Wed, May 18, 2016 at 9:50 PM, Jay Kreps <j...@confluent.io> wrote:
> > > >
> > > > > The sad part is I actually did think pretty hard about how to
> > configure
> > > > > that stuff so I guess *I* think the config makes sense! Clearly
> > trying
> > > to
> > > > > prevent my being shot :-)
> > > > >
> > > > > I agree the name could be improved and the documentation is quite
> > > > > spartan--no guidance at all on how to set it or what it trades
> off. A
> > > bit
> > > > > shameful.
> > > > >
> > > > > The thinking was this. One approach to cleaning would be to just do
> > it
> > > > > continually with the idea that, hey, you can't take that I/O with
> > > > you--once
> > > > > you've budgeted N MB/sec of background I/O for compaction some of
> the
> > > > time,
> > > > > you might as well just use that budget all the time. But this leads
> > to
> > > > > seemingly silly behavior where you are doing big ass compactions
> all
> > > the
> > > > > time to free up just a few bytes and we thought it would freak
> people
> > > > out.
> > > > > Plus arguably Kafka usage isn't all in steady state so this wastage
> > > would
> > > > > come out of the budget for other bursty stuff.
> > > > >
> > > > >  So when should compaction kick in? Well what are you trading off?
> > The
> > > > > tradeoff here is how much space to waste on disk versus how much
> I/O
> > to
> > > > use
> > > > > in cleaning. In general we can't say exactly how much space a
> > > compaction
> > > > > will free up--during a phase of all "inserts" compaction may free
> up
> > no
> > > > > space at all. You just have to do the compaction and hope for the
> > best.
> > > > But
> > > > > in general for most compacted topics they should soon reach a
> "steady
> > > > > state" where they aren't growing or growing very slowly, so most
> > writes
> > > > are
> > > > > updates (if they keep growing rapidly indefinitely then you are
> going
> > > to
> > > > > run out of space--so safe to assume they do reach steady state). In
> > > this
> > > > > steady state the ratio of uncompacted log to total log is
> effectively
> > > the
> > > > > utilization (wasted space percentage). So if you set it to 50% your
> > > data
> > > > is
> > > > > about half duplicates. By tolerating more uncleaned log you get
> more
> > > bang
> > > > > for your compaction I/O buck but more space wastage. This seemed
> > like a
> > > > > reasonable way to think about it because maybe you know your
> > compacted
> > > > data
> > > > > size (roughly) so you can reason about whether using, say, twice
> that
> > > > space
> > > > > is okay.
> > > > >
> > > > > Maybe we should just change the name to something about target
> > > > utilization
> > > > > even though that isn't strictly true except in steady state?
> > > > >
> > > > > -Jay
> > > > >
> > > > >
> > > > > On Wed, May 18, 2016 at 7:59 PM, Gwen Shapira <g...@confluent.io>
> > > wrote:
> > > > >
> > > > >> Interesting!
> > > > >>
> > > > >> This needs to be double checked by someone with more experience,
> but
> > > > >> reading the code, it looks like "log.cleaner.min.cleanable.ratio"
> > > > >> controls *just* the second property, and I'm not even convinced
> > about
> > > > >> that.
> > > > >>
> > > > >> Few facts:
> > > > >>
> > > > >> 1. Each cleaner thread cleans one log at a time. It always goes
> for
> > > > >> the log with the largest percentage of non-compacted bytes. If you
> > > > >> just created a new partition, wrote 1G and switched to a new
> > segment,
> > > > >> it is very likely that this will be the next log to compact.
> > > > >> Explaining the behavior Eric and Jay complained about. I expected
> it
> > > > >> to be rare.
> > > > >>
> > > > >> 2. If the dirtiest log has less than 50% dirty bytes (or whatever
> > > > >> min.cleanable is), it will be skipped, knowing that others have
> even
> > > > >> lower ditry ratio.
> > > > >>
> > > > >> 3. If we do decide to clean a log, we will clean the whole damn
> > thing,
> > > > >> leaving only the active segment. Contrary to my expectations, it
> > does
> > > > >> not leave any dirty byte behind. So *at most* you will have a
> single
> > > > >> clean segment. Again, explaining why Jay, James and Eric are
> > unhappy.
> > > > >>
> > > > >> 4. What is does guarantee (kinda? at least I think it tries?) is
> to
> > > > >> always clean a large "chunk" of data at once, hopefully minimizing
> > > > >> churn (cleaning small bits off the same log over and over) and
> > > > >> minimizing IO. It does have the nice mathematical property of
> > > > >> guaranteeing double the amount of time between cleanings (except
> it
> > > > >> doesn't really, because who knows the size of the compacted
> region).
> > > > >>
> > > > >> 5. Whoever wrote the docs should be shot :)
> > > > >>
> > > > >> so, in conclusion:
> > > > >> In my mind, min.cleanable.dirty.ratio is terrible, it is
> misleading,
> > > > >> difficult to understand, and IMO doesn't even do what it should
> do.
> > > > >> I would like to consider the possibility of
> > > > >> min.cleanable.dirty.bytes, which should give good control over #
> of
> > IO
> > > > >> operations (since the size of compaction buffer is known).
> > > > >>
> > > > >> In the context of this KIP, the interaction with cleanable ratio
> and
> > > > >> cleanable bytes will be similar, and it looks like it was already
> > done
> > > > >> correctly in the PR, so no worries ("the ratio's definition will
> be
> > > > >> expanded to become the ratio of "compactable" to compactable plus
> > > > >> compacted message sizes. Where compactable includes log segments
> > that
> > > > >> are neither the active segment nor those prohibited from being
> > > > >> compacted because they contain messages that do not satisfy all
> the
> > > > >> new lag constraints"
> > > > >>
> > > > >> I may open a new KIP to handle the cleanable ratio. Please don't
> let
> > > > >> my confusion detract from this KIP.
> > > > >>
> > > > >> Gwen
> > > > >>
> > > > >> On Wed, May 18, 2016 at 3:41 PM, Ben Stopford <b...@confluent.io>
> > > wrote:
> > > > >> > Generally, this seems like a sensible proposal to me.
> > > > >> >
> > > > >> > Regarding (1): time and message count seem sensible. I can’t
> think
> > > of
> > > > a
> > > > >> specific use case for bytes but it seems like there could be one.
> > > > >> >
> > > > >> > Regarding (2):
> > > > >> > The setting log.cleaner.min.cleanable.ratio currently seems to
> > have
> > > > two
> > > > >> uses. It controls which messages will not be compacted, but it
> also
> > > > >> provides a fractional bound on how many logs are cleaned (and
> hence
> > > work
> > > > >> done) in each round. This new proposal seems aimed at the first
> use,
> > > but
> > > > >> not the second.
> > > > >> >
> > > > >> > The second case better suits a fractional setting like the one
> we
> > > have
> > > > >> now. Using a fractional value means the amount of data cleaned
> > scales
> > > in
> > > > >> proportion to the data stored in the log. If we were to replace
> this
> > > > with
> > > > >> an absolute value it would create proportionally more cleaning
> work
> > as
> > > > the
> > > > >> log grew in size.
> > > > >> >
> > > > >> > So, if I understand this correctly, I think there is an argument
> > for
> > > > >> having both.
> > > > >> >
> > > > >> >
> > > > >> >> On 17 May 2016, at 19:43, Gwen Shapira <g...@confluent.io>
> > wrote:
> > > > >> >>
> > > > >> >> .... and Spark's implementation is another good reason to allow
> > > > >> compaction lag.
> > > > >> >>
> > > > >> >> I'm convinced :)
> > > > >> >>
> > > > >> >> We need to decide:
> > > > >> >>
> > > > >> >> 1) Do we need just .ms config, or anything else? consumer lag
> is
> > > > >> >> measured (and monitored) in messages, so if we need this
> feature
> > to
> > > > >> >> somehow work in tandem with consumer lag monitoring, I think we
> > > need
> > > > >> >> .messages too.
> > > > >> >>
> > > > >> >> 2) Does this new configuration allows us to get rid of
> > > cleaner.ratio
> > > > >> config?
> > > > >> >>
> > > > >> >> Gwen
> > > > >> >>
> > > > >> >>
> > > > >> >> On Tue, May 17, 2016 at 9:43 AM, Eric Wasserman
> > > > >> >> <eric.wasser...@gmail.com> wrote:
> > > > >> >>> James,
> > > > >> >>>
> > > > >> >>> Your pictures do an excellent job of illustrating my point.
> > > > >> >>>
> > > > >> >>> My mention of the additional "10's of minutes to hours" refers
> > to
> > > > how
> > > > >> far after the original target checkpoint (T1 in your diagram) on
> may
> > > > need
> > > > >> to go to get to a checkpoint where all partitions of all topics
> are
> > in
> > > > the
> > > > >> uncompacted region of their respective logs. In terms of your
> > diagram:
> > > > the
> > > > >> T3 transaction could have been written 10's of minutes to hours
> > after
> > > > T1 as
> > > > >> that was how much time it took all readers to get to T1.
> > > > >> >>>
> > > > >> >>>> You would not have to start over from the beginning in order
> to
> > > > read
> > > > >> to T3.
> > > > >> >>>
> > > > >> >>> While I agree this is technically true, in practice it could
> be
> > > very
> > > > >> onerous to actually do it. For example, we use the Kafka consumer
> > that
> > > > is
> > > > >> part of the Spark Streaming library to read table topics. It
> > accepts a
> > > > >> range of offsets to read for each partition. Say we originally
> > target
> > > > >> ranges from offset 0 to the offset of T1 for each topic+partition.
> > > There
> > > > >> really is no way to have the library arrive at T1 an then "keep
> > going"
> > > > to
> > > > >> T3. What is worse, given Spark's design, if you lost a worker
> during
> > > > your
> > > > >> calculations you would be in a rather sticky position. Spark
> > achieves
> > > > >> resiliency not by data redundancy but by keeping track of how to
> > > > reproduce
> > > > >> the transformations leading to a state. In the face of a lost
> > worker,
> > > > Spark
> > > > >> would try to re-read that portion of the data on the lost worker
> > from
> > > > >> Kafka. However, in the interim compaction may have moved past the
> > > > >> reproducible checkpoint (T3) rendering the data inconsistent. At
> > best
> > > > the
> > > > >> entire calculation would need to start over targeting some later
> > > > >> transaction checkpoint.
> > > > >> >>>
> > > > >> >>> Needless to say with the proposed feature everything is quite
> > > > simple.
> > > > >> As long as we set the compaction lag large enough we can be
> assured
> > > > that T1
> > > > >> will remain in the uncompacted region an thereby be reproducible.
> > Thus
> > > > >> reading from 0 to the offsets in T1 will be sufficient for the
> > > duration
> > > > of
> > > > >> the calculation.
> > > > >> >>>
> > > > >> >>> Eric
> > > > >> >>>
> > > > >> >>>
> > > > >> >
> > > > >>
> > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to