Re: [DISCUSS] KIP-58 - Make Log Compaction Point Configurable
Tom,

Documentation improvements are always welcome. The docs are in /docs under the main repository; just send a PR for trunk and we are good :)

Segment sizes - I have some objections, but that can be discussed in its own thread. I feel like I did enough hijacking and Eric may get annoyed at some point.

Gwen

On Fri, May 20, 2016 at 5:19 AM, Tom Crayford wrote:
> [...]
Re: [DISCUSS] KIP-58 - Make Log Compaction Point Configurable
Hi,

From our perspective (running thousands of Kafka clusters), the main issues we see with compacted topics *aren't* disk space usage or IO utilization of the log cleaner.

Size matters a *lot* to the usability of consumers bootstrapping from the beginning - in fact we've been debating tuning the log segment size for compacted topics down to 100MB, because right now leaving 1GB of uncompacted log makes some bootstrapping take way too long (especially for non-JVM clients; even in fast languages like Go they're not as capable of high throughput as the JVM clients). I'm wondering if that should be a default in Kafka itself as well, and would be happy to contribute that kind of change upstream. Kafka already tunes the __consumer_offsets topic down to 100MB per segment for this exact reason.

Secondly, the docs don't make it clear (and this has confused dozens of well-intentioned, smart folk that we've talked to, and likely thousands of Kafka users across the board) that compaction is an *alternative* to time-based retention. Lots of folk used compaction assuming "it's like time-based retention, but with even less space usage". Switching between the two is thankfully easy, but it's been a very confusing thing to understand. I'd like to contribute back clearer docs to Kafka about this. Should I send a PR? Would that be welcome?

Thirdly, most users *don't* want to tune Kafka's settings at all, or even know how or when they should. Whilst some amount of tuning is inevitable, the drive Gwen has towards "less tuning" is very positive from our perspective. Most users of most software (including technical users of data storage and messaging systems) want to "just use it" and not worry about "do I need to monitor a thousand things and then tune another thousand based on my metrics". Whilst some of that is unavoidable (for sure), it feels like compaction tuning is something the project should provide *great* general-purpose defaults for: defaults that cover most of the cases and leave tuning to the 1% of folk who really, really care. The current defaults seem to be doing well here (barring the above note about segment sizes for compacted topics), and any future changes should keep this up.

Thanks

Tom Crayford
Heroku Kafka

On Fri, May 20, 2016 at 4:48 AM, Jay Kreps wrote:
> [...]
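A rough sketch of why segment size bounds the bootstrap cost Tom describes (all numbers illustrative; the one fact carried over is that the active segment is never compacted, so segment.bytes is a floor on the uncompacted tail each bootstrapping consumer must replay per partition):

```python
# Illustrative arithmetic only: the active segment is never compacted,
# so segment.bytes bounds the uncompacted tail a bootstrapping consumer
# must replay on top of the compacted data, per partition.

def extra_bootstrap_seconds(segment_bytes, consumer_bytes_per_sec):
    return segment_bytes / consumer_bytes_per_sec

SLOW_CLIENT = 10 * 1024 * 1024  # hypothetical 10 MB/s non-JVM consumer

for label, segment_bytes in [("1 GB (broker default)", 1 << 30),
                             ("100 MB (as proposed)", 100 * 1024 * 1024)]:
    secs = extra_bootstrap_seconds(segment_bytes, SLOW_CLIENT)
    print(f"{label}: up to ~{secs:.0f}s of extra replay per partition")
```

At 10 MB/s that is roughly 100 seconds versus 10 seconds of unavoidable replay per partition, which is the gap motivating the 100MB proposal.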
Re: [DISCUSS] KIP-58 - Make Log Compaction Point Configurable
Hey Gwen,

Yeah, specifying in bytes versus the utilization percent would have been easier to implement. The argument against that is that basically users are super terrible at predicting and updating data sizes as stuff grows, and you'd then have to set this for each individual log, perhaps? Currently I think that the utilization number of 50% is pretty reasonable for most people, and you only need to tune it if you really want to optimize. But if you set a fixed-size compaction threshold in bytes, then how aggressive it is and the resulting utilization totally depend on the compacted size of the data in the topic. I.e. if it defaults to 20GB then that becomes the minimum size of the log, so if you end up with a bunch of topics with 100mb of compacted data they all end up growing to 20GB. As a user, if you think you've written 100*100mb worth of compacted partitions but Kafka has 100*20GB of data, I think you'd be a bit shocked.

Ben--I think your proposal attempts to minimize total I/O by waiting until the compaction buffer will be maxed out. Each unique key in the uncompacted log uses 24 bytes of compaction buffer iirc, but since you don't know the number of unique keys it's a bit hard to guess this. You could assume they are all unique and only compact when you have N/24 messages in the uncompacted log, where N is the compaction buffer size in bytes. The issue, as with Gwen's proposal, is that by doing this you really lose control of disk utilization, which might be a bit unintuitive. Your idea of just using the free disk space might fix this, though it might be somewhat complex in the mixed setting with both compacted and non-compacted topics.

One other thing worth noting is that compaction isn't just for disk space. A consumer that bootstraps from the beginning (a la state restore in Kafka Streams) has to fully read and process the whole log, so I think you want to compact even when you still have free space.

-Jay

On Wed, May 18, 2016 at 10:29 PM, Gwen Shapira wrote:
> [...]
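A quick sketch of the N/24 arithmetic above. The 24-bytes-per-key figure is from Jay's note; the 128 MB buffer size is an assumed value for log.cleaner.dedupe.buffer.size, not taken from this thread:

```python
# Worst case per Jay's argument: every message has a unique key, and each
# key costs ~24 bytes in the cleaner's dedupe buffer, so one cleaning pass
# can index at most N/24 messages for an N-byte buffer.

BYTES_PER_KEY = 24

def max_messages_per_pass(dedupe_buffer_bytes):
    return dedupe_buffer_bytes // BYTES_PER_KEY

N = 128 * 1024 * 1024  # assumed log.cleaner.dedupe.buffer.size
print(f"defer compaction until ~{max_messages_per_pass(N):,} uncompacted messages")
# ~5.6 million messages; with duplicate keys one pass covers proportionally more log.
```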
Re: [DISCUSS] KIP-58 - Make Log Compaction Point Configurable
No, you are right that mapping dirty-bytes to dirty-map sizes is non-trivial. I think it would be good to discuss an alternative approach, but this is probably the wrong thread :)

On Thu, May 19, 2016 at 4:36 AM, Ben Stopford wrote:
> [...]
Re: [DISCUSS] KIP-58 - Make Log Compaction Point Configurable
Hmm. Suffice to say, this isn’t an easy thing to tune, so I would agree that a more holistic solution, which tuned itself to total disk availability, might be quite useful :)

If we took the min.dirty.bytes route and defaulted it to the segment size, that would work well for distributions where the dirty-map (compaction buffer) will be filled by a single dirty segment, but this would depend a bit on the message size. If messages were large, the dirty-map might not fill, which would reduce the yield from the scan. In fact there seems to be a general incentive to defer scanning to ensure the dirty-map always fills. For this reason, the ratio approach still seems a little more general to me, as it applies equally to large and small partitions.

Let me know if I’m missing something here.

B

On 19 May 2016, at 06:29, Gwen Shapira wrote:
> [...]
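A sketch of the fill effect Ben describes. min.dirty.bytes is the hypothetical config from this thread, and the buffer size and 24-bytes-per-key cost are carried over as assumptions from Jay's earlier note:

```python
# With a fixed dirty-bytes trigger, the number of keys a scan can add to
# the dedupe map depends on message size: large messages mean few keys
# per dirty byte, so the map stays underfilled and the scan yields less.

def dirty_map_fill(dirty_bytes, avg_message_bytes,
                   dedupe_buffer_bytes=128 * 1024 * 1024, bytes_per_key=24):
    keys = dirty_bytes // avg_message_bytes            # at most one key per message
    capacity = dedupe_buffer_bytes // bytes_per_key    # keys the map can hold
    return min(1.0, keys / capacity)

for msg in (100, 10_000, 1_000_000):
    fill = dirty_map_fill(dirty_bytes=1 << 30, avg_message_bytes=msg)
    print(f"avg message {msg:>9} B -> dirty map {fill:.1%} full")
```

With 1 GiB of dirty data, 100-byte messages overfill the map while 1 MB messages leave it nearly empty, which is why a single fixed byte threshold can't guarantee a full map.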
Re: [DISCUSS] KIP-58 - Make Log Compaction Point Configurable
Oops :)

The docs are definitely not doing the feature any favors, but I didn't mean to imply the feature is thoughtless.

Here's the thing I'm not getting: you are trading off disk space for IO efficiency. That's reasonable. But why not allow users to specify space in bytes?

Basically, tell the LogCleaner: once I have X bytes of dirty data (or, post KIP-58, X bytes of data that needs cleaning), please compact it to the best of your ability (which in steady state will be into almost nothing).

Since we know how big the compaction buffer is and how Kafka uses it, we can exactly calculate how much space we are wasting vs. how much IO we are going to do per unit of time. The size of a single segment or the compaction buffer (whichever is bigger) can be a good default value for min.dirty.bytes. We can even evaluate and re-evaluate it based on the amount of free space on the disk. Heck, we can automate those tunings (lower min.dirty.bytes to trigger compaction and free space if we are close to running out of space).

We can do the same capacity planning with percentages, but it requires more information to know the results - information that can only be acquired after you reach steady state.

It is a bit obvious, so I'm guessing the idea was considered and dismissed. I just can't see why. If only there were KIPs back then, so I could look at rejected alternatives...

Gwen

On Wed, May 18, 2016 at 9:54 PM, Jay Kreps wrote:
> [...]
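The capacity-planning calculation Gwen alludes to might look like this. min.dirty.bytes is still hypothetical, and the I/O model (read dirty plus compacted bytes, rewrite the compacted log) is a simplifying assumption:

```python
# Given a write rate and a hypothetical min.dirty.bytes trigger, wasted
# space is bounded by the trigger and cleaning I/O follows directly.

def plan(min_dirty_bytes, writes_bytes_per_day, compacted_bytes):
    cleanings_per_day = writes_bytes_per_day / min_dirty_bytes
    # Per pass: read dirty + compacted bytes, write the compacted log back.
    io_per_day = cleanings_per_day * (min_dirty_bytes + 2 * compacted_bytes)
    return cleanings_per_day, io_per_day

cleanings, io = plan(min_dirty_bytes=1 << 30,
                     writes_bytes_per_day=10 * (1 << 30),  # 10 GiB/day of updates
                     compacted_bytes=5 * (1 << 30))        # 5 GiB of live data
print(f"{cleanings:.0f} cleanings/day, <= 1 GiB wasted, ~{io / 2**30:.0f} GiB I/O/day")
```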
Re: [DISCUSS] KIP-58 - Make Log Compaction Point Configurable
So in summary, we never considered this a mechanism to give the consumer time to consume prior to compaction, just a mechanism to control space wastage. It sort of accidentally gives you that, but it's super hard to reason about it as an SLA since it is relative to the log size rather than absolute.

-Jay

On Wed, May 18, 2016 at 9:50 PM, Jay Kreps wrote:
> [...]
Re: [DISCUSS] KIP-58 - Make Log Compaction Point Configurable
The sad part is I actually did think pretty hard about how to configure that stuff, so I guess *I* think the config makes sense! Clearly trying to prevent my being shot :-)

I agree the name could be improved, and the documentation is quite spartan--no guidance at all on how to set it or what it trades off. A bit shameful.

The thinking was this. One approach to cleaning would be to just do it continually, with the idea that, hey, you can't take that I/O with you--once you've budgeted N MB/sec of background I/O for compaction some of the time, you might as well just use that budget all the time. But this leads to seemingly silly behavior where you are doing big ass compactions all the time to free up just a few bytes, and we thought it would freak people out. Plus, arguably, Kafka usage isn't all in steady state, so this wastage would come out of the budget for other bursty stuff.

So when should compaction kick in? Well, what are you trading off? The tradeoff here is how much space to waste on disk versus how much I/O to use in cleaning. In general we can't say exactly how much space a compaction will free up--during a phase of all "inserts" compaction may free up no space at all. You just have to do the compaction and hope for the best. But in general most compacted topics should soon reach a "steady state" where they aren't growing, or are growing very slowly, so most writes are updates (if they keep growing rapidly indefinitely then you are going to run out of space--so it's safe to assume they do reach steady state). In this steady state the ratio of uncompacted log to total log is effectively the utilization (wasted space percentage). So if you set it to 50%, your data is about half duplicates. By tolerating more uncleaned log you get more bang for your compaction I/O buck, but more space wastage. This seemed like a reasonable way to think about it, because maybe you know your compacted data size (roughly), so you can reason about whether using, say, twice that space is okay.

Maybe we should just change the name to something about target utilization, even though that isn't strictly true except in steady state?

-Jay

On Wed, May 18, 2016 at 7:59 PM, Gwen Shapira wrote:
> [...]
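Jay's steady-state claim as arithmetic (a sketch; "ratio" here is the dirty fraction at which cleaning triggers):

```python
# In steady state (all updates), cleaning fires when dirty/total hits the
# ratio, so just before a cleaning: total = compacted / (1 - ratio).

def peak_log_bytes(compacted_bytes, min_cleanable_ratio):
    return compacted_bytes / (1.0 - min_cleanable_ratio)

compacted = 100 * 1024 * 1024  # e.g. 100 MB of live, deduplicated data
for ratio in (0.25, 0.50, 0.75):
    peak = peak_log_bytes(compacted, ratio)
    print(f"ratio {ratio:.2f}: log peaks at {peak / 2**20:.0f} MB "
          f"(~{ratio:.0%} duplicates)")
```

At a ratio of 0.5 the 100 MB of live data peaks at roughly 200 MB on disk, i.e. "about half duplicates" as described above.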
Re: [DISCUSS] KIP-58 - Make Log Compaction Point Configurable
Interesting!

This needs to be double-checked by someone with more experience, but reading the code, it looks like "log.cleaner.min.cleanable.ratio" controls *just* the second property, and I'm not even convinced about that.

A few facts:

1. Each cleaner thread cleans one log at a time. It always goes for the log with the largest percentage of non-compacted bytes. If you just created a new partition, wrote 1G, and switched to a new segment, it is very likely that this will be the next log to compact - explaining the behavior Eric and Jay complained about, which I had expected to be rare.

2. If the dirtiest log has less than 50% dirty bytes (or whatever min.cleanable is), it will be skipped, knowing that the others have an even lower dirty ratio.

3. If we do decide to clean a log, we will clean the whole damn thing, leaving only the active segment. Contrary to my expectations, it does not leave any dirty byte behind. So *at most* you will have a single dirty segment (the active one). Again, explaining why Jay, James and Eric are unhappy.

4. What it does guarantee (kinda? at least I think it tries?) is to always clean a large "chunk" of data at once, hopefully minimizing churn (cleaning small bits off the same log over and over) and minimizing IO. It does have the nice mathematical property of guaranteeing double the amount of time between cleanings (except it doesn't really, because who knows the size of the compacted region).

5. Whoever wrote the docs should be shot :)

So, in conclusion: in my mind, min.cleanable.dirty.ratio is terrible; it is misleading, difficult to understand, and IMO doesn't even do what it should do. I would like to consider the possibility of min.cleanable.dirty.bytes, which should give good control over the number of IO operations (since the size of the compaction buffer is known).

In the context of this KIP, the interaction with the cleanable ratio and cleanable bytes will be similar, and it looks like it was already done correctly in the PR, so no worries ("the ratio's definition will be expanded to become the ratio of 'compactable' to compactable plus compacted message sizes, where compactable includes log segments that are neither the active segment nor those prohibited from being compacted because they contain messages that do not satisfy all the new lag constraints").

I may open a new KIP to handle the cleanable ratio. Please don't let my confusion detract from this KIP.

Gwen

On Wed, May 18, 2016 at 3:41 PM, Ben Stopford wrote:
> [...]
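A toy model of facts 1-3 above (a paraphrase of the described behavior, not the actual LogCleaner code):

```python
# Each pass: pick the log with the highest dirty fraction; skip the pass
# entirely if even that one is under min.cleanable; otherwise clean the
# whole log, leaving only the active segment dirty.

def next_log_to_clean(logs, min_cleanable_ratio=0.5):
    """logs maps log name -> (dirty_bytes, total_bytes)."""
    name, (dirty, total) = max(logs.items(),
                               key=lambda kv: kv[1][0] / kv[1][1])
    if dirty / total < min_cleanable_ratio:
        return None          # fact 2: everything else is even less dirty
    return name              # fact 3: this log is cleaned end to end

logs = {"users-0": (900, 1000),    # freshly written partition, 90% dirty
        "orders-3": (300, 1000)}   # 30% dirty
print(next_log_to_clean(logs))     # -> users-0 (fact 1)
```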
Re: [DISCUSS] KIP-58 - Make Log Compaction Point Configurable
Generally, this seems like a sensible proposal to me.

Regarding (1): time and message count seem sensible. I can’t think of a specific use case for bytes, but it seems like there could be one.

Regarding (2): the setting log.cleaner.min.cleanable.ratio currently seems to have two uses. It controls which messages will not be compacted, but it also provides a fractional bound on how many logs are cleaned (and hence work done) in each round. This new proposal seems aimed at the first use, but not the second.

The second case better suits a fractional setting like the one we have now. Using a fractional value means the amount of data cleaned scales in proportion to the data stored in the log. If we were to replace this with an absolute value, it would create proportionally more cleaning work as the log grew in size.

So, if I understand this correctly, I think there is an argument for having both.

On 17 May 2016, at 19:43, Gwen Shapira wrote:
> [...]
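A sketch of the scaling difference Ben points out (min.dirty.bytes remains the thread's hypothetical config, and each cleaning is assumed to scan the whole log):

```python
# With a ratio trigger, the dirty bytes needed to start a cleaning grow
# with the log, so cleanings per byte written fall as the log grows; a
# fixed-byte trigger keeps that constant, meaning proportionally more
# whole-log scans for big logs.

def cleanings_per_gib_written(compacted_bytes, ratio=None, min_dirty_bytes=None):
    if ratio is not None:
        trigger = ratio * compacted_bytes / (1 - ratio)  # dirty bytes to trip
    else:
        trigger = min_dirty_bytes
    return (1 << 30) / trigger

for gib in (1, 100):
    size = gib << 30
    print(f"{gib:>3} GiB compacted: ratio(0.5) -> "
          f"{cleanings_per_gib_written(size, ratio=0.5):.2f}, "
          f"1 GiB trigger -> "
          f"{cleanings_per_gib_written(size, min_dirty_bytes=1 << 30):.2f} "
          f"cleanings per GiB written")
```

The fixed-byte trigger fires equally often for the 1 GiB and 100 GiB logs, but each firing on the big log scans 100x the data - Ben's "proportionally more cleaning work".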
Re: [DISCUSS] KIP-58 - Make Log Compaction Point Configurable
And Spark's implementation is another good reason to allow compaction lag.

I'm convinced :)

We need to decide:

1) Do we need just a .ms config, or anything else? Consumer lag is measured (and monitored) in messages, so if we need this feature to somehow work in tandem with consumer lag monitoring, I think we need .messages too.

2) Does this new configuration allow us to get rid of the cleaner.ratio config?

Gwen

On Tue, May 17, 2016 at 9:43 AM, Eric Wasserman wrote:
> [...]
Re: [DISCUSS] KIP-58 - Make Log Compaction Point Configurable
James,

Your pictures do an excellent job of illustrating my point.

My mention of the additional "10's of minutes to hours" refers to how far after the original target checkpoint (T1 in your diagram) one may need to go to get to a checkpoint where all partitions of all topics are in the uncompacted region of their respective logs. In terms of your diagram: the T3 transaction could have been written 10's of minutes to hours after T1, as that was how much time it took all readers to get to T1.

> You would not have to start over from the beginning in order to read to T3.

While I agree this is technically true, in practice it could be very onerous to actually do it. For example, we use the Kafka consumer that is part of the Spark Streaming library to read table topics. It accepts a range of offsets to read for each partition. Say we originally target ranges from offset 0 to the offset of T1 for each topic+partition. There really is no way to have the library arrive at T1 and then "keep going" to T3. What is worse, given Spark's design, if you lost a worker during your calculations you would be in a rather sticky position. Spark achieves resiliency not by data redundancy but by keeping track of how to reproduce the transformations leading to a state. In the face of a lost worker, Spark would try to re-read that portion of the data on the lost worker from Kafka. However, in the interim compaction may have moved past the reproducible checkpoint (T3), rendering the data inconsistent. At best, the entire calculation would need to start over, targeting some later transaction checkpoint.

Needless to say, with the proposed feature everything is quite simple. As long as we set the compaction lag large enough, we can be assured that T1 will remain in the uncompacted region and thereby be reproducible. Thus reading from 0 to the offsets in T1 will be sufficient for the duration of the calculation.

Eric
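For reference, the pinned-range pattern Eric describes looks roughly like this with the Spark 1.x pyspark.streaming.kafka batch API (the API shape is recalled from that era's Kafka 0.8 connector and should be treated as an assumption; topic, broker, and offsets are made up):

```python
# Sketch only: offsets for checkpoint T1 are captured once and baked into
# the RDD's offset ranges; Spark re-reads these exact slices on worker
# loss, which fails if compaction has moved past them in the meantime.
from pyspark import SparkContext
from pyspark.streaming.kafka import KafkaUtils, OffsetRange

sc = SparkContext(appName="restore-to-T1")

t1_offsets = {("users", 0): 123_400,   # per-partition offsets of T1
              ("users", 1): 119_872}

ranges = [OffsetRange(topic, partition, 0, until)
          for (topic, partition), until in t1_offsets.items()]

rdd = KafkaUtils.createRDD(sc, {"metadata.broker.list": "broker:9092"}, ranges)
print(rdd.count())
```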
Re: [DISCUSS] KIP-58 - Make Log Compaction Point Configurable
> On May 16, 2016, at 9:21 PM, Eric Wasserman wrote: > > Gwen, > > For simplicity, the example I gave in the gist is for a single table with a > single partition. The salient point is that even for a single topic with one > partition there is no guarantee without the feature that one will be able to > restore some particular checkpoint as the offset indicated by that checkpoint > may have been compacted away. > > The practical reality is we are trying to restore the state of a database > with nearly 1000 tables each of which has 8 partitions. In this real case > there are 8000 offsets indicated in each checkpoint. If even a single one of > those 8000 is compacted the checkpointed state cannot be reconstructed. > Eric, I believe you're talking about something like the following: http://imgur.com/3K4looF In the picture, that shows 3 topic-partitions. The blue lines show where transactions T1 T2 and T3 fall amongst the 3 topic-partitions. So if your consumers read the 3 topic-partitions up to any of the 3 blue lines, they will receive a consistent snapshot across all 3 topics. Now, look at the next image. http://imgur.com/5KuGWNv This one shows what happens after compaction. The blacked out boxes show items that were compacted out. The red lines show the compaction point. T1 can obviously not be constructed, because its offsets have been compacted out. At first glance, it looks like you would be able to read from the beginning up to T2 in order to construct a consistent snapshot, but that actually is not the case. The black box in the 2nd row has been compacted out, by let's say the 5th box in row 2. Reading up to T2 for the 2nd row means that you would be missing that value. And so the only consistent snapshot you can reconstruct would be T3. The general rule there is, you must read to the first transaction that occurs after the compaction points of across all 3 topic-partitions, in order to obtain a consistent snapshot. > Additionally, we don't really intend to have the consumers of the table > topics try to keep current. Rather they will occasionally (say at 1AM each > day) try to build the state of the database at a recent checkpoint (say from > midnight). Supposing this takes a bit of time (10's of minutes to hours) to > read all the partitions of all the table topics up each to its target offset > indicated in the midnight checkpoint. By the time all the consumers have > arrive at the designated offset perhaps one of them will have had its target > offset compacted away. We would then need to select a new target checkpoint > with its offsets for each topic and partition that is a bit later. How much > later? It might well be around the 10's of minutes to hours it took to read > through to the offsets of the original target checkpoint as the compaction > that foiled us may have occurred just before we reached the goal. > I'm not sure why you say you need an *additional* 10's of minutes to hours in order to reach the next target checkpoint. You have already read until the first checkpoint (which is no longer good enough). Wouldn't you just have to read an additional couple messages past that checkpoint in order to reach the next checkpoint? In my pictures, if you reached T2 and decided it was not good enough, you would simply have to read a couple more messages to get from T2 to T3. You would not have to start over from the beginning in order to read to T3. 
-James

> Really the issue is that without the feature, while we could eventually > restore _some_ consistent state, we couldn't be assured of being able to > restore any particular (recent) one. My comment about never being assured of the process > terminating is just acknowledging the perhaps small but nonetheless finite > possibility that the process of chasing checkpoints, looking for one for which no > partition has yet had its target offset compacted away, could continue > indefinitely. There is really no condition in which one could be absolutely > guaranteed this process would terminate. > > The feature addresses this by providing a guarantee that _any_ checkpoint can > be reconstructed as long as it is within the compaction lag. I would love to > be convinced that I am in error but short of that I frankly would never turn > on compaction for a CDC use case without it. > > As to reducing the number of parameters: I personally only see > min.compaction.lag.ms as being truly essential. Even the existing ratio > setting is secondary in my mind. > > Eric > >> On May 16, 2016, at 6:42 PM, Gwen Shapira wrote: >> >> Hi Eric, >> >> Thank you for submitting this improvement suggestion. >> >> Do you mind clarifying the use-case for me? >> >> Looking at your gist: https://gist.github.com/ewasserman/f8c892c2e7a9cf26ee46 >> >> If my consumer started reading all the CDC topics from the very >> beginning in which they were created, without ever stopping, it is >> obviously guarantee
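[To make James's rule concrete, here is a hypothetical helper; it is a sketch only, not part of Kafka or the KIP. The String keys standing in for topic-partitions and both input maps are assumptions, since the client API does not expose per-partition compaction points:]

    import java.util.List;
    import java.util.Map;

    public class CheckpointSelector {

        // A checkpoint survives compaction only if every one of its recorded
        // offsets still lies at or beyond that partition's compaction point.
        public static boolean isReconstructible(Map<String, Long> checkpointOffsets,
                                                Map<String, Long> compactionPoints) {
            for (Map.Entry<String, Long> e : checkpointOffsets.entrySet()) {
                long compactedUpTo = compactionPoints.getOrDefault(e.getKey(), 0L);
                if (e.getValue() < compactedUpTo) {
                    return false; // this checkpoint's offset was compacted away
                }
            }
            return true;
        }

        // James's rule: the first usable checkpoint is the oldest one lying
        // entirely after the compaction points of all topic-partitions.
        public static Map<String, Long> firstUsable(List<Map<String, Long>> checkpointsOldestFirst,
                                                    Map<String, Long> compactionPoints) {
            for (Map<String, Long> cp : checkpointsOldestFirst) {
                if (isReconstructible(cp, compactionPoints)) {
                    return cp;
                }
            }
            return null; // none usable yet; wait for a future checkpoint
        }
    }

[With Eric's 8000 offsets per checkpoint, a single false from isReconstructible invalidates the whole checkpoint, which is why one compacted offset forces a move to a later checkpoint.]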
Re: [DISCUSS] KIP-58 - Make Log Compaction Point Configurable
We would find this KIP very useful.

Our particular use case falls into the "application mistakes" portion of the KIP. We are storing source-of-truth data in log-compacted topics, similar to the Confluent Schema Registry. One situation we had recently was a misbehaving application. It sent data into its log-compacted source-of-truth topic, with the same keys but with corrupted message bodies. It took several days for us to notice this, and by that time log compaction had occurred. We lost data.

If we had been able to specify that log compaction would not occur for, say, 14 days, this would give us 2 weeks to notice and recover from this situation. We could not have prevented the application from appending bad data after the good data, but at least the good data would still be present in the not-yet-compacted portion of the log, and we could write some manual recovery process to extract the good data out and do with it whatever we needed.

Because we do not have this facility, we currently have a rather complicated "backup" procedure whereby we use MirrorMaker to mirror from our log-compacted topic to a time-retention-based topic. We have 14-day retention on the time-based topic, thus allowing us 14 days to recover from any data corruption on our log-compacted topic. There is also an (imho) clever thing we do where we restart mirroring from the beginning of the log-compacted topic every 7 days. This allows us to make sure that even for very old non-changing keys, we always have a copy of them in the time-based topic.

Gwen helped us come up with that backup plan many months ago (Thanks Gwen!) but having a configurable "do not compact the last X days" setting would likely be enough to satisfy our backup needs. (A config sketch follows this message.)

-James

> On May 16, 2016, at 9:20 PM, Jay Kreps wrote: > > Yeah I think I gave a scenario but that is not the same as a concrete use > case. I think the question you have is how common is it that people care > about this and what concrete things would you build where you had this > requirement? I think that would be good to figure out. > > I think the issue with the current state is that it really gives no SLA at > all, the last write to a segment is potentially compacted immediately so > even a few seconds of lag (if your segment size is small) would cause this. > > -Jay > > On Mon, May 16, 2016 at 9:05 PM, Gwen Shapira wrote: > >> I agree that log.cleaner.min.compaction.lag.ms gives slightly more >> flexibility for potentially-lagging consumers than tuning >> segment.roll.ms for the exact same scenario. >> >> If more people think that the use-case of "consumer which must see >> every single record, is running on a compacted topic, and is lagging >> enough that tuning segment.roll.ms won't help" is important enough >> that we need to address it, I won't object to proceeding with the KIP >> (i.e. I'm probably -0 on this). It is easy to come up with a scenario >> in which a feature is helpful (heck, I do it all the time), I'm just >> not sure there is a real problem that cannot be addressed using >> Kafka's existing behavior. >> >> I do think that it will be an excellent idea to revisit the log >> compaction configurations and see whether they make sense to users. >> For example, if "log.cleaner.min.compaction.lag.X" can replace >> "log.cleaner.min.cleanable.ratio" as an easier-to-tune alternative, >> I'll be more excited about the replacement, even without a strong >> use-case for a specific compaction lag. 
>> >> Gwen >> On Mon, May 16, 2016 at 7:46 PM, Jay Kreps wrote: >>> I think it would be good to hammer out some of the practical use cases--I >>> definitely share your disdain for adding more configs. Here is my sort of >>> theoretical understanding of why you might want this. >>> >>> As you say a consumer bootstrapping itself in the compacted part of the log >>> isn't actually traversing through valid states globally. i.e. if you have >>> written the following:
>>> offset, key, value
>>> 0, k0, v0
>>> 1, k1, v1
>>> 2, k0, v2
>>> it could be compacted to
>>> 1, k1, v1
>>> 2, k0, v2
>>> Thus at offset 1 in the compacted log, you would have applied k1, but not >>> k0. So even though k0 and k1 both have valid values they get applied out of >>> order. This is totally normal, there is obviously no way to both compact >>> and retain every valid state. >>> >>> For many things this is a non-issue since they treat items only on a >>> per-key basis without any global notion of consistency. >>> >>> But let's say you want to guarantee you only traverse valid states in a >>> caught-up real-time consumer, how can you do this? It's actually a bit >>> tough. Generally speaking since we don't compact the active segment a >>> real-time consumer should have this property but this doesn't really give a >>> hard SLA. With a small segment size and a lagging consumer you could >>> imagine the compactor potentially getting ahead of the consumer. >>> >>> So effectively what
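[For James's scenario, the per-topic settings might look like the following minimal sketch. It assumes KIP-58's proposed min.compaction.lag.ms is adopted as a topic-level override; the config name follows the KIP discussion, not a shipped release:]

    import java.util.Properties;

    public class BackupTopicConfig {
        public static void main(String[] args) {
            // Hedged sketch: min.compaction.lag.ms as proposed in KIP-58.
            Properties topicConfig = new Properties();
            topicConfig.put("cleanup.policy", "compact");
            // A record cannot be compacted until it is at least 14 days old,
            // so a newer corrupt record cannot displace an older good one
            // for 14 days, which is the recovery window James describes.
            topicConfig.put("min.compaction.lag.ms",
                            String.valueOf(14L * 24 * 60 * 60 * 1000));
            topicConfig.forEach((k, v) -> System.out.println(k + "=" + v));
        }
    }

[Under that setting, the good records survive in the not-yet-compacted portion of the log for the same two weeks the MirrorMaker-based backup currently provides.]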
Re: [DISCUSS] KIP-58 - Make Log Compaction Point Configurable
I see what you mean, Eric. I was unclear on the specifics of your architecture.

It sounds like you have a table somewhere that maps checkpoints to lists of (topic, partition, offset) entries. In that case it is indeed useful to know that if the checkpoint was written N ms ago, you will be able to find the exact offsets by looking at the log. Reading ahead won't really help in that case, since it sounds like the state is too large to maintain in memory while reading ahead to a future checkpoint. (Different from Jay's abstract case in that regard.)

Gwen

On Mon, May 16, 2016 at 9:21 PM, Eric Wasserman wrote: > Gwen, > > For simplicity, the example I gave in the gist is for a single table with a > single partition. The salient point is that even for a single topic with one > partition there is no guarantee without the feature that one will be able to > restore some particular checkpoint as the offset indicated by that checkpoint > may have been compacted away. > > The practical reality is we are trying to restore the state of a database > with nearly 1000 tables each of which has 8 partitions. In this real case > there are 8000 offsets indicated in each checkpoint. If even a single one of > those 8000 is compacted the checkpointed state cannot be reconstructed. > > Additionally, we don't really intend to have the consumers of the table > topics try to keep current. Rather they will occasionally (say at 1AM each > day) try to build the state of the database at a recent checkpoint (say from > midnight). Supposing this takes a bit of time (10's of minutes to hours) to > read all the partitions of all the table topics each up to its target offset > indicated in the midnight checkpoint. By the time all the consumers have > arrived at the designated offset perhaps one of them will have had its target > offset compacted away. We would then need to select a new target checkpoint > with its offsets for each topic and partition that is a bit later. How much > later? It might well be around the 10's of minutes to hours it took to read > through to the offsets of the original target checkpoint as the compaction > that foiled us may have occurred just before we reached the goal. > > Really the issue is that without the feature, while we could eventually > restore _some_ consistent state, we couldn't be assured of being able to > restore any particular (recent) one. My comment about never being assured of the process > terminating is just acknowledging the perhaps small but nonetheless finite > possibility that the process of chasing checkpoints, looking for one for which no > partition has yet had its target offset compacted away, could continue > indefinitely. There is really no condition in which one could be absolutely > guaranteed this process would terminate. > > The feature addresses this by providing a guarantee that _any_ checkpoint can > be reconstructed as long as it is within the compaction lag. I would love to > be convinced that I am in error but short of that I frankly would never turn > on compaction for a CDC use case without it. > > As to reducing the number of parameters: I personally only see > min.compaction.lag.ms as being truly essential. Even the existing ratio > setting is secondary in my mind. > > Eric > >> On May 16, 2016, at 6:42 PM, Gwen Shapira wrote: >> >> Hi Eric, >> >> Thank you for submitting this improvement suggestion. >> >> Do you mind clarifying the use-case for me? 
>> >> Looking at your gist: https://gist.github.com/ewasserman/f8c892c2e7a9cf26ee46 >> >> If my consumer started reading all the CDC topics from the very >> beginning in which they were created, without ever stopping, it is >> obviously guaranteed to see every single consistent state of the >> database. >> If my consumer joined late (let's say after Tq got clobbered by Tr) it >> will get a mixed state, but if it will continue listening on those >> topics, always following the logs to their end, it is guaranteed to >> see a consistent state as soon as a new transaction commits. Am I missing >> anything? >> >> Basically, I do not understand why you claim: "However, to recover all >> the tables at the same checkpoint, with each independently compacting, >> one may need to move to an even more recent checkpoint when a >> different table had the same read issue with the new checkpoint. Thus >> one could never be assured of this process terminating." >> >> I mean, it is true that you need to continuously read forward in order >> to get to a consistent state, but why can't you be assured of getting >> there? >> >> We are doing something very similar in KafkaConnect, where we need a >> consistent view of our configuration. We make sure that if the current >> state is inconsistent (i.e. there is data that is not "committed" >> yet), we continue reading to the log end until we get to a consistent >> state. >> >> I am not convinced the new functionality is necessary, or even helpful. >> >> Gwen >> >> On Mon, May 16, 2016 at 4:07 PM, Er
Re: [DISCUSS] KIP-58 - Make Log Compaction Point Configurable
Gwen,

For simplicity, the example I gave in the gist is for a single table with a single partition. The salient point is that even for a single topic with one partition there is no guarantee without the feature that one will be able to restore some particular checkpoint, as the offset indicated by that checkpoint may have been compacted away.

The practical reality is we are trying to restore the state of a database with nearly 1000 tables, each of which has 8 partitions. In this real case there are 8000 offsets indicated in each checkpoint. If even a single one of those 8000 is compacted the checkpointed state cannot be reconstructed.

Additionally, we don't really intend to have the consumers of the table topics try to keep current. Rather they will occasionally (say at 1AM each day) try to build the state of the database at a recent checkpoint (say from midnight). Suppose this takes a bit of time (10's of minutes to hours) to read all the partitions of all the table topics each up to its target offset indicated in the midnight checkpoint. By the time all the consumers have arrived at the designated offset perhaps one of them will have had its target offset compacted away. We would then need to select a new target checkpoint with its offsets for each topic and partition that is a bit later. How much later? It might well be around the 10's of minutes to hours it took to read through to the offsets of the original target checkpoint, as the compaction that foiled us may have occurred just before we reached the goal.

Really the issue is that without the feature, while we could eventually restore _some_ consistent state, we couldn't be assured of being able to restore any particular (recent) one. My comment about never being assured of the process terminating is just acknowledging the perhaps small but nonetheless finite possibility that the process of chasing checkpoints, looking for one for which no partition has yet had its target offset compacted away, could continue indefinitely. There is really no condition in which one could be absolutely guaranteed this process would terminate. (A sketch of this chase appears after this message.)

The feature addresses this by providing a guarantee that _any_ checkpoint can be reconstructed as long as it is within the compaction lag. I would love to be convinced that I am in error but short of that I frankly would never turn on compaction for a CDC use case without it.

As to reducing the number of parameters: I personally only see min.compaction.lag.ms as being truly essential. Even the existing ratio setting is secondary in my mind.

Eric

> On May 16, 2016, at 6:42 PM, Gwen Shapira wrote: > > Hi Eric, > > Thank you for submitting this improvement suggestion. > > Do you mind clarifying the use-case for me? > > Looking at your gist: https://gist.github.com/ewasserman/f8c892c2e7a9cf26ee46 > > If my consumer started reading all the CDC topics from the very > beginning in which they were created, without ever stopping, it is > obviously guaranteed to see every single consistent state of the > database. > If my consumer joined late (let's say after Tq got clobbered by Tr) it > will get a mixed state, but if it will continue listening on those > topics, always following the logs to their end, it is guaranteed to > see a consistent state as soon as a new transaction commits. Am I missing > anything? 
> > Basically, I do not understand why you claim: "However, to recover all > the tables at the same checkpoint, with each independently compacting, > one may need to move to an even more recent checkpoint when a > different table had the same read issue with the new checkpoint. Thus > one could never be assured of this process terminating." > > I mean, it is true that you need to continuously read forward in order > to get to a consistent state, but why can't you be assured of getting > there? > > We are doing something very similar in KafkaConnect, where we need a > consistent view of our configuration. We make sure that if the current > state is inconsistent (i.e. there is data that is not "committed" > yet), we continue reading to the log end until we get to a consistent > state. > > I am not convinced the new functionality is necessary, or even helpful. > > Gwen > > On Mon, May 16, 2016 at 4:07 PM, Eric Wasserman > wrote: >> I would like to begin discussion on KIP-58 >> >> The KIP is here: >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-58+-+Make+Log+Compaction+Point+Configurable >> >> Jira: https://issues.apache.org/jira/browse/KAFKA-1981 >> >> Pull Request: https://github.com/apache/kafka/pull/1168 >> >> Thanks, >> >> Eric
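[For what it's worth, here is a sketch of the checkpoint-chasing loop Eric describes. The CheckpointStore and SnapshotReader interfaces are hypothetical stand-ins for his pipeline; neither is a Kafka API:]

    import java.util.Map;

    public class CheckpointChase {

        // Hypothetical: a store of checkpoints, each mapping a
        // topic-partition name to a target offset.
        interface CheckpointStore {
            Map<String, Long> nextAfter(Map<String, Long> checkpoint);
        }

        // Hypothetical: a reader that fails (returns false) if any target
        // offset has already been compacted away mid-read.
        interface SnapshotReader {
            boolean readUpTo(Map<String, Long> offsets);
        }

        // Without a compaction lag this loop has no guaranteed exit: each
        // attempt takes time, during which compaction may pass the next
        // target. With min.compaction.lag.ms larger than the read time,
        // the first attempt is guaranteed to succeed.
        static Map<String, Long> restore(CheckpointStore store,
                                         SnapshotReader reader,
                                         Map<String, Long> target) {
            while (target != null && !reader.readUpTo(target)) {
                target = store.nextAfter(target); // chase a later checkpoint
            }
            return target; // the checkpoint actually restored, or null
        }
    }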
Re: [DISCUSS] KIP-58 - Make Log Compaction Point Configurable
Yeah I think I gave a scenario but that is not the same as a concrete use case. I think the question you have is how common is it that people care about this and what concrete things would you build where you had this requirement? I think that would be good to figure out.

I think the issue with the current state is that it really gives no SLA at all: the last write to a segment is potentially compacted immediately, so even a few seconds of lag (if your segment size is small) would cause this.

-Jay

On Mon, May 16, 2016 at 9:05 PM, Gwen Shapira wrote: > I agree that log.cleaner.min.compaction.lag.ms gives slightly more > flexibility for potentially-lagging consumers than tuning > segment.roll.ms for the exact same scenario. > > If more people think that the use-case of "consumer which must see > every single record, is running on a compacted topic, and is lagging > enough that tuning segment.roll.ms won't help" is important enough > that we need to address it, I won't object to proceeding with the KIP > (i.e. I'm probably -0 on this). It is easy to come up with a scenario > in which a feature is helpful (heck, I do it all the time); I'm just > not sure there is a real problem that cannot be addressed using > Kafka's existing behavior. > > I do think that it will be an excellent idea to revisit the log > compaction configurations and see whether they make sense to users. > For example, if "log.cleaner.min.compaction.lag.X" can replace > "log.cleaner.min.cleanable.ratio" as an easier-to-tune alternative, > I'll be more excited about the replacement, even without a strong > use-case for a specific compaction lag. > > Gwen > > On Mon, May 16, 2016 at 7:46 PM, Jay Kreps wrote: > > I think it would be good to hammer out some of the practical use cases--I > > definitely share your disdain for adding more configs. Here is my sort of > > theoretical understanding of why you might want this. > > > > As you say a consumer bootstrapping itself in the compacted part of the log > > isn't actually traversing through valid states globally. i.e. if you have > > written the following:
> > offset, key, value
> > 0, k0, v0
> > 1, k1, v1
> > 2, k0, v2
> > it could be compacted to
> > 1, k1, v1
> > 2, k0, v2
> > Thus at offset 1 in the compacted log, you would have applied k1, but not > > k0. So even though k0 and k1 both have valid values they get applied out of > > order. This is totally normal, there is obviously no way to both compact > > and retain every valid state. > > > > For many things this is a non-issue since they treat items only on a > > per-key basis without any global notion of consistency. > > > > But let's say you want to guarantee you only traverse valid states in a > > caught-up real-time consumer, how can you do this? It's actually a bit > > tough. Generally speaking since we don't compact the active segment a > > real-time consumer should have this property but this doesn't really give a > > hard SLA. With a small segment size and a lagging consumer you could > > imagine the compactor potentially getting ahead of the consumer. > > > > So effectively what this config would establish is a guarantee that as long > > as you consume all messages in log.cleaner.min.compaction.lag.ms you will > > get every single produced record. > > -Jay > > > > On Mon, May 16, 2016 at 6:42 PM, Gwen Shapira wrote: > >> Hi Eric, > >> > >> Thank you for submitting this improvement suggestion. > >> > >> Do you mind clarifying the use-case for me?
> >> > >> Looking at your gist: > >> https://gist.github.com/ewasserman/f8c892c2e7a9cf26ee46 > >> > >> If my consumer started reading all the CDC topics from the very > >> beginning in which they were created, without ever stopping, it is > >> obviously guaranteed to see every single consistent state of the > >> database. > >> If my consumer joined late (let's say after Tq got clobbered by Tr) it > >> will get a mixed state, but if it will continue listening on those > >> topics, always following the logs to their end, it is guaranteed to > >> see a consistent state as soon as a new transaction commits. Am I missing > >> anything? > >> > >> Basically, I do not understand why you claim: "However, to recover all > >> the tables at the same checkpoint, with each independently compacting, > >> one may need to move to an even more recent checkpoint when a > >> different table had the same read issue with the new checkpoint. Thus > >> one could never be assured of this process terminating." > >> > >> I mean, it is true that you need to continuously read forward in order > >> to get to a consistent state, but why can't you be assured of getting > >> there? > >> > >> We are doing something very similar in KafkaConnect, where we need a > >> consistent view of our configuration. We make sure that if the current > >> state is inconsistent (i.e. there is data that is not "committed" > >> yet), we continue reading to the log end until we get to a consistent >
Re: [DISCUSS] KIP-58 - Make Log Compaction Point Configurable
I agree that log.cleaner.min.compaction.lag.ms gives slightly more flexibility for potentially-lagging consumers than tuning segment.roll.ms for the exact same scenario.

If more people think that the use-case of "consumer which must see every single record, is running on a compacted topic, and is lagging enough that tuning segment.roll.ms won't help" is important enough that we need to address it, I won't object to proceeding with the KIP (i.e. I'm probably -0 on this). It is easy to come up with a scenario in which a feature is helpful (heck, I do it all the time); I'm just not sure there is a real problem that cannot be addressed using Kafka's existing behavior.

I do think that it will be an excellent idea to revisit the log compaction configurations and see whether they make sense to users. For example, if "log.cleaner.min.compaction.lag.X" can replace "log.cleaner.min.cleanable.ratio" as an easier-to-tune alternative, I'll be more excited about the replacement, even without a strong use-case for a specific compaction lag.

Gwen

On Mon, May 16, 2016 at 7:46 PM, Jay Kreps wrote: > I think it would be good to hammer out some of the practical use cases--I > definitely share your disdain for adding more configs. Here is my sort of > theoretical understanding of why you might want this. > > As you say a consumer bootstrapping itself in the compacted part of the log > isn't actually traversing through valid states globally. i.e. if you have > written the following:
> offset, key, value
> 0, k0, v0
> 1, k1, v1
> 2, k0, v2
> it could be compacted to
> 1, k1, v1
> 2, k0, v2
> Thus at offset 1 in the compacted log, you would have applied k1, but not > k0. So even though k0 and k1 both have valid values they get applied out of > order. This is totally normal, there is obviously no way to both compact > and retain every valid state. > > For many things this is a non-issue since they treat items only on a > per-key basis without any global notion of consistency. > > But let's say you want to guarantee you only traverse valid states in a > caught-up real-time consumer, how can you do this? It's actually a bit > tough. Generally speaking since we don't compact the active segment a > real-time consumer should have this property but this doesn't really give a > hard SLA. With a small segment size and a lagging consumer you could > imagine the compactor potentially getting ahead of the consumer. > > So effectively what this config would establish is a guarantee that as long > as you consume all messages in log.cleaner.min.compaction.lag.ms you will > get every single produced record. > > -Jay > > > > > > On Mon, May 16, 2016 at 6:42 PM, Gwen Shapira wrote: > >> Hi Eric, >> >> Thank you for submitting this improvement suggestion. >> >> Do you mind clarifying the use-case for me? >> >> Looking at your gist: >> https://gist.github.com/ewasserman/f8c892c2e7a9cf26ee46 >> >> If my consumer started reading all the CDC topics from the very >> beginning in which they were created, without ever stopping, it is >> obviously guaranteed to see every single consistent state of the >> database. >> If my consumer joined late (let's say after Tq got clobbered by Tr) it >> will get a mixed state, but if it will continue listening on those >> topics, always following the logs to their end, it is guaranteed to >> see a consistent state as soon as a new transaction commits. Am I missing >> anything? 
>> >> Basically, I do not understand why you claim: "However, to recover all >> the tables at the same checkpoint, with each independently compacting, >> one may need to move to an even more recent checkpoint when a >> different table had the same read issue with the new checkpoint. Thus >> one could never be assured of this process terminating." >> >> I mean, it is true that you need to continuously read forward in order >> to get to a consistent state, but why can't you be assured of getting >> there? >> >> We are doing something very similar in KafkaConnect, where we need a >> consistent view of our configuration. We make sure that if the current >> state is inconsistent (i.e. there is data that is not "committed" >> yet), we continue reading to the log end until we get to a consistent >> state. >> >> I am not convinced the new functionality is necessary, or even helpful. >> >> Gwen >> >> On Mon, May 16, 2016 at 4:07 PM, Eric Wasserman >> wrote: >> > I would like to begin discussion on KIP-58 >> > >> > The KIP is here: >> > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-58+-+Make+Log+Compaction+Point+Configurable >> > >> > Jira: https://issues.apache.org/jira/browse/KAFKA-1981 >> > >> > Pull Request: https://github.com/apache/kafka/pull/1168 >> > >> > Thanks, >> > >> > Eric >>
Re: [DISCUSS] KIP-58 - Make Log Compaction Point Configurable
I think it would be good to hammer out some of the practical use cases--I definitely share your disdain for adding more configs. Here is my sort of theoretical understanding of why you might want this.

As you say a consumer bootstrapping itself in the compacted part of the log isn't actually traversing through valid states globally. i.e. if you have written the following:

offset, key, value
0, k0, v0
1, k1, v1
2, k0, v2

it could be compacted to

1, k1, v1
2, k0, v2

Thus at offset 1 in the compacted log, you would have applied k1, but not k0. So even though k0 and k1 both have valid values they get applied out of order. This is totally normal; there is obviously no way to both compact and retain every valid state.

For many things this is a non-issue since they treat items only on a per-key basis without any global notion of consistency.

But let's say you want to guarantee you only traverse valid states in a caught-up real-time consumer, how can you do this? It's actually a bit tough. Generally speaking since we don't compact the active segment a real-time consumer should have this property, but this doesn't really give a hard SLA. With a small segment size and a lagging consumer you could imagine the compactor potentially getting ahead of the consumer.

So effectively what this config would establish is a guarantee that as long as you consume all messages in log.cleaner.min.compaction.lag.ms you will get every single produced record.

-Jay

On Mon, May 16, 2016 at 6:42 PM, Gwen Shapira wrote: > Hi Eric, > > Thank you for submitting this improvement suggestion. > > Do you mind clarifying the use-case for me? > > Looking at your gist: > https://gist.github.com/ewasserman/f8c892c2e7a9cf26ee46 > > If my consumer started reading all the CDC topics from the very > beginning in which they were created, without ever stopping, it is > obviously guaranteed to see every single consistent state of the > database. > If my consumer joined late (let's say after Tq got clobbered by Tr) it > will get a mixed state, but if it will continue listening on those > topics, always following the logs to their end, it is guaranteed to > see a consistent state as soon as a new transaction commits. Am I missing > anything? > > Basically, I do not understand why you claim: "However, to recover all > the tables at the same checkpoint, with each independently compacting, > one may need to move to an even more recent checkpoint when a > different table had the same read issue with the new checkpoint. Thus > one could never be assured of this process terminating." > > I mean, it is true that you need to continuously read forward in order > to get to a consistent state, but why can't you be assured of getting > there? > > We are doing something very similar in KafkaConnect, where we need a > consistent view of our configuration. We make sure that if the current > state is inconsistent (i.e. there is data that is not "committed" > yet), we continue reading to the log end until we get to a consistent > state. > > I am not convinced the new functionality is necessary, or even helpful. > > Gwen > > On Mon, May 16, 2016 at 4:07 PM, Eric Wasserman > wrote: > > I would like to begin discussion on KIP-58 > > > > The KIP is here: > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-58+-+Make+Log+Compaction+Point+Configurable > > > > Jira: https://issues.apache.org/jira/browse/KAFKA-1981 > > > > Pull Request: https://github.com/apache/kafka/pull/1168 > > > > Thanks, > > > > Eric >
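[Jay's example can be simulated in a few lines. This toy model (plain Java, nothing Kafka-specific) compacts the three-record log by keeping the latest offset per key, then replays the result in offset order:]

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class CompactionDemo {
        public static void main(String[] args) {
            // Jay's log: {offset, key, value} triples.
            String[][] log = { {"0", "k0", "v0"}, {"1", "k1", "v1"}, {"2", "k0", "v2"} };

            // A full compaction pass keeps only the last record per key.
            Map<String, String[]> latest = new LinkedHashMap<>();
            for (String[] rec : log) {
                latest.put(rec[1], rec);
            }

            // Replay the compacted log in offset order, applying each record.
            List<String[]> compacted = new ArrayList<>(latest.values());
            compacted.sort((a, b) -> Long.compare(Long.parseLong(a[0]),
                                                  Long.parseLong(b[0])));

            Map<String, String> state = new HashMap<>();
            for (String[] rec : compacted) {
                state.put(rec[1], rec[2]);
                System.out.println("after offset " + rec[0] + ": " + state);
            }
        }
    }

[The replay prints "after offset 1: {k1=v1}": a state with k1 applied but k0 missing, which never existed in the uncompacted history. Only after offset 2 does the state match a real one, which is Jay's point about bootstrapping consumers not traversing valid states globally.]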
Re: [DISCUSS] KIP-58 - Make Log Compaction Point Configurable
Hi Eric,

Thank you for submitting this improvement suggestion. Do you mind clarifying the use-case for me?

Looking at your gist: https://gist.github.com/ewasserman/f8c892c2e7a9cf26ee46

If my consumer started reading all the CDC topics from the very beginning in which they were created, without ever stopping, it is obviously guaranteed to see every single consistent state of the database. If my consumer joined late (let's say after Tq got clobbered by Tr) it will get a mixed state, but if it will continue listening on those topics, always following the logs to their end, it is guaranteed to see a consistent state as soon as a new transaction commits. Am I missing anything?

Basically, I do not understand why you claim: "However, to recover all the tables at the same checkpoint, with each independently compacting, one may need to move to an even more recent checkpoint when a different table had the same read issue with the new checkpoint. Thus one could never be assured of this process terminating."

I mean, it is true that you need to continuously read forward in order to get to a consistent state, but why can't you be assured of getting there?

We are doing something very similar in KafkaConnect, where we need a consistent view of our configuration. We make sure that if the current state is inconsistent (i.e. there is data that is not "committed" yet), we continue reading to the log end until we get to a consistent state.

I am not convinced the new functionality is necessary, or even helpful.

Gwen

On Mon, May 16, 2016 at 4:07 PM, Eric Wasserman wrote: > I would like to begin discussion on KIP-58 > > The KIP is here: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-58+-+Make+Log+Compaction+Point+Configurable > > Jira: https://issues.apache.org/jira/browse/KAFKA-1981 > > Pull Request: https://github.com/apache/kafka/pull/1168 > > Thanks, > > Eric