[DISCUSS] KIP-31 - Message format change proposal
Hi,

We just created KIP-31 to propose a message format change in Kafka.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-31+-+Message+format+change+proposal

As a summary, the motivations are:
1. Avoid server side message re-compression
2. Honor time-based log roll and retention
3. Enable offset search by timestamp at a finer granularity.

Feedback and comments are welcome!

Thanks,

Jiangjie (Becket) Qin
Re: [DISCUSS] KIP-31 - Message format change proposal
Thanks for the write-up Jiangjie.

One comment about migration plan: "For old consumers, if they see the new protocol the CRC check will fail"..

Do you mean this bug in the old consumer cannot be fixed in a backward-compatible way?

Guozhang

On Thu, Sep 3, 2015 at 8:35 AM, Jiangjie Qin wrote:
> [...]
Re: [DISCUSS] KIP-31 - Message format change proposal
Hi, Guozhang,

Thanks for reading the KIP. By "old consumer", I meant the ZookeeperConsumerConnector in trunk now, i.e. without this bug fixed. If we fix the ZookeeperConsumerConnector, then it will throw an exception complaining about the unsupported version when it sees message format V1. What I was trying to say is that if we have some ZookeeperConsumerConnector running without the fix, the consumer will complain about a CRC mismatch instead of an unsupported version.

Thanks,

Jiangjie (Becket) Qin

On Thu, Sep 3, 2015 at 12:15 PM, Guozhang Wang wrote:
> [...]
Re: [DISCUSS] KIP-31 - Message format change proposal
Hi Guozhang,

I checked the code again. Actually, the CRC check probably won't fail. The newly added timestamp field might be treated as keyLength instead, so we are likely to receive an IllegalArgumentException when we try to read the key. I'll update the KIP.

Thanks,

Jiangjie (Becket) Qin

On Thu, Sep 3, 2015 at 12:48 PM, Jiangjie Qin wrote:
> [...]
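The misparse described above can be sketched in a few lines. This is only an illustration under stated assumptions: the v0 body is taken to be crc(4), magic(1), attributes(1), keyLength(4), key, valueLength(4), value, and the v1 body is assumed to insert an 8-byte timestamp right after attributes. The function names are hypothetical, and the Python ValueError stands in for the IllegalArgumentException mentioned in the message.

```python
import struct

def encode_v1(timestamp_ms, key, value):
    """Build a hypothetical v1 message body (CRC left as 0 for brevity)."""
    header = struct.pack(">Ibbqi", 0, 1, 0, timestamp_ms, len(key))
    return header + key + struct.pack(">i", len(value)) + value

def read_key_length_as_v0(buf):
    """Read keyLength where a v0-only reader expects it: byte offset 6."""
    (key_length,) = struct.unpack_from(">i", buf, 6)
    return key_length

def read_key_as_v0(buf):
    """Slice the key the way a v0-only reader would, failing on overrun."""
    key_length = read_key_length_as_v0(buf)
    start = 10  # crc(4) + magic(1) + attributes(1) + keyLength(4)
    if key_length > len(buf) - start:
        raise ValueError("key length %d exceeds remaining %d bytes"
                         % (key_length, len(buf) - start))
    return buf[start:start + key_length]

# A millisecond timestamp from early September 2015.
msg = encode_v1(1441323300000, b"k", b"v")
# The v0 reader sees the high 32 bits of the timestamp as keyLength: 335.
print(read_key_length_as_v0(msg))
```

With a small message the bogus length of 335 runs past the end of the buffer, so the read fails while slicing the key rather than at the CRC check.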
Re: [DISCUSS] KIP-31 - Message format change proposal
A few questions:

1. If we update the producers to only support V1, doesn't that mean people get stuck on the current version of the producer until they can be sure all their consumers have been upgraded? Is that going to be a problem for anyone, and does it potentially keep them from getting important fixes/enhancements (e.g. the upcoming client-side network request timeouts) because they have to wait for all the consumers to upgrade first?

2. Why minute granularity specifically? That seems fairly specific to LI's example workload (which, by the way, is a very useful reference point to include in the KIP, thanks for that). If someone has the memory and can support it, why not let them go all the way to per-record timestamps (by making the interval configurable so they can set it really small/to 0)? It seems to me that some people might be willing to pay that cost for the application-side simplicity if they want to consume from specific timestamps. With the proposal as is, seeking to a timestamp still requires client-side logic to filter out messages that might have earlier timestamps due to the granularity chosen. The obvious tradeoff here is yet another config option -- I'm loath to add yet more configs, although this is one that we can choose a reasonable default for (like 1 minute) and people can easily change with no impact if they need to adjust it.

3. Why not respect the timestamp passed in as long as it's not some sentinel value that indicates the broker should fill it in? When you do know they will be ordered, as they should be with mirror maker (which is specifically mentioned), this seems really useful. (More about this in questions below...)

4. I think one obvious answer to (3) is that accepting client's timestamps could break seeking-by-timestamp since they may not be properly ordered. However, I think this can break anyway under normal operations -- any imperfections in clock synchronization could result in older timestamps being applied to new messages on a new leader broker compared to messages stored previously by the old leader. I think it's probably too common to just think that NTP will take care of it and then run into weird bugs because NTP isn't always perfect, sometimes does some unexpected things, and, of course, does what you tell it to and so is subject to user error.

It would be reasonable to argue that the leader change issue is much less likely to be an issue than respecting timestamps from producers, where, if applications actually filled it in, you'd receive a jumbled mess of timestamps and trying to do the binary search in the index wouldn't necessarily give you correct results. However, we could allow clients to fill that info in but discourage it where it might cause issues (i.e. normal applications), and it seems like a significant win for mirrormaker.

I actually think not accepting timestamps is probably the better choice but a) it seems awkward in the protocol spec because we have to include the field in the produce requests since we don't want to have to fully decode them on the broker and b) losing that info during mirroring seems like it breaks the goal of fixing log retention (at least for mirrors) as well as the goal of improving searching by timestamp (at least for mirrors).

5. You never actually specified the granularity (or format, but I assume unix epoch) of the timestamp. Milliseconds? Microseconds? Nanoseconds? This definitely needs to eventually make it into the protocol docs.

6. Re: the rejected alternative. Are there any other options in changing the config format that might make it a bit lighter weight? For example, do we need a full int64? Could we do something relative instead that wouldn't require as many bytes? Without (5) being specified, it's actually difficult to evaluate some of these options.

7. Any plan to expose this in the client APIs? This is related to (4). If they are not exposed anywhere in the API as being associated with messages, then we can reasonably treat them as an internal implementation detail and be very clear about what looking up an offset for a timestamp means. If they are exposed, then they're more like message metadata that applications are probably going to want preserved, and thus will want the broker to respect the timestamp. For example, if I saw that consumers could get the timestamp, I'd want to be able to assign it at the producer so that even if I have significant batching I still get an accurate timestamp -- basically I might replace some cases where I use a timestamp embedded in the message with the timestamp provided by Kafka. (That said, we should consider the impact that including it in the protocol can have; non-Java clients are likely to expose it if it is available, whether it's actually a good idea to or not.)

-Ewen

On Thu, Sep 3, 2015 at 3:47 PM, Jiangjie Qin wrote:
> [...]
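To make the concern in (4) concrete, here is a small sketch of why a binary search over timestamps needs them to be ordered. It is not Kafka's index code: the log is modeled as a plain list of timestamps where the list position plays the role of the offset, and both function names are made up for the illustration.

```python
import bisect

def first_offset_at_or_after(timestamps, target):
    """Binary search; only correct if timestamps are non-decreasing."""
    return bisect.bisect_left(timestamps, target)

def first_offset_at_or_after_scan(timestamps, target):
    """Linear scan; correct regardless of ordering, used as a reference."""
    for offset, ts in enumerate(timestamps):
        if ts >= target:
            return offset
    return len(timestamps)

ordered = [100, 200, 300, 400, 500]   # broker-assigned, monotonic
jumbled = [100, 400, 200, 500, 300]   # producer-assigned, out of order

# On the ordered log both approaches agree on offset 3.
print(first_offset_at_or_after(ordered, 350),
      first_offset_at_or_after_scan(ordered, 350))
# On the jumbled log the binary search returns 5 (nothing found),
# while the first message with timestamp >= 350 is at offset 1.
print(first_offset_at_or_after(jumbled, 350),
      first_offset_at_or_after_scan(jumbled, 350))
```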
Re: [DISCUSS] KIP-31 - Message format change proposal
Heh, I guess in addition to my wall of text of questions, I should also say that I think this provides useful functionality and fixes issues that we've seen a bunch of questions and complaints about, so I'm in favor of a fix and this looks like a pretty good approach :)

It might also be useful to say which release you're hoping to get this into.

-Ewen

On Thu, Sep 3, 2015 at 9:35 PM, Ewen Cheslack-Postava wrote:
> [...]
Re: [DISCUSS] KIP-31 - Message format change proposal
Hey Ewen,

Thanks for the comments; they are really good questions. Please see the inline replies.

On Thu, Sep 3, 2015 at 9:35 PM, Ewen Cheslack-Postava wrote:

> A few questions:
>
> 1. If we update the producers to only support V1, doesn't that mean people get stuck on the current version of the producer until they can be sure all their consumers have been upgraded? Is that going to be a problem for anyone, and does it potentially keep them from getting important fixes/enhancements (e.g. the upcoming client-side network request timeouts) because they have to wait for all the consumers to upgrade first?

This is a good point. I thought about this before, and my initial thinking is that we might need to add a config on the producer to specify which version you want to use to produce. But this seems to be a pretty ad-hoc approach and I don't really like it. We are working on a general protocol version control mechanism proposal and will have a separate KIP for that.

> 2. Why minute granularity specifically? That seems fairly specific to LI's example workload (which, by the way, is a very useful reference point to include in the KIP, thanks for that). If someone has the memory and can support it, why not let them go all the way to per-record timestamps (by making the interval configurable so they can set it really small/to 0)? It seems to me that some people might be willing to pay that cost for the application-side simplicity if they want to consume from specific timestamps. With the proposal as is, seeking to a timestamp still requires client-side logic to filter out messages that might have earlier timestamps due to the granularity chosen. The obvious tradeoff here is yet another config option -- I'm loath to add yet more configs, although this is one that we can choose a reasonable default for (like 1 minute) and people can easily change with no impact if they need to adjust it.

The searching granularity will actually be millisecond. The index granularity only determines how close you will be to the actual message with the timestamp you are looking for. For example, if you are looking for a message with timestamp 10:00:15, a minute granularity will give you the offset at 10:00:00, and the search needs to go through the records from 10:00:00 to 10:00:15 to find the message. But with a second level granularity, it might only need to go through the messages produced in one second. So a minute level granularity index will take longer to search, but the precision will be the same as with a second level index. That said, I am not objecting to adding the granularity configuration, but I am not sure how useful it would be to have a second level index because I think typically a consumer will be long-running and only search for the timestamp at startup. I will update the KIP page to clarify the precision.

> 3. Why not respect the timestamp passed in as long as it's not some sentinel value that indicates the broker should fill it in? When you do know they will be ordered, as they should be with mirror maker (which is specifically mentioned), this seems really useful. (More about this in questions below...)

Like what you mentioned in (4), having a log without monotonically increasing timestamps is weird. To me it is even worse than having an empty timestamp field in the inner message that will not be used except for log compacted topics. I think the only way to solve this issue is to add another CreateTime to the message. So far I am not sure how useful it is, though, because arguably people can always put this timestamp inside the payload. So I think this timestamp is more for server side usage instead of application / client side usage.

> 4. I think one obvious answer to (3) is that accepting client's timestamps could break seeking-by-timestamp since they may not be properly ordered. However, I think this can break anyway under normal operations -- any imperfections in clock synchronization could result in older timestamps being applied to new messages on a new leader broker compared to messages stored previously by the old leader. I think it's probably too common to just think that NTP will take care of it and then run into weird bugs because NTP isn't always perfect, sometimes does some unexpected things, and, of course, does what you tell it to and so is subject to user error.

This is a good point. Yes, NTP only guarantees limited synchronization precision (several microseconds if you have low stratum and appropriate PPS). My experience is that it actually is good and stable enough even for some mission critical systems such as core banking. Maybe this is more of an implementation detail. The simple solution is that when the leader appends messages to the log, it always takes the max(lastAppendedTimestamp, currentTimeMillis). Arguably we can play the same trick even if we let the producer fill in the timestamp. But that means the timestamp the producer sets may or may not be honored, which
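The max(lastAppendedTimestamp, currentTimeMillis) rule mentioned in the reply to (4) might look roughly like the sketch below. It is only an illustration: the LeaderClock class and its injected now_ms callable are hypothetical names, not anything from the Kafka code base, and the clock readings are made up to include one backwards step.

```python
class LeaderClock:
    """Hands out append timestamps that never go backwards."""

    def __init__(self, now_ms, last_appended_ms=0):
        self._now_ms = now_ms                  # callable returning wall-clock ms
        self.last_appended_ms = last_appended_ms

    def next_timestamp(self):
        # Take the larger of the last appended timestamp and the current
        # wall-clock reading, so a clock that steps back cannot break ordering.
        self.last_appended_ms = max(self.last_appended_ms, self._now_ms())
        return self.last_appended_ms

# Simulated wall-clock readings; the third one steps backwards.
readings = iter([1000, 1005, 990, 1010])
clock = LeaderClock(lambda: next(readings))
print([clock.next_timestamp() for _ in range(4)])  # [1000, 1005, 1005, 1010]
```

The cost of the clamp is visible in the third value: the message appended during the backwards step gets the previous timestamp rather than the actual wall-clock reading.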
Re: [DISCUSS] KIP-31 - Message format change proposal
On Thu, Sep 3, 2015 at 11:45 PM, Jiangjie Qin wrote:

> Hey Ewen,
>
> Thanks for the comments; they are really good questions. Please see the inline replies.
>
> On Thu, Sep 3, 2015 at 9:35 PM, Ewen Cheslack-Postava wrote:
>
> > A few questions:
> >
> > 1. If we update the producers to only support V1, doesn't that mean people get stuck on the current version of the producer until they can be sure all their consumers have been upgraded? Is that going to be a problem for anyone, and does it potentially keep them from getting important fixes/enhancements (e.g. the upcoming client-side network request timeouts) because they have to wait for all the consumers to upgrade first?
>
> This is a good point. I thought about this before, and my initial thinking is that we might need to add a config on the producer to specify which version you want to use to produce. But this seems to be a pretty ad-hoc approach and I don't really like it. We are working on a general protocol version control mechanism proposal and will have a separate KIP for that.

The configs are annoying, but they can also provide a way for us to guarantee a smooth upgrade for users that use the defaults and upgrade their entire infrastructure one version at a time:

v0.8.3: no support for v1
v0.9.0:
  broker: accept.format=v0 (v1 is rejected)
  producer: produce.format=v0
  consumer: include support for v0,v1
v0.9.1:
  broker: accept.format=v1
  producer: produce.format=v1
  consumer: v0,v1

> > 2. Why minute granularity specifically? That seems fairly specific to LI's example workload (which, by the way, is a very useful reference point to include in the KIP, thanks for that). If someone has the memory and can support it, why not let them go all the way to per-record timestamps (by making the interval configurable so they can set it really small/to 0)? It seems to me that some people might be willing to pay that cost for the application-side simplicity if they want to consume from specific timestamps. With the proposal as is, seeking to a timestamp still requires client-side logic to filter out messages that might have earlier timestamps due to the granularity chosen. The obvious tradeoff here is yet another config option -- I'm loath to add yet more configs, although this is one that we can choose a reasonable default for (like 1 minute) and people can easily change with no impact if they need to adjust it.
>
> The searching granularity will actually be millisecond. The index granularity only determines how close you will be to the actual message with the timestamp you are looking for. For example, if you are looking for a message with timestamp 10:00:15, a minute granularity will give you the offset at 10:00:00, and the search needs to go through the records from 10:00:00 to 10:00:15 to find the message. But with a second level granularity, it might only need to go through the messages produced in one second. So a minute level granularity index will take longer to search, but the precision will be the same as with a second level index. That said, I am not objecting to adding the granularity configuration, but I am not sure how useful it would be to have a second level index because I think typically a consumer will be long-running and only search for the timestamp at startup. I will update the KIP page to clarify the precision.

Ok, this makes sense.

> > 3. Why not respect the timestamp passed in as long as it's not some sentinel value that indicates the broker should fill it in? When you do know they will be ordered, as they should be with mirror maker (which is specifically mentioned), this seems really useful. (More about this in questions below...)
>
> Like what you mentioned in (4), having a log without monotonically increasing timestamps is weird. To me it is even worse than having an empty timestamp field in the inner message that will not be used except for log compacted topics. I think the only way to solve this issue is to add another CreateTime to the message. So far I am not sure how useful it is, though, because arguably people can always put this timestamp inside the payload. So I think this timestamp is more for server side usage instead of application / client side usage.

I think setting it broker-side is fine, I just want to make sure user expectations are clear. The existing search-by-timestamp has obvious limitations. This proposal makes it much more accurate, so it needs to be clear who is responsible for assigning the timestamps and what they mean for an application. Applications that care about when the message was actually created will still need to do some additional work.

> > 4. I think one obvious answer to (3) is that accepting client's timestamps could break seeking-by-timestamp since they may not be properly ordered. However, I think this can break anyway under normal operat
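The index-granularity point agreed on in this exchange (coarser index, same result, longer scan) can be sketched as follows. This is an illustration only, not the KIP's index format: the log is modeled as a list of non-decreasing millisecond timestamps whose list position is the offset, and build_time_index and find_offset are hypothetical names.

```python
import bisect

def build_time_index(timestamps, interval_ms):
    """Add an (timestamp, offset) entry at most once per interval_ms."""
    index = []
    for offset, ts in enumerate(timestamps):
        if not index or ts - index[-1][0] >= interval_ms:
            index.append((ts, offset))
    return index

def find_offset(timestamps, index, target_ms):
    """Return (first offset with timestamp >= target_ms, messages scanned)."""
    keys = [ts for ts, _ in index]
    i = bisect.bisect_right(keys, target_ms) - 1   # last entry <= target
    start = index[i][1] if i >= 0 else 0
    for scanned, offset in enumerate(range(start, len(timestamps)), 1):
        if timestamps[offset] >= target_ms:
            return offset, scanned
    return None, len(timestamps) - start

# One message every 250 ms for 100 seconds.
log = [i * 250 for i in range(400)]
minute_index = build_time_index(log, 60_000)
second_index = build_time_index(log, 1_000)

print(find_offset(log, minute_index, 75_300))  # (302, 63)
print(find_offset(log, second_index, 75_300))  # (302, 3)
```

Both indexes locate the same offset; the minute-level index just scans 63 messages instead of 3 to get there, in exchange for far fewer index entries.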
Re: [DISCUSS] KIP-31 - Message format change proposal
The magic byte is used to version the message format, so we'll need to make sure that check is in place--I actually don't see it in the current consumer code, which I think is a bug we should fix for the next release (filed KAFKA-2523). The purpose of that field is so there is a clear check on the format rather than the scrambled scenarios Becket describes.

Also, Becket, I don't think just fixing the Java client is sufficient, as that would break other clients--i.e. if anyone writes a v1 message, even by accident, any non-v1-capable consumer will break. I think we probably need a way to have the server ensure a particular message format either at read or write time.

-Jay

On Thu, Sep 3, 2015 at 3:47 PM, Jiangjie Qin wrote:
> [...]
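A consumer-side magic byte check of the kind described above could be as small as the sketch below. It assumes the magic byte sits immediately after a 4-byte CRC at the start of the message body; SUPPORTED_MAGIC, UnsupportedVersionError, and check_magic are hypothetical names for the illustration and are not taken from the Kafka clients.

```python
import struct

SUPPORTED_MAGIC = {0}  # hypothetical consumer that only understands v0

class UnsupportedVersionError(Exception):
    """Raised when a message uses a format version this reader cannot parse."""

def check_magic(buf):
    """Validate the magic byte before touching any other field."""
    (magic,) = struct.unpack_from(">b", buf, 4)  # assumed to follow crc(4)
    if magic not in SUPPORTED_MAGIC:
        raise UnsupportedVersionError("unsupported message format v%d" % magic)
    return magic

v0_body = struct.pack(">Ib", 0, 0)   # crc placeholder, magic = 0
v1_body = struct.pack(">Ib", 0, 1)   # crc placeholder, magic = 1

print(check_magic(v0_body))  # 0
try:
    check_magic(v1_body)
except UnsupportedVersionError as err:
    print(err)  # unsupported message format v1
```

Failing early on the version gives a clear error instead of a misread key length, but as noted above it only helps clients that actually implement the check.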
Re: [DISCUSS] KIP-31 - Message format change proposal
One more comment, I think there are really three proposals here:

1. Get a mechanism and policy in place for record format upgrade (we haven't done this so we don't really have the infra). This is kind of implicit. I suspect we'll need to do this multiple times in the future so we should make it easy.
2. Add a timestamp to messages.
3. Move to relative offsets.

For sanity it might make sense to discuss these individually.

I think the relative offset proposal is pretty straight-forward. It probably should have been done that way to begin with. I think you should get near-universal support on that one. Saving the re-compression on the server is a big win. I really wish we'd thought of that at the time.

The timestamp problems we have are definitely annoying, and I agree that time is really a first class thing. But adding time has a ton of problems that need to be fully worked out before we pull the trigger.

First, I like the implementation plan you have for the time index--I think you are saying that would retain the same format as the existing OffsetIndex, although it would require some refactoring. You are correct that this should be a separate index file--this will allow the index to be less frequent (smaller) and also let it page out if it isn't used.

Now the bad bits about time!

1. Clock time isn't sequential. The whole point of NTP is to sync the clock. That means changing the time forwards and backward, I think. Also users can change the time any time they want! Also when the master fails it moves to a different machine; maybe its clock is sync'd, maybe it's not. If I mirror-maker two partitions into one then surely there is skew, possibly hours or days of skew (i.e. imagine cross-dc mirror maker where the network isn't available for a bit of time and then catches up). (Also not sure how having the leader do max(old_leader_time, current_time) works if we accept client times in the mm case?)

2. Nobody cares what time it is on the server. Consider cases where data is being copied from a database or from log files. In steady-state the server time is very close to the client time if their clocks are sync'd (see 1), but there will be times of large divergence when the copying process is stopped or falls behind. When this occurs it is clear that the time the data arrived on the server is irrelevant; it is the source timestamp that matters. This is the problem you are trying to fix by retaining the mm timestamp, but really the client should always set the time, with the use of server-side time as a fallback. It would be worth talking to the Samza folks and reading through this blog post (http://radar.oreilly.com/2015/08/the-world-beyond-batch-streaming-101.html) on this subject since we went through similar learnings on the stream processing side.

I think the implication of these two is that we need a proposal that handles potentially very out-of-order timestamps in some kind of sanish way (buggy clients will set something totally wrong as the time).

-Jay

On Sun, Sep 6, 2015 at 4:22 PM, Jay Kreps wrote:
> [...]
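Why relative offsets save the server-side re-compression can be shown with a rough sketch. The thread does not spell out the exact scheme, so the convention used here (inner messages numbered 0..n-1, wrapper stamped with the absolute offset of its last inner message) is an assumption, and the dict layout and function names are purely illustrative.

```python
def assign_absolute_offset(wrapper, next_offset):
    """Broker side: stamp only the wrapper and leave inner messages untouched.

    Returns the next free offset in the log.
    """
    count = len(wrapper["inner"])
    wrapper["offset"] = next_offset + count - 1   # offset of last inner message
    return next_offset + count

def absolute_offsets(wrapper):
    """Consumer side: recover absolute offsets from the relative ones."""
    last_relative = wrapper["inner"][-1]["relative_offset"]
    base = wrapper["offset"] - last_relative
    return [base + m["relative_offset"] for m in wrapper["inner"]]

# Producer builds the compressed set with relative offsets only.
wrapper = {
    "offset": None,
    "inner": [{"relative_offset": i, "value": v}
              for i, v in enumerate([b"a", b"b", b"c"])],
}
inner_before = [dict(m) for m in wrapper["inner"]]

next_free = assign_absolute_offset(wrapper, 100)
print(wrapper["offset"], next_free)   # 102 103
print(absolute_offsets(wrapper))      # [100, 101, 102]
# The inner messages were never rewritten, so nothing needs re-compressing.
print(wrapper["inner"] == inner_before)  # True
```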
Re: [DISCUSS] KIP-31 - Message format change proposal
On Sun, Sep 6, 2015 at 4:57 PM, Jay Kreps wrote:
> 2. Nobody cares what time it is on the server.

This is a good way of summarizing the issue I was trying to get at, from an app's perspective. Of the 3 stated goals of the KIP, #2 (log retention) is reasonably handled by a server-side timestamp. I really just care that a message is there long enough that I have a chance to process it. #3 (searching by timestamp) only seems useful if we can guarantee the server-side timestamp is close enough to the original client-side timestamp, and any mirror maker step seems to break that (even ignoring any issues with broker availability).

I'm also wondering whether optimizing for search-by-timestamp on the broker is really something we want to do, given that messages aren't really guaranteed to be ordered by application-level timestamps on the broker. Is part of the need for this just due to the current consumer APIs being difficult to work with? For example, could you implement this pretty easily client side, just the way you would broker-side? I'd imagine a couple of random seeks + reads on very rare occasions (e.g. when the app starts up) wouldn't be a problem performance-wise. Or is it also that you need the broker to enforce things like monotonically increasing timestamps, since you can't do the query properly and efficiently without that guarantee, and therefore what applications are actually looking for *is* broker-side timestamps?

-Ewen
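The client-side approach Ewen asks about ("a couple of random seeks + reads") amounts to a binary search over offsets. The following is a minimal Python sketch under stated assumptions, not a Kafka consumer API: `read_timestamp` is a hypothetical callable standing in for "seek to this offset and read one message's timestamp", and the search is only correct if timestamps never decrease as offsets increase, which is exactly the guarantee the message questions.

```python
def find_offset_for_time(read_timestamp, first_offset, end_offset, target_ms):
    """Return the smallest offset in [first_offset, end_offset) whose
    timestamp is >= target_ms, or end_offset if there is none.

    Assumption: timestamps are non-decreasing in offset order. With
    out-of-order timestamps the result is not meaningful.
    """
    lo, hi = first_offset, end_offset
    while lo < hi:
        mid = (lo + hi) // 2
        # Each probe models one random seek plus one read.
        if read_timestamp(mid) < target_ms:
            lo = mid + 1
        else:
            hi = mid
    return lo
```

The number of probes grows with the logarithm of the offset range, which is why a search like this would be cheap if it only runs occasionally, for example at application start-up.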
Re: [DISCUSS] KIP-31 - Message format change proposal
So, with regards to why you want to search by timestamp, the biggest problem I've seen is with consumers who want to reset their timestamps to a specific point, whether it is to replay a certain amount of messages or to rewind to before some problem state existed. This happens more often than anyone would like.

To handle this now we need to constantly export the broker's offset for every partition to a time-series database and then use external processes to query this. I know we're not the only ones doing this. The way the broker handles requests for offsets by timestamp is a little obtuse (explain it to anyone without intimate knowledge of the internal workings of the broker - every time I do I see this). In addition, as Becket pointed out, it causes problems specifically with retention of messages by time when you move partitions around.

I'm deliberately avoiding the discussion of what timestamp to use. I can see the argument either way, though I tend to lean towards the idea that the broker timestamp is the only viable source of truth in this situation.

-Todd
Re: [DISCUSS] KIP-31 - Message format change proposal
Jay,

Thanks for the comments. Yes, there are actually three proposals, as you pointed out.

We will have a separate proposal for (1), the version control mechanism. We actually thought about whether to separate 2 and 3 internally before creating the KIP. The reason we put 2 and 3 together is that it saves us another across-the-board wire protocol change. Like you said, we have to migrate all the clients in all languages. To some extent, the effort spent on upgrading the clients can be even bigger than implementing the new feature itself. So there is some attraction in doing 2 and 3 together instead of separately. Maybe after (1) is done it will be easier to do protocol migration. But if we are able to come to an agreement on the timestamp solution, I would prefer to have it together with relative offset in the interest of avoiding another wire protocol change (the process of migrating to relative offsets is exactly the same as migrating to messages with timestamps).

In terms of timestamp, I completely agree that having a client timestamp is more useful if we can make sure the timestamp is good. But in reality that can be a really big *IF*. I think the problem is exactly as Ewen mentioned: if we let the client set the timestamp, it would be very hard for the broker to utilize it. If the broker applies retention policy based on the client timestamp, one misbehaving producer can potentially completely mess up the retention policy on the broker. Although people don't care about the server-side timestamp, people do care a lot when timestamps break. Searching by timestamp is a really important use case even though it is not used as often as searching by offset. It has a significant direct impact on RTO when there is a cross-cluster failover, as Todd mentioned.

The trick of using max(lastAppendedTimestamp, currentTimeMillis) is to guarantee a monotonic increase of the timestamp. Many commercial systems actually do something similar to this to solve time skew. About changing the time, I am not sure people use NTP like a watch, just setting it forward/backward by an hour or so. The time adjustment I used to do is typically something like a minute per week. So for each second there might be a few microseconds slower/faster, but it should not break the clock completely, to make sure all the time-based transactions are not affected. The one-minute change will be done within a week, not instantly.

Personally, I think having a client-side timestamp will be useful if we don't need to put the broker and data integrity at risk. If we have to choose one of them but not both, I would prefer the server-side timestamp, because for the client-side timestamp there is always a plan B, which is putting the timestamp into the payload.

Another reason I am reluctant to use the client-side timestamp is that it is always dangerous to mix the control plane with the data plane. IP did this and it has caused so many different breaches that people are migrating to something like MPLS. An example in Kafka is that any client can construct a LeaderAndIsrRequest/UpdateMetadataRequest/ControlledShutdownRequest (you name it) and send it to the broker to mess up the entire cluster; also, as we already noticed, a busy cluster can respond quite slowly to controller messages. So it would really be nice if we can avoid giving clients the power to control log retention.

Thanks,

Jiangjie (Becket) Qin
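To make the max(lastAppendedTimestamp, currentTimeMillis) trick from Becket's message concrete, here is a minimal Python sketch. It is an illustration only, not broker code: the class name `MonotonicLogClock` and the injectable `now_ms` clock are assumptions introduced for the example.

```python
import time


class MonotonicLogClock:
    """Hypothetical helper that hands out non-decreasing append timestamps."""

    def __init__(self, now_ms=None):
        # now_ms is an injectable millisecond clock (an assumption made so the
        # sketch can be exercised with a fake clock); defaults to wall time.
        self._now_ms = now_ms or (lambda: int(time.time() * 1000))
        self.last_appended_timestamp = -1

    def next_timestamp(self):
        # max(lastAppendedTimestamp, currentTimeMillis): if the wall clock
        # steps backwards, reuse the last timestamp instead of going back.
        ts = max(self.last_appended_timestamp, self._now_ms())
        self.last_appended_timestamp = ts
        return ts
```

With a clock that reads 1000, 1005, 990, 1010, the sketch returns 1000, 1005, 1005, 1010: the backwards step is absorbed. It would not help with the case Jay raises later in the thread, where mirrored data carries timestamps that are legitimately in the past.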
Re: [DISCUSS] KIP-31 - Message format change proposal
Hey Todd,

Yeah, totally agree the use case is important. I think there are potentially other uses that having access by time opens up too.

-Jay
Re: [DISCUSS] KIP-31 - Message format change proposal
Hey Becket,

I was proposing splitting up the KIP just for simplicity of discussion. You can still implement them in one patch. I think otherwise it will be hard to discuss/vote on them, since if you like the offset proposal but not the time proposal, what do you do?

Introducing a second notion of time into Kafka is a pretty massive philosophical change, so it kind of warrants its own KIP; I think it isn't just "Change message format".

WRT time, I think one thing to clarify in the proposal is how MM will have access to set the timestamp. Presumably this will be a new field in ProducerRecord, right? If so, then any user can set the timestamp, right? I'm not sure you answered the questions around how this will work for MM, since when MM retains timestamps from multiple partitions they will then be out of order and in the past (so the max(lastAppendedTimestamp, currentTimeMillis) override you proposed will not work, right?). If we don't do this, then when you set up mirroring the data will all be new and you have the same retention problem you described. Maybe I missed something...?

My main motivation is that, given that both Samza and Kafka streams are doing work that implies a mandatory client-defined notion of time, I really think introducing a different mandatory notion of time in Kafka is going to be quite odd. We should think hard about how client-defined time could work. I'm not sure if it can, but I'm also not sure that it can't. Having both will be odd. Did you chat about this with Yi/Kartik on the Samza side?

When you say it won't work, are you assuming some particular implementation? Maybe that the index is a monotonically increasing set of pointers to the least record with a timestamp larger than the index time? In other words, a search for time X gives the largest offset at which all records are <= X?

For retention, I agree with the problem you point out, but I think what you are saying in that case is that you want a size limit too. If you use system time you actually hit the same problem: say you do a full dump of a DB table with a setting of 7 days retention; your retention will actually not get enforced for the first 7 days because the data is "new to Kafka".

-Jay
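One possible reading of the index semantics Jay asks about (a search for time X returns the first offset whose record has a timestamp larger than X, so everything before it is <= X) can be sketched in Python even when client-set timestamps are out of order. This is an illustration only: the function names are hypothetical, offsets are taken to be list positions, and nothing here reflects an actual or proposed Kafka index format.

```python
from bisect import bisect_right
from itertools import accumulate


def build_running_max(timestamps):
    # running_max[i] is the largest timestamp among records 0..i, so the
    # sequence is non-decreasing even if the input timestamps are not.
    return list(accumulate(timestamps, max))


def first_offset_after(running_max, x):
    # Offset of the first record whose timestamp exceeds x. Every record at
    # a smaller offset has a timestamp <= x; later records may still be <= x.
    return bisect_right(running_max, x)
```

For timestamps [10, 30, 20, 50, 40], a search for 45 returns offset 3: records 0 to 2 are all <= 45, while the record at offset 4 (timestamp 40) is also <= 45 but sits behind a later timestamp. That asymmetry is the cost of allowing out-of-order timestamps.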
Re: [DISCUSS] KIP-31 - Message format change proposal
Becket,

Nice write-up. A few thoughts:

I'd split up the discussion for simplicity. Note that you can always group several of these in one patch to reduce the protocol changes people have to deal with. This is just a suggestion, but I think the following split might make it easier to tackle the changes being proposed:

- Relative offsets
- Introducing the concept of time
- Time-based indexing (separate the usage of the timestamp field from how/whether we want to include a timestamp in the message)

I'm a +1 on relative offsets; we should've done it back when we introduced it. Other than reducing the CPU overhead, this will also reduce the garbage collection overhead on the brokers.

On the timestamp field, I generally agree that we should add a timestamp to a Kafka message, but I'm not quite sold on how this KIP suggests the timestamp be set. I will avoid repeating the downsides of a broker-side timestamp mentioned previously in this thread. I think the topic of including a timestamp in a Kafka message requires a lot more thought and detail than what's in this KIP. I'd suggest we make it a separate KIP that includes a list of all the different use cases for the timestamp (beyond log retention), including stream processing, and discusses the tradeoffs of including client-side and broker-side timestamps.

I agree with the benefit of time-based indexing, but haven't had a chance to dive into the design details yet.

Thanks,
Neha
Re: [DISCUSS] KIP-31 - Message format change proposal
Neha and Jay,

Thanks a lot for the feedback. Good point about splitting the discussion. I have split the proposal into three KIPs, and it does make each discussion clearer:

KIP-31 - Message format change (Use relative offset)
KIP-32 - Add CreateTime and LogAppendTime to Kafka message
KIP-33 - Build a time-based log index

KIP-33 can be a follow-up KIP for KIP-32, so we can discuss KIP-31 and KIP-32 first for now. I will create a separate discussion thread for KIP-32 and reply to the concerns you raised regarding the timestamp.

So far it looks like there is no objection to KIP-31. Since I removed a few parts from the previous KIP and only left the relative offset proposal, it would be great if people could take another look to see if there are any concerns.

Thanks,

Jiangjie (Becket) Qin

On Tue, Sep 8, 2015 at 1:28 PM, Neha Narkhede wrote:

> Becket,
>
> Nice write-up. Few thoughts -
>
> I'd split up the discussion for simplicity. Note that you can always group several of these in one patch to reduce the protocol changes people have to deal with. This is just a suggestion, but I think the following split might make it easier to tackle the changes being proposed -
>
> - Relative offsets
> - Introducing the concept of time
> - Time-based indexing (separate the usage of the timestamp field from how/whether we want to include a timestamp in the message)
>
> I'm a +1 on relative offsets, we should've done it back when we introduced it. Other than reducing the CPU overhead, this will also reduce the garbage collection overhead on the brokers.
>
> On the timestamp field, I generally agree that we should add a timestamp to a Kafka message but I'm not quite sold on how this KIP suggests the timestamp be set. Will avoid repeating the downsides of a broker side timestamp mentioned previously in this thread. I think the topic of including a timestamp in a Kafka message requires a lot more thought and details than what's in this KIP. I'd suggest we make it a separate KIP that includes a list of all the different use cases for the timestamp (beyond log retention) including stream processing and discuss tradeoffs of including client and broker side timestamps.
>
> Agree with the benefit of time-based indexing, but haven't had a chance to dive into the design details yet.
>
> Thanks,
> Neha
>
> On Tue, Sep 8, 2015 at 10:57 AM, Jay Kreps wrote:
>
> > Hey Beckett,
> >
> > I was proposing splitting up the KIP just for simplicity of discussion. You can still implement them in one patch. I think otherwise it will be hard to discuss/vote on them since if you like the offset proposal but not the time proposal what do you do?
> >
> > Introducing a second notion of time into Kafka is a pretty massive philosophical change so it kind of warrants its own KIP I think it isn't just "Change message format".
> >
> > WRT time I think one thing to clarify in the proposal is how MM will have access to set the timestamp? Presumably this will be a new field in ProducerRecord, right? If so then any user can set the timestamp, right? I'm not sure you answered the questions around how this will work for MM since when MM retains timestamps from multiple partitions they will then be out of order and in the past (so the max(lastAppendedTimestamp, currentTimeMillis) override you proposed will not work, right?). If we don't do this then when you set up mirroring the data will all be new and you have the same retention problem you described. Maybe I missed something...?
> >
> > My main motivation is that given that both Samza and Kafka streams are doing work that implies a mandatory client-defined notion of time, I really think introducing a different mandatory notion of time in Kafka is going to be quite odd. We should think hard about how client-defined time could work. I'm not sure if it can, but I'm also not sure that it can't. Having both will be odd. Did you chat about this with Yi/Kartik on the Samza side?
> >
> > When you are saying it won't work you are assuming some particular implementation? Maybe that the index is a monotonically increasing set of pointers to the least record with a timestamp larger than the index time? In other words a search for time X gives the largest offset at which all records are <= X?
> >
> > For retention, I agree with the problem you point out, but I think what you are saying in that case is that you want a size limit too. If you use system time you actually hit the same problem: say you do a full dump of a DB table with a setting of 7 days retention, your retention will actually not get enforced for the first 7 days because the data is "new to Kafka".
> >
> > -Jay
> >
> > On Mon, Sep 7, 2015 at 10:44 AM, Jiangjie Qin wrote:
> >
> > > Jay,
> > >
> > > Thanks for the comments. Yes, there are actually three proposals as you pointed out.
Re: [DISCUSS] KIP-31 - Message format change proposal
Great, can we change the name to something related to the change: "KIP-31: Move to relative offsets in compressed message sets"?

Also, you had mentioned before that you were going to expand on the mechanics of handling these log format changes, right?

-Jay
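For readers following the rename above, here is a minimal sketch of what "relative offsets in compressed message sets" could look like. It is an illustration only, not the KIP's wire format: it assumes that inner messages carry offsets 0..n-1, that the wrapper carries the absolute offset of the last inner message, and that the broker is handed the message count directly. The names `WrapperMessage`, `producer_build`, `broker_append`, and `consumer_read` are hypothetical, and JSON plus zlib stand in for the real serialization and compression.

```python
import json
import zlib
from dataclasses import dataclass


@dataclass
class WrapperMessage:
    offset: int      # absolute offset, filled in by the broker
    payload: bytes   # compressed inner messages carrying relative offsets


def producer_build(values):
    """Producer side: number inner messages 0..n-1 and compress them once."""
    inner = [{"rel": i, "value": v} for i, v in enumerate(values)]
    return WrapperMessage(offset=-1,
                          payload=zlib.compress(json.dumps(inner).encode()))


def broker_append(log_end_offset, wrapper, count):
    """Broker side: set only the wrapper offset; the payload is not rewritten.

    Assumption: the wrapper offset is the absolute offset of the LAST inner
    message. `count` is passed in to keep the sketch short.
    """
    wrapper.offset = log_end_offset + count - 1
    return log_end_offset + count  # new log end offset


def consumer_read(wrapper):
    """Consumer side: turn relative offsets back into absolute offsets."""
    inner = json.loads(zlib.decompress(wrapper.payload))
    last_rel = inner[-1]["rel"]
    return [(wrapper.offset - (last_rel - m["rel"]), m["value"]) for m in inner]


if __name__ == "__main__":
    w = producer_build(["a", "b", "c"])
    next_offset = broker_append(100, w, count=3)
    print(next_offset, consumer_read(w))
```

The point of the design is that the compressed payload stays byte-for-byte identical on append, which is what lets the broker skip re-compression.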
Re: [DISCUSS] KIP-31 - Message format change proposal
Hi Jay,

I just changed the KIP title and updated the KIP page.

And yes, we are working on a general version control proposal to make protocol migrations like this smoother. I will also create a KIP for that soon.

Thanks,

Jiangjie (Becket) Qin
Re: [DISCUSS] KIP-31 - Message format change proposal
I just wanted to comment on a few points made earlier in this thread:

Concerns on clock skew: at least for the original proposal's scope (which was more for honoring retention broker-side), this would only be an issue when spanning leader movements, right? I.e., leader migration latency has to be much less than clock skew for this to be a real issue, wouldn't it?

Client timestamp vs broker timestamp: I'm not sure Kafka brokers are the right place to reason about client-side timestamps, precisely due to the nuances that have been discussed at length in this thread. My preference would have been for the timestamp (now called LogAppendTimestamp) to have nothing to do with the applications. Ewen raised a valid concern about leaking such "private/server-side" timestamps into the protocol spec. I.e., it is fine to have the CreateTime, which is expressly client-provided and immutable thereafter, but the LogAppendTime is also going to be part of the protocol, and it would be good to avoid exposure (to client developers) if possible.

Ok, so here is a slightly different approach that I was just thinking about (and did not think too far, so it may not work): do not add the LogAppendTime to messages. Instead, build the time-based index on the server side on message arrival time alone. Introduce a new ReplicaFetchRequest/Response pair. ReplicaFetchResponses will also include the slice of the time-based index for the follower broker. This way we can at least keep timestamps aligned across brokers for retention purposes. We do lose the append timestamp for mirroring pipelines (which appears to be the case in KIP-32 as well).

Configurable index granularity: We can do this, but I'm not sure it is very useful. As Jay noted, a major change from the old proposal linked from the KIP is the sparse time-based index, which we felt was essential to bound memory usage (having timestamps on each log index entry was probably a big waste, since in the common case several messages span the same timestamp). BTW, another benefit of the second index is that it makes it easier to roll back or throw away if necessary (vs. modifying the existing index format). That obviously does not help with rolling back the timestamp change in the message format, but it is one less thing to worry about.

Versioning: I'm not sure everyone is saying the same thing wrt the scope of this. There is the record format change, but I also think this ties into all of the API versioning that we already have in Kafka. The current API versioning approach works fine for upgrades/downgrades across official Kafka releases, but not so well between releases. (We almost got bitten by this at LinkedIn with the recent changes to various requests, but were able to work around these.) We can clarify this in the follow-up KIP.

Thanks,

Joel
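To make the memory argument for a sparse time-based index concrete, here is a small sketch under stated assumptions. It is not the KIP-33 design: the class name `SparseTimeIndex`, the `min_gap_ms` parameter, and the rule "add an entry only when arrival time has advanced by at least `min_gap_ms`" are all hypothetical choices made for illustration. Timestamps are broker arrival times and are assumed to be non-decreasing.

```python
import bisect


class SparseTimeIndex:
    """Hypothetical sparse (timestamp -> offset) index kept by a broker."""

    def __init__(self, min_gap_ms):
        self.min_gap_ms = min_gap_ms
        self.timestamps = []  # sorted arrival times of indexed messages
        self.offsets = []     # offsets of those messages, same positions

    def maybe_append(self, timestamp_ms, offset):
        # Index a message only if enough time has passed since the last
        # entry, so many messages sharing a timestamp cost one entry.
        if not self.timestamps or timestamp_ms - self.timestamps[-1] >= self.min_gap_ms:
            self.timestamps.append(timestamp_ms)
            self.offsets.append(offset)

    def lookup(self, target_ms):
        # Offset of the latest indexed message with timestamp <= target_ms,
        # or None if the target is older than every entry. A reader would
        # scan forward from this offset to find the exact position.
        i = bisect.bisect_right(self.timestamps, target_ms)
        return self.offsets[i - 1] if i else None


if __name__ == "__main__":
    index = SparseTimeIndex(min_gap_ms=1000)
    for offset, ts in enumerate([0, 10, 500, 1000, 1200, 2500]):
        index.maybe_append(ts, offset)
    print(index.timestamps, index.offsets, index.lookup(1100))
```

Six messages produce three index entries here; the lookup trades a short forward scan for a bounded index size.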
Re: [DISCUSS] KIP-31 - Message format change proposal
Hey Joel,

Yeah, the clock issues would arise when leadership changed or the time on the machine changed.

Don't you see all the same issues you see with client-defined timestamps if you let MM control the timestamp as you were proposing? That means time is no longer monotonic. A mixture of mirrored data from two source clusters may be arbitrarily skewed, just as from two clients.

Also, Joel, can you just confirm that you guys have talked through the whole timestamp thing with the Samza folks at LI? The reason I ask about this is that Samza and Kafka Streams (KIP-28) are both trying to rely on *client-defined* notions of time, and there is a really well thought out rationale for why it needs to be client-defined. In the absence of these, I think I would also have agreed with you guys about preferring server time. But because of those, I think it may be super confusing if one part of Kafka has a mandatory notion of time that means X and another part has a mandatory notion that means Y.

WRT your idea of a FollowerFetchRequest, I had thought of a similar idea where we use the leader's timestamps to approximately set the follower's timestamps. I had thought of just adding a partition metadata request that would subsume the current offset/time lookup and could be used by the follower to try to approximately keep their timestamps kosher. It's a little hacky and doesn't help with MM, but it is also maybe less invasive, so that approach could be viable.

-Jay
Re: [DISCUSS] KIP-31 - Message format change proposal
> Don't you see all the same issues you see with client-defined timestamps if you let mm control the timestamp as you were proposing? That means time

Actually I don't think that was in the proposal (or was it?). I.e., I think it was always supposed to be controlled by the broker (and not MM).

> Also, Joel, can you just confirm that you guys have talked through the whole timestamp thing with the Samza folks at LI? The reason I ask about this is that Samza and Kafka Streams (KIP-28) are both trying to rely on

We have not. This is a good point - we will follow up.

> WRT your idea of a FollowerFetchRequest, I had thought of a similar idea where we use the leader's timestamps to approximately set the follower's timestamps. I had thought of just adding a partition metadata request that would subsume the current offset/time lookup and could be used by the follower to try to approximately keep their timestamps kosher. It's a little hacky and doesn't help with MM but it is also maybe less invasive so that approach could be viable.

That would also work, but perhaps responding with the actual leader offset-timestamp entries (corresponding to the fetched portion) would be exact, and it should be small as well. Anyway, the main motivation here was to avoid leaking server-side timestamps into the message format, if people think that is worth it, so the alternatives are implementation details. My original instinct was that it also avoids a backwards-incompatible change (but it does not, because we also have the relative offset change).

Thanks,

Joel
Re: [DISCUSS] KIP-31 - Message format change proposal
Ah, I see. I think I misunderstood about MM: it was called out in the proposal and I thought you were saying you'd retain the timestamp, but I think you're calling out that you're not. In that case you do have the opposite problem, right? When you add mirroring for a topic, all that data will have a timestamp of now and retention won't be right. Not a blocker, but a bit of a gotcha.

-Jay
Re: [DISCUSS] KIP-31 - Message format change proposal
Re: MM preserving timestamps: Yes, this was how I interpreted the point in the KIP, and I only raised the issue because it restricts the usefulness of timestamps any time MM is involved. I agree it's not a deal breaker, but I wanted to understand the exact impact of the change. Some users seem to want to be able to seek by application-defined timestamps (despite the many obvious issues involved), and the proposal clearly would not support that unless the timestamps submitted with the produce requests were respected.

If we ignore client-submitted timestamps, then we probably want to hide the timestamps as much as possible in any public interface (e.g. they never show up in any public consumer APIs), but expose them just enough to be useful for operational purposes.

Sorry if my devil's advocate position / attempt to map the design space led to some confusion!

-Ewen

On Thu, Sep 10, 2015 at 5:48 PM, Jay Kreps wrote: > Ah, I see, I think I misunderstood about MM, it was called out in the > proposal and I thought you were saying you'd retain the timestamp but I > think you're calling out that you're not. In that case you do have the > opposite problem, right? When you add mirroring for a topic all that data > will have a timestamp of now and retention won't be right. Not a blocker > but a bit of a gotcha. > > -Jay > > > > On Thu, Sep 10, 2015 at 5:40 PM, Joel Koshy wrote: > > > > Don't you see all the same issues you see with client-defined > timestamp's > > > if you let mm control the timestamp as you were proposing? That means > > time > > > > Actually I don't think that was in the proposal (or was it?). i.e., I > > think it was always supposed to be controlled by the broker (and not > > MM). > > > > > Also, Joel, can you just confirm that you guys have talked through the > > > whole timestamp thing with the Samza folks at LI? The reason I ask > about > > > this is that Samza and Kafka Streams (KIP-28) are both trying to rely > on > > > > We have not. 
This is a good point - we will follow-up. > > > > > WRT your idea of a FollowerFetchRequestI had thought of a similar idea > > > where we use the leader's timestamps to approximately set the > follower's > > > timestamps. I had thought of just adding a partition metadata request > > that > > > would subsume the current offset/time lookup and could be used by the > > > follower to try to approximately keep their timestamps kosher. It's a > > > little hacky and doesn't help with MM but it is also maybe less > invasive > > so > > > that approach could be viable. > > > > That would also work, but perhaps responding with the actual leader > > offset-timestamp entries (corresponding to the fetched portion) would > > be exact and it should be small as well. Anyway, the main motivation > > in this was to avoid leaking server-side timestamps to the > > message-format if people think it is worth it so the alternatives are > > implementation details. My original instinct was that it also avoids a > > backwards incompatible change (but it does not because we also have > > the relative offset change). > > > > Thanks, > > > > Joel > > > > > > > > > > > > > > On Thu, Sep 10, 2015 at 3:36 PM, Joel Koshy > wrote: > > > > > >> I just wanted to comment on a few points made earlier in this thread: > > >> > > >> Concerns on clock skew: at least for the original proposal's scope > > >> (which was more for honoring retention broker-side) this would only be > > >> an issue when spanning leader movements right? i.e., leader migration > > >> latency has to be much less than clock skew for this to be a real > > >> issue wouldn’t it? > > >> > > >> Client timestamp vs broker timestamp: I’m not sure Kafka (brokers) are > > >> the right place to reason about client-side timestamps precisely due > > >> to the nuances that have been discussed at length in this thread. 
My > > >> preference would have been to the timestamp (now called > > >> LogAppendTimestamp) have nothing to do with the applications. Ewen > > >> raised a valid concern about leaking such “private/server-side” > > >> timestamps into the protocol spec. i.e., it is fine to have the > > >> CreateTime which is expressly client-provided and immutable > > >> thereafter, but the LogAppendTime is also going part of the protocol > > >> and it would be good to avoid exposure (to client developers) if > > >> possible. Ok, so here is a slightly different approach that I was just > > >> thinking about (and did not think too far so it may not work): do not > > >> add the LogAppendTime to messages. Instead, build the time-based index > > >> on the server side on message arrival time alone. Introduce a new > > >> ReplicaFetchRequest/Response pair. ReplicaFetchResponses will also > > >> include the slice of the time-based index for the follower broker. > > >> This way we can at least keep timestamps aligned across brokers for > > >> retention purposes. We do lose the append timestamp for mirroring > > >> pipelines (which appears to be the case
Re: [DISCUSS] KIP-31 - Message format change proposal
Ewen and Jay,

The way I see it, LogAppendTime is another form of "offset". It serves the following purposes:
1. Locate messages not only by position, but also by time. The difference from offset is that a timestamp is not unique across messages.
2. Allow the broker to manage messages based on time, e.g. retention, rolling.
3. Provide convenience for users to search for messages not only by offset, but also by timestamp.

For purpose (2) we don't need a per-message server timestamp. We only need a per-log-segment server timestamp, propagated among brokers.

For (1) and (3), we need a per-message timestamp. Then the question is whether we should use CreateTime or LogAppendTime.

I completely agree that an application timestamp is very useful for many use cases. But it seems to me that having Kafka understand and maintain application timestamps is a bit too demanding. So I think there is value in passing on CreateTime for application convenience, but I am not sure it can replace LogAppendTime. Managing out-of-order CreateTime is equivalent to allowing producers to send their own offsets and asking the broker to manage the offsets for them. It is going to be very hard to maintain and could create huge performance/functional issues because of the complicated logic.

About whether we should expose LogAppendTime: I agree that the server timestamp is internal to the broker, but isn't offset also an internal concept? Arguably it's not provided by the producer, so consumer application logic does not have to know the offset. But users need to know the offset because they need to know "where is the message" in the log. LogAppendTime answers "when was the message appended" to the log. So personally I think it is reasonable to expose LogAppendTime to consumers.

I can see some use cases for exposing LogAppendTime, to name some:
1. Let's say the broker has 7 days of log retention and some application wants to reprocess the data from the past 3 days. The user can simply provide the timestamp and start consuming.
2. Users can easily know lag by time.
3. Cross-cluster failover. This is a more complicated use case with two goals: 1) do not lose messages; and 2) do not reconsume tons of messages. Only knowing the offset in cluster A won't help with finding the failover point in cluster B, because an offset in one cluster means nothing to another cluster. A timestamp, however, is a good cross-cluster reference in this case.

Thanks,

Jiangjie (Becket) Qin
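To make use cases 1 and 2 in the message above concrete, here is a minimal sketch of looking up a starting offset by LogAppendTime and computing lag by time. It is only an illustration: `TIME_INDEX`, `offset_for_time` and `lag_ms` are hypothetical names, not Kafka APIs, and the sketch assumes every message is indexed and that timestamps are non-decreasing. With a sparse index, a reader would instead start from the preceding entry and scan forward.

```python
from bisect import bisect_left

# Hypothetical time index: (log_append_time_ms, offset) pairs in append order.
# Timestamps are assumed non-decreasing; unlike offsets, they may repeat.
TIME_INDEX = [(1000, 0), (1000, 40), (2000, 90), (3500, 150)]


def offset_for_time(index, target_ms):
    """Return the offset of the first entry whose timestamp is >= target_ms.

    Returns None when the target is newer than every indexed entry.
    """
    timestamps = [ts for ts, _ in index]
    pos = bisect_left(timestamps, target_ms)
    return index[pos][1] if pos < len(index) else None


def lag_ms(now_ms, last_consumed_append_time_ms):
    """Lag by time: how far behind 'now' the last consumed message was appended."""
    return now_ms - last_consumed_append_time_ms
```

Because the timestamp is not unique, the lookup returns the first offset carrying that timestamp, which is why `offset_for_time(TIME_INDEX, 1000)` yields offset 0 rather than 40.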
Re: [DISCUSS] KIP-31 - Message format change proposal
Jay had mentioned the scenario of mirror-maker bootstrap, which would effectively reset the logAppendTimestamps for the bootstrapped data. If we don't include logAppendTimestamps in each message there is a similar scenario when rebuilding indexes during recovery. So it seems it may be worth adding that timestamp to messages. The drawback is exposing a server-side concept in the protocol (although we already do that with offsets). logAppendTimestamp really should be decided by the broker, so I think the first scenario may have to be written off as a gotcha, but the second may be worth addressing (by adding it to the message format).

The other point that Jay raised, which needs to be addressed in the proposal (since we require monotonically increasing timestamps in the index), is changing time on the server (I'm a little less concerned about NTP clock skews than a user explicitly changing the server's time, i.e., big clock skews). We would at least want to "set back" all the existing timestamps to guarantee non-decreasing timestamps with future messages. I'm not sure at this point how best to handle that, but we could perhaps have an epoch/base-time (or time-correction) stored in the log directories and base all log index timestamps off that base-time (or corrected time). So if at any time you determine that time has changed backwards, you can adjust that base-time without having to fix up all the entries. Without knowing the exact diff between the previous clock and the new clock we cannot adjust the times exactly, but we can at least ensure increasing timestamps.
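The base-time idea in Joel's message can be sketched roughly as follows. This is one possible reading, not a worked-out design: the class `CorrectedTimeIndex` and its fields are hypothetical, index entries are stored relative to a single base-time, and when the clock is seen to go backwards the base-time is lowered so that all existing entries are "set back" at once.

```python
class CorrectedTimeIndex:
    """Time index whose entries are stored relative to an adjustable base-time."""

    def __init__(self, base_time_ms):
        self.base_time_ms = base_time_ms
        self.entries = []  # (relative_ms, offset); absolute time = base + relative

    def last_timestamp_ms(self):
        """Absolute timestamp of the newest entry, or None if the index is empty."""
        if not self.entries:
            return None
        return self.base_time_ms + self.entries[-1][0]

    def append(self, now_ms, offset):
        last = self.last_timestamp_ms()
        if last is not None and now_ms < last:
            # Clock moved backwards: shift the base-time back by the observed gap.
            # Every existing entry moves with it, so nothing has to be rewritten.
            self.base_time_ms -= last - now_ms
        self.entries.append((now_ms - self.base_time_ms, offset))

    def timestamps(self):
        """Absolute timestamps under the current base-time."""
        return [self.base_time_ms + rel for rel, _ in self.entries]
```

As the message notes, the corrected times are not exact, since the true difference between the old and new clocks is unknown. The sketch only guarantees that the timestamps read back from the index never decrease.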
Re: [DISCUSS] KIP-31 - Message format change proposal
Hi Joel,

Good point about rebuilding the index. I agree that having a per-message LogAppendTime might be necessary. About time adjustment, the solution sounds promising, but it might be better to make it a follow-up to the KIP because it seems a really rare use case.

I have another thought on how to manage out-of-order timestamps. Maybe we can do the following:
Create a special log-compacted topic __timestamp_index similar to topic, where the key is (TopicPartition, TimeStamp_Rounded_To_Minute) and the value is the offset. In memory, we keep a map for each TopicPartition whose value is (timestamp_rounded_to_minute -> smallest_offset_in_the_minute). This way we can search out-of-order messages and make sure no message is missed.

Thoughts?

Thanks,

Jiangjie (Becket) Qin
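A rough sketch of the in-memory side of the minute-rounded index proposed in this message, assuming timestamps in milliseconds. `MinuteIndex`, `record` and `start_offset` are hypothetical names used only for illustration, and the log-compacted topic itself is not modeled. The search takes the smallest offset over all minute buckets at or after the target minute, which is what keeps out-of-order messages from being skipped.

```python
MINUTE_MS = 60_000


class MinuteIndex:
    def __init__(self):
        # topic_partition -> {minute_start_ms: smallest offset seen in that minute}
        self._by_partition = {}

    def record(self, topic_partition, timestamp_ms, offset):
        minute = timestamp_ms - timestamp_ms % MINUTE_MS
        buckets = self._by_partition.setdefault(topic_partition, {})
        if minute not in buckets or offset < buckets[minute]:
            buckets[minute] = offset

    def start_offset(self, topic_partition, target_ms):
        """Smallest offset that may hold a message in or after target_ms's minute."""
        minute = target_ms - target_ms % MINUTE_MS
        buckets = self._by_partition.get(topic_partition, {})
        candidates = [off for m, off in buckets.items() if m >= minute]
        return min(candidates) if candidates else None
```

For example, if offset 10 carries a timestamp in minute 3 and offset 11 arrives late with a timestamp in minute 2, a search for minute 2 returns offset 10, so a consumer starting there still reads offset 11.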
Re: [DISCUSS] KIP-31 - Message format change proposal
I was trying to send the last email before the KIP hangout, so maybe I did not think it through completely. By the way, the discussion is actually more related to KIP-33, i.e. whether we should index on CreateTime or LogAppendTime. (Although it seems all the discussion is still in this mailing thread...) The solution in the last email is for indexing on CreateTime. It is essentially what Jay suggested, except we use a timestamp map instead of a memory-mapped index file. Please ignore the proposal of using a log-compacted topic. The solution can be simplified to:

Each broker keeps
1. A timestamp index map - Map[TopicPartitionSegment, Map[Timestamp, Offset]]. The timestamp is on a minute boundary.
2. A timestamp index file for each segment.
When a broker receives a message (as either leader or follower), it checks whether the timestamp index map contains the timestamp for the current segment. If the timestamp does not exist, the broker adds the offset to the map and appends an entry to the timestamp index file. I.e., we only use the index file as a persistent copy of the timestamp index map.

When a log segment is deleted, we need to:
1. Delete the TopicPartitionSegment key in the timestamp index map.
2. Delete the timestamp index file.

This solution assumes we only keep CreateTime in the message. There are a few trade-offs in this solution:
1. The granularity of search will be per minute.
2. The whole timestamp index map has to be in memory all the time.
3. We need to think about another way to honor log retention time and time-based log rolling.
4. We lose the benefits of including LogAppendTime in the message mentioned earlier.

I am not sure whether this solution is necessarily better than indexing on LogAppendTime.

I will update KIP-33 to explain the solutions for indexing on CreateTime and LogAppendTime respectively, and add some more concrete use cases as well.

Thanks,

Jiangjie (Becket) Qin
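The broker-side bookkeeping described in this message (a per-segment timestamp map plus an index file used as its persistent copy) could look roughly like the sketch below. All names are hypothetical, the segment key is modeled as a plain tuple, and a Python list stands in for the on-disk index file.

```python
MINUTE_MS = 60_000


class TimestampIndex:
    def __init__(self):
        # segment key -> {minute_start_ms: offset}; held in memory at all times
        self.index_map = {}
        # segment key -> append-only list of (minute_start_ms, offset),
        # standing in for the per-segment timestamp index file
        self.index_files = {}

    def on_message(self, segment, create_time_ms, offset):
        """Called for every message received, on the leader and on followers."""
        minute = create_time_ms - create_time_ms % MINUTE_MS
        per_segment = self.index_map.setdefault(segment, {})
        if minute not in per_segment:
            # First message seen for this minute in this segment:
            # update the map and append the same entry to the index file.
            per_segment[minute] = offset
            self.index_files.setdefault(segment, []).append((minute, offset))

    def on_segment_deleted(self, segment):
        """Drop both the in-memory entry and the index file for the segment."""
        self.index_map.pop(segment, None)
        self.index_files.pop(segment, None)
```

The sketch also makes trade-offs 1 and 2 visible: search granularity is one minute, and `index_map` grows with the number of live segments and distinct minutes, since it is never paged out.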
Re: [DISCUSS] KIP-31 - Message format change proposal
I just updated KIP-33 to explain indexing on CreateTime and LogAppendTime respectively. I also used some use cases to compare the two solutions. Although this is for KIP-33, it does give some insight into whether it makes sense to have a per-message LogAppendTime.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index

As a short summary of the conclusions we have already reached on timestamps:
1. It is good to add a timestamp to the message.
2. LogAppendTime should be used for broker policy enforcement (log retention / rolling).
3. It is useful to have a CreateTime in the message format, which is immutable after the producer sends the message.

The following questions are still under discussion:
1. Should we also add LogAppendTime to the message format?
2. Which timestamp should we use to build the index?

Let's talk about question 1 first, because question 2 is actually a follow-up to question 1.
Here is what I think:
1a. To enforce broker log policy, theoretically we don't need a per-message LogAppendTime. But if we don't include LogAppendTime in the message, we still need to implement a separate solution to pass log segment timestamps among brokers, which further complicates replication.
1b. LogAppendTime has some advantages over CreateTime (KIP-33 has a detailed comparison).
1c. We have already exposed offset, which is essentially an internal concept of a message in terms of position. Exposing LogAppendTime means we expose another internal concept of a message in terms of time.

Considering the above reasons, personally I think it is worth adding LogAppendTime to each message.

Any thoughts?

Thanks,

Jiangjie (Becket) Qin
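As a small illustration of conclusion 2 above (LogAppendTime drives broker retention), the following sketch selects segments whose newest LogAppendTime is older than the retention window. The `Segment` type and `expired_segments` function are hypothetical and do not reflect how the broker's log manager is actually implemented.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Segment:
    base_offset: int
    last_append_time_ms: int  # largest LogAppendTime of any message in the segment


def expired_segments(segments, now_ms, retention_ms):
    """Segments whose newest message was appended more than retention_ms ago."""
    return [s for s in segments if now_ms - s.last_append_time_ms > retention_ms]
```

Because the decision depends only on one timestamp per segment, this matches point 1a: a per-message LogAppendTime is not strictly needed for retention, as long as the per-segment timestamp is somehow kept consistent across brokers.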
Re: [DISCUSS] KIP-31 - Message format change proposal
I suppose the jira link is different. It points to this jira :
https://issues.apache.org/jira/browse/KAFKA-1

Thanks,

Mayuresh

On Mon, Sep 14, 2015 at 5:13 PM, Jiangjie Qin wrote:

> I just updated the KIP-33 to explain the indexing on CreateTime and
> LogAppendTime respectively. I also used some use case to compare the two
> solutions.
> Although this is for KIP-33, but it does give a some insights on whether it
> makes sense to have a per message LogAppendTime.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index
>
> As a short summary of the conclusions we have already reached on timestamp:
> 1. It is good to add a timestamp to the message.
> 2. LogAppendTime should be used for broker policy enforcement (Log
> retention / rolling)
> 3. It is useful to have a CreateTime in message format, which is immutable
> after producer sends the message.
>
> There are following questions still in discussion:
> 1. Should we also add LogAppendTime to message format?
> 2. which timestamp should we use to build the index.
>
> Let's talk about question 1 first because question 2 is actually a follow
> up question for question 1.
> Here are what I think:
> 1a. To enforce broker log policy, theoretically we don't need per-message
> LogAppendTime. If we don't include LogAppendTime in message, we still need
> to implement a separate solution to pass log segment timestamps among
> brokers. That means if we don't include the LogAppendTime in message, there
> will be further complication in replication.
> 1b. LogAppendTime has some advantage over CreateTime (KIP-33 has detail
> comparison)
> 1c. We have already exposed offset, which is essentially an internal
> concept of message in terms of position. Exposing LogAppendTime means we
> expose another internal concept of message in terms of time.
>
> Considering the above reasons, personally I think it worth adding the
> LogAppendTime to each message.
>
> Any thoughts?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Mon, Sep 14, 2015 at 11:44 AM, Jiangjie Qin wrote:
>
> > I was trying to send last email before KIP hangout so maybe did not think
> > it through completely. By the way, the discussion is actually more related
> > to KIP-33, i.e. whether we should index on CreateTime or LogAppendTime.
> > (Although it seems all the discussion are still in this mailing thread...)
> > This solution in last email is for indexing on CreateTime. It is
> > essentially what Jay suggested except we use a timestamp map instead of a
> > memory mapped index file. Please ignore the proposal of using a log
> > compacted topic. The solution can be simplified to:
> >
> > Each broker keeps
> > 1. a timestamp index map - Map[TopicPartitionSegment, Map[Timestamp,
> > Offset]]. The timestamp is on minute boundary.
> > 2. A timestamp index file for each segment.
> > When a broker receives a message (both leader or follower), it checks if
> > the timestamp index map contains the timestamp for current segment. The
> > broker add the offset to the map and append an entry to the timestamp index
> > if the timestamp does not exist. i.e. we only use the index file as a
> > persistent copy of the index timestamp map.
> >
> > When a log segment is deleted, we need to:
> > 1. delete the TopicPartitionKeySegment key in the timestamp index map.
> > 2. delete the timestamp index file
> >
> > This solution assumes we only keep CreateTime in the message. There are a
> > few trade-offs in this solution:
> > 1. The granularity of search will be per minute.
> > 2. All the timestamp index map has to be in the memory all the time.
> > 3. We need to think about another way to honor log retention time and
> > time-based log rolling.
> > 4. We lose the benefit brought by including LogAppendTime in the message
> > mentioned earlier.
> >
> > I am not sure whether this solution is necessarily better than indexing on
> > LogAppendTime.
> >
> > I will update KIP-33 to explain the solution to index on CreateTime and
> > LogAppendTime respectively and put some more concrete use cases as well.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Mon, Sep 14, 2015 at 9:40 AM, Jiangjie Qin wrote:
> >
> >> Hi Joel,
> >>
> >> Good point about rebuilding index. I agree that having a per message
> >> LogAppendTime might be necessary. About time adjustment, the solution
> >> sounds promising, but it might be better to make it as a follow up of the
> >> KIP because it seems a really rare use case.
> >>
> >> I have another thought on how to manage the out of order timestamps.
> >> Maybe we can do the following:
> >> Create a special log compacted topic __timestamp_index similar to topic,
> >> the key would be (TopicPartition, TimeStamp_Rounded_To_Minute), the value
> >> is offset. In memory, we keep a map for each TopicPartition, the value is
> >> (timestamp_rounded_to_minute -> smallest_offset_in_the_minute). This way we
> >> can search out of order message and make sure no mess
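The per-minute timestamp index map described in the quoted 11:44 AM message could be sketched roughly as follows. This is only an illustration under stated assumptions, not broker code: the class name `TimestampIndex`, its method names, and the use of a `(topic, partition, segment_base_offset)` tuple as the segment key are all hypothetical, and the on-disk index file is reduced to a return value telling the caller when an entry would be appended.

```python
from collections import defaultdict

MINUTE_MS = 60_000  # index granularity: one entry per minute boundary


class TimestampIndex:
    """Hypothetical in-memory Map[Segment, Map[Timestamp, Offset]]."""

    def __init__(self):
        # segment key -> {minute-aligned timestamp in ms: offset}
        self._index = defaultdict(dict)

    @staticmethod
    def _minute(timestamp_ms):
        # Round down to the minute boundary.
        return timestamp_ms - timestamp_ms % MINUTE_MS

    def on_append(self, segment, create_time_ms, offset):
        """Record the first offset seen for this minute in this segment.

        Returns True when a new entry was added, i.e. when the caller
        would also append an entry to the persistent index file.
        """
        minute = self._minute(create_time_ms)
        entries = self._index[segment]
        if minute in entries:
            return False
        entries[minute] = offset
        return True

    def on_segment_deleted(self, segment):
        # Drop the segment key; the caller would delete the index file too.
        self._index.pop(segment, None)

    def lookup(self, segment, timestamp_ms):
        """Return the indexed offset for the minute containing timestamp_ms."""
        return self._index.get(segment, {}).get(self._minute(timestamp_ms))


idx = TimestampIndex()
seg = ("topic-a", 0, 0)
first = idx.on_append(seg, 120_500, 10)    # new minute, entry added
second = idx.on_append(seg, 130_000, 11)   # same minute, ignored
late = idx.on_append(seg, 60_001, 12)      # out-of-order CreateTime, new minute
```

Because the map is keyed by the minute rather than by arrival order, an out-of-order CreateTime simply creates an entry for its own minute, which is the property the quoted proposal relies on. The trade-offs listed in the message (per-minute granularity, the whole map held in memory) are visible directly in the structure.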
Re: [DISCUSS] KIP-31 - Message format change proposal
Mayuresh,

I haven't created a Jira for KIP-33 yet because it is still under discussion. I will remove the Jira link.

Thanks,

Jiangjie (Becket) Qin

On Mon, Sep 14, 2015 at 8:15 PM, Mayuresh Gharat wrote:

> I suppose the jira link is different. It points to this jira :
> https://issues.apache.org/jira/browse/KAFKA-1
>
> Thanks,
>
> Mayuresh
Re: [DISCUSS] KIP-31 - Message format change proposal
Hi folks,

Thanks a lot for the feedback on KIP-31 - move to use relative offset (not including the timestamp and index discussion).

I updated the migration plan section as we discussed on the KIP hangout. I think it is the only concern raised so far. Please let me know if there are further comments about the KIP.

Thanks,

Jiangjie (Becket) Qin
Re: [DISCUSS] KIP-31 - Message format change proposal
Jiangjie,

Thanks for the writeup. A few comments below.

1. We will need to be a bit careful with fetch requests from the followers. Basically, as we are doing a rolling upgrade of the brokers, the follower can't start issuing V2 of the fetch request until the rest of the brokers are ready to process it. So, we probably need to make use of inter.broker.protocol.version to do the rolling upgrade. In step 1, we set inter.broker.protocol.version to 0.9 and do a round of rolling upgrade of the brokers. At this point, all brokers are capable of processing V2 of fetch requests, but no broker is using it yet. In step 2, we set inter.broker.protocol.version to 0.10 and do another round of rolling restart of the brokers. In this step, the upgraded brokers will start issuing V2 of the fetch request.

2. If we do #1, I am not sure if there is still a need for message.format.version since the broker can start writing messages in the new format after inter.broker.protocol.version is set to 0.10.

3. It wasn't clear from the wiki whether the base offset in the shallow message is the offset of the first or the last inner message. It's better to use the offset of the last inner message. This way, the followers don't have to decompress messages to figure out the next fetch offset.

4. I am not sure that I understand the following sentence in the wiki. It seems that the relative offsets in a compressed message don't have to be consecutive. If so, why do we need to update the relative offsets in the inner messages?
"When the log cleaner compacts log segments, it needs to update the inner message's relative offset values."

Thanks,

Jun

On Thu, Sep 17, 2015 at 12:54 PM, Jiangjie Qin wrote:

> Hi folks,
>
> Thanks a lot for the feedback on KIP-31 - move to use relative offset. (Not
> including timestamp and index discussion).
>
> I updated the migration plan section as we discussed on KIP hangout. I
> think it is the only concern raised so far. Please let me know if there are
> further comments about the KIP.
>
> Thanks,
>
> Jiangjie (Becket) Qin
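The reason Jun's point 1 needs two rounds of restarts can be shown with a small simulation. This is a sketch, not how brokers are implemented: the `Broker` class, the helper functions, and the mapping from inter.broker.protocol.version to a fetch request version are assumptions made for illustration only.

```python
from dataclasses import dataclass

# Assumed mapping (illustrative): which fetch request version a follower
# issues for a given inter.broker.protocol.version setting.
FETCH_VERSION_FOR_PROTOCOL = {"0.9": 1, "0.10": 2}


@dataclass
class Broker:
    name: str
    max_fetch_version_supported: int  # what the installed binary can process
    inter_broker_protocol: str        # what the config tells it to issue

    def fetch_version_issued(self):
        return FETCH_VERSION_FOR_PROTOCOL[self.inter_broker_protocol]


def cluster_is_safe(brokers):
    """No follower may issue a fetch version that some broker cannot process."""
    lowest_supported = min(b.max_fetch_version_supported for b in brokers)
    return all(b.fetch_version_issued() <= lowest_supported for b in brokers)


def rolling_restart(brokers, **changes):
    """Apply the changes one broker at a time; stop at the first unsafe state."""
    for broker in brokers:
        for field, value in changes.items():
            setattr(broker, field, value)
        if not cluster_is_safe(brokers):
            return False
    return True


def old_cluster():
    return [Broker(f"broker-{i}", 1, "0.9") for i in range(3)]


# Two rounds: upgrade the binaries first, then flip the protocol version.
two_step = old_cluster()
round1_ok = rolling_restart(two_step, max_fetch_version_supported=2)
round2_ok = rolling_restart(two_step, inter_broker_protocol="0.10")

# One round: new binary and new protocol version at the same time.
one_step = old_cluster()
one_step_ok = rolling_restart(
    one_step, max_fetch_version_supported=2, inter_broker_protocol="0.10"
)
```

In the single-round case the first restarted broker starts issuing V2 fetch requests while the others still run the old binary, which is the situation the two-step procedure avoids.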
Re: [DISCUSS] KIP-31 - Message format change proposal
For (3) I don't think we can change the offset in the outer message from what it is today, as it is relied upon in the search done in the log layer. The reason it is the offset of the last message rather than the first is to make the offset a least upper bound (i.e. the smallest offset >= fetch_offset). This needs to work the same for both gaps due to compacted topics and gaps due to compressed messages.

So imagine you had a compressed set with offsets {45, 46, 47, 48}. If you assigned this compressed set the offset 45, a fetch for 46 would actually skip ahead to 49 (the least upper bound).

-Jay
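Jay's example can be reproduced with a few lines. The sketch below assumes the log-layer search reduces to "find the first entry whose offset is >= fetch_offset" over a sorted list of outer-message offsets; `find_entry` is a hypothetical helper, not a Kafka function.

```python
from bisect import bisect_left


def find_entry(entry_offsets, fetch_offset):
    """Index of the first log entry whose offset is >= fetch_offset, else None."""
    i = bisect_left(entry_offsets, fetch_offset)
    return i if i < len(entry_offsets) else None


# Three log entries: message 44, a compressed set holding {45, 46, 47, 48},
# and message 49. Only the label of the compressed set differs.
labelled_by_last = [44, 48, 49]   # outer offset = last inner offset
labelled_by_first = [44, 45, 49]  # outer offset = first inner offset

hit_last = find_entry(labelled_by_last, 46)    # lands on the compressed set
hit_first = find_entry(labelled_by_first, 46)  # skips past it
```

With the last-offset label the search for 46 lands on the entry labelled 48, which contains offset 46. With the first-offset label the same search lands on 49 and the compressed set is skipped.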
Re: [DISCUSS] KIP-31 - Message format change proposal
Hi Jun,

Thanks a lot for the comments. Please see the inline replies. I will update the KIP page accordingly.

On Mon, Sep 21, 2015 at 5:17 PM, Jun Rao wrote:

> Jiangjie,
>
> Thanks for the writeup. A few comments below.
>
> 1. We will need to be a bit careful with fetch requests from the followers.
> Basically, as we are doing a rolling upgrade of the brokers, the follower
> can't start issuing V2 of the fetch request until the rest of the brokers
> are ready to process it. So, we probably need to make use of
> inter.broker.protocol.version to do the rolling upgrade. In step 1, we set
> inter.broker.protocol.version to 0.9 and do a round of rolling upgrade of
> the brokers. At this point, all brokers are capable of processing V2 of
> fetch requests, but no broker is using it yet. In step 2, we
> set inter.broker.protocol.version to 0.10 and do another round of rolling
> restart of the brokers. In this step, the upgraded brokers will start
> issuing V2 of the fetch request.

Thanks for the reminder. Yes, we should use the inter-broker protocol version in this case. A related thought about the mapping between the inter-broker protocol version and the release version: today the two map one-to-one. If we use 0.10 for the new protocol now (with FetchRequest V2) and later need to bump the inter-broker protocol version again (e.g. FetchRequest V3) before 0.10 is officially released, should we still use version 0.10 but point it to FetchRequest V3? If we do that, it will be as if inter-broker protocol 0.10 with FetchRequest V2 never existed. Users who are running on inter-broker protocol 0.10 with FetchRequest V2 (not an official release, but trunk at that time) will have to downgrade to inter-broker protocol 0.9 before they upgrade to inter-broker protocol 0.10 with FetchRequest V3.

If we let multiple inter-broker protocol changes between two official releases be associated with the last official build, this might solve the problem. For example, this time we can use "0.9-V1"; if there are further inter-broker protocol changes we can use "0.9-V2", and so on. This avoids losing track of the inter-broker protocol changes made between official releases.

> 2. If we do #1, I am not sure if there is still a need for
> message.format.version since the broker can start writing messages in the
> new format after inter.broker.protocol.version is set to 0.10.

Right, I don't think we need this anymore.

> 3. It wasn't clear from the wiki whether the base offset in the shallow
> message is the offset of the first or the last inner message. It's better
> to use the offset of the last inner message. This way, the followers don't
> have to decompress messages to figure out the next fetch offset.

Good point. The last offset is better, and it is also what we do today. I'll update the KIP page.

> 4. I am not sure that I understand the following sentence in the wiki. It
> seems that the relative offsets in a compressed message don't have to be
> consecutive. If so, why do we need to update the relative offsets in the
> inner messages?
> "When the log cleaner compacts log segments, it needs to update the inner
> message's relative offset values."

This assumes that when we compact log segments, we might compact multiple message sets into one message set. If so, a message's relative offset in the original message set is likely to differ from its relative offset in the compacted message set. That is why we need to update the inner messages' relative offset values.

> Thanks,
>
> Jun
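The answer to point 4 can be made concrete with a small sketch. It assumes one particular encoding that is consistent with the thread but not spelled out in it: the outer message carries the absolute offset of the last inner message, and each inner message carries an offset relative to the first inner message of its set. The function names and the tuple layout are hypothetical.

```python
def to_absolute(wrapper_offset, relative_offsets):
    """Recover absolute offsets from the outer offset (= last inner message)."""
    last = relative_offsets[-1]
    return [wrapper_offset - (last - r) for r in relative_offsets]


def to_relative(absolute_offsets):
    """Encode a message set as (outer offset, offsets relative to the first)."""
    first = absolute_offsets[0]
    return absolute_offsets[-1], [a - first for a in absolute_offsets]


def merge_after_compaction(message_sets, surviving):
    """Merge several message sets, keeping only the surviving absolute offsets."""
    kept = [
        absolute
        for wrapper_offset, relatives in message_sets
        for absolute in to_absolute(wrapper_offset, relatives)
        if absolute in surviving
    ]
    return to_relative(kept)


set_a = (102, [0, 1, 2])  # absolute offsets 100, 101, 102
set_b = (105, [0, 1, 2])  # absolute offsets 103, 104, 105

# The cleaner keeps 101, 104 and 105 and writes them as one message set.
merged = merge_after_compaction([set_a, set_b], {101, 104, 105})
```

Offset 104 had relative offset 1 in its original set and has relative offset 3 in the merged set, so the inner values must be rewritten even though the absolute offsets are unchanged. The merged relative offsets [0, 3, 4] are also not consecutive, which matches Jun's observation.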
Re: [DISCUSS] KIP-31 - Message format change proposal
Thanks for the explanation, Jay.
Agreed. We have to keep the offset as the offset of the last inner message.

Jiangjie (Becket) Qin

On Mon, Sep 21, 2015 at 6:21 PM, Jay Kreps wrote:

> For (3) I don't think we can change the offset in the outer message from
> what it is today, as it is relied upon in the search done in the log layer.
> The reason it is the offset of the last message rather than the first is to
> make the offset a least upper bound (i.e. the smallest offset >=
> fetch_offset). This needs to work the same for both gaps due to compacted
> topics and gaps due to compressed messages.
>
> So imagine you had a compressed set with offsets {45, 46, 47, 48}. If you
> assigned this compressed set the offset 45, a fetch for 46 would actually
> skip ahead to 49 (the least upper bound).
>
> -Jay
>
> On Mon, Sep 21, 2015 at 5:17 PM, Jun Rao wrote:
>
> > Jiangjie,
> >
> > Thanks for the writeup. A few comments below.
> >
> > 1. We will need to be a bit careful with fetch requests from the followers.
> > Basically, as we are doing a rolling upgrade of the brokers, the follower
> > can't start issuing V2 of the fetch request until the rest of the brokers
> > are ready to process it. So, we probably need to make use of
> > inter.broker.protocol.version to do the rolling upgrade. In step 1, we set
> > inter.broker.protocol.version to 0.9 and do a round of rolling upgrade of
> > the brokers. At this point, all brokers are capable of processing V2 of
> > fetch requests, but no broker is using it yet. In step 2, we
> > set inter.broker.protocol.version to 0.10 and do another round of rolling
> > restart of the brokers. In this step, the upgraded brokers will start
> > issuing V2 of the fetch request.
> >
> > 2. If we do #1, I am not sure if there is still a need for
> > message.format.version since the broker can start writing messages in the
> > new format after inter.broker.protocol.version is set to 0.10.
> >
> > 3. It wasn't clear from the wiki whether the base offset in the shallow
> > message is the offset of the first or the last inner message. It's better
> > to use the offset of the last inner message. This way, the followers don't
> > have to decompress messages to figure out the next fetch offset.
> >
> > 4. I am not sure that I understand the following sentence in the wiki. It
> > seems that the relative offsets in a compressed message don't have to be
> > consecutive. If so, why do we need to update the relative offsets in the
> > inner messages?
> > "When the log cleaner compacts log segments, it needs to update the inner
> > message's relative offset values."
> >
> > Thanks,
> >
> > Jun
> >
> > On Thu, Sep 17, 2015 at 12:54 PM, Jiangjie Qin wrote:
> >
> > > Hi folks,
> > >
> > > Thanks a lot for the feedback on KIP-31 - move to use relative offset.
> > > (Not including timestamp and index discussion).
> > >
> > > I updated the migration plan section as we discussed on KIP hangout. I
> > > think it is the only concern raised so far. Please let me know if there
> > > are further comments about the KIP.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Mon, Sep 14, 2015 at 5:13 PM, Jiangjie Qin wrote:
> > >
> > > > I just updated the KIP-33 to explain the indexing on CreateTime and
> > > > LogAppendTime respectively. I also used some use cases to compare the
> > > > two solutions.
> > > > Although this is for KIP-33, it does give some insight into whether
> > > > it makes sense to have a per-message LogAppendTime.
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index
> > > >
> > > > As a short summary of the conclusions we have already reached on
> > > > timestamp:
> > > > 1. It is good to add a timestamp to the message.
> > > > 2. LogAppendTime should be used for broker policy enforcement (Log
> > > > retention / rolling)
> > > > 3. It is useful to have a CreateTime in message format, which is
> > > > immutable after producer sends the message.
> > > >
> > > > The following questions are still in discussion:
> > > > 1. Should we also add LogAppendTime to message format?
> > > > 2. Which timestamp should we use to build the index?
> > > >
> > > > Let's talk about question 1 first because question 2 is actually a
> > > > follow-up question for question 1.
> > > > Here is what I think:
> > > > 1a. To enforce broker log policy, theoretically we don't need
> > > > per-message LogAppendTime. If we don't include LogAppendTime in message,
> > > > we still need to implement a separate solution to pass log segment
> > > > timestamps among brokers. That means if we don't include the
> > > > LogAppendTime in message, there will be further complication in
> > > > replication.
> > > > 1b. LogAppendTime has some advantage over CreateTime (KIP-33 has a
> > > > detailed comparison)
> > > > 1c. We have already exposed offset, which is essentially an internal
> > > > concept of message in
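Jay's least-upper-bound argument in the quoted message above can be checked with a small sketch. It assumes a log made of three compressed sets and a hypothetical helper, `find_entry`, that returns the first set whose label is >= the fetch offset. Both are illustrative only and are not Kafka's log-layer code.

```python
from bisect import bisect_left

def find_entry(labels, fetch_offset):
    """Index of the first entry whose label is >= fetch_offset, or None."""
    i = bisect_left(labels, fetch_offset)
    return i if i < len(labels) else None

# Three compressed sets holding these inner offsets (made-up data).
sets = [[41, 42, 43, 44], [45, 46, 47, 48], [49, 50, 51, 52]]

labelled_by_last = [s[-1] for s in sets]   # each set labelled with its last offset
labelled_by_first = [s[0] for s in sets]   # each set labelled with its first offset

# Labelled by last offset: a fetch for 46 lands on the set that contains 46.
hit_last = find_entry(labelled_by_last, 46)

# Labelled by first offset: the same search skips ahead to the set starting at 49.
hit_first = find_entry(labelled_by_first, 46)
```

With the last-offset label the search lands on the set {45, 46, 47, 48}; with the first-offset label it lands on the set starting at 49, which is the skip Jay describes.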
Re: [DISCUSS] KIP-31 - Message format change proposal
The upgrade plan works, but the potentially long interim phase of skipping zero-copy for down-conversion could be problematic, especially for large deployments with large consumer fan-out. It is not only going to be memory overhead but CPU as well, since you need to decompress, write absolute offsets, then recompress for every v1 fetch. So it may be safer (but obviously more tedious) to have a multi-step upgrade process. For example:

1 - Upgrade brokers, but disable the feature, i.e., either reject producer requests v2 or down-convert to the old message format (with absolute offsets)
2 - Upgrade clients, but they should only use v1 requests
3 - Switch (all or most) consumers to use the v2 fetch format (which will use zero-copy)
4 - Turn on the feature on the brokers to allow producer requests v2
5 - Switch producers to use the v2 produce format

(You may want a v1 fetch rate metric and decide to proceed to step 4 only when that comes down to a trickle.)

I'm not sure if the prolonged upgrade process is viable in every scenario. I think it should work at LinkedIn, for example, but may not for other environments.

Joel

On Tue, Sep 22, 2015 at 12:55 AM, Jiangjie Qin wrote:
> Thanks for the explanation, Jay.
> Agreed. We have to keep the offset to be the offset of last inner message.
>
> Jiangjie (Becket) Qin
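The per-fetch cost Joel describes (decompress, write absolute offsets, recompress) can be sketched as follows. This is a toy model under stated assumptions: the wrapper payload is a zlib-compressed JSON list of `[offset, value]` pairs, the wrapper carries the absolute offset of the last inner message, and inner offsets are relative. The names `compress_set` and `down_convert` are hypothetical and this is not Kafka's wire format.

```python
import json
import zlib

def compress_set(entries):
    """Toy wrapper payload: a zlib-compressed JSON list of [offset, value] pairs."""
    return zlib.compress(json.dumps(entries).encode("utf-8"))

def down_convert(wrapper_offset, payload):
    """Rewrite relative inner offsets as absolute ones (toy layout, not Kafka's)."""
    # 1. Decompress the whole set.
    inner = json.loads(zlib.decompress(payload).decode("utf-8"))
    # Assumption: the wrapper offset equals the absolute offset of the last inner message.
    base = wrapper_offset - inner[-1][0]
    # 2. Write absolute offsets.
    absolute = [[base + rel, value] for rel, value in inner]
    # 3. Recompress.
    return compress_set(absolute)

new_format = compress_set([[0, "a"], [1, "b"], [2, "c"], [3, "d"]])
old_format = down_convert(48, new_format)
offsets = [o for o, _ in json.loads(zlib.decompress(old_format).decode("utf-8"))]
```

All three steps touch every byte of the set, which is why doing this on each v1 fetch costs both CPU and memory, and why it cannot be served with zero-copy.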
Re: [DISCUSS] KIP-31 - Message format change proposal
Hi Joel,

That is a valid concern. And that is actually why we had the message.format.version before.

My original thinking was:
1. Upgrade the broker to support both V1 and V2 for consumer/producer requests.
2. Configure the broker to store V1 on disk (message.format.version = 1).
3. Upgrade the consumers to support both V1 and V2 for consumer requests.
4. Meanwhile, some producers might also be upgraded to use producer request V2.
5. At this point, for producer request V2, the broker will do down-conversion. Regardless of whether consumers are upgraded or not, the broker will always use zero-copy transfer, because supposedly both old and upgraded consumers should be able to understand that.
6. After most of the consumers are upgraded, we set message.format.version = 1 and only do down-conversion for old consumers.

This way we don't need to reject producer request V2, and we always do version conversion only for the minority of the consumers. However, I have a few concerns about this approach; I am not sure if they actually matter.

A. (5) is not true for now. Today the clients only use the highest version, i.e. a producer/consumer wouldn't parse a lower version of the response even if the code exists there. I think that, supposedly, the consumer should stick to one version and the broker should do the conversion.
B. Let's say (A) is not a concern and we make all the clients support all the versions they know. At step (6), there will be a transitional period in which users will see messages in both the new and the old version. For KIP-31 alone it might be OK because we are not adding anything to the message. But if the message has different fields (e.g. KIP-32), that means people will get those fields from some messages but not from others. Would that be a problem?

If (A) and (B) are not a problem, is the above procedure able to address your concern?

Thanks,

Jiangjie (Becket) Qin

On Thu, Sep 24, 2015 at 6:32 PM, Joel Koshy wrote:
> The upgrade plan works, but the potentially long interim phase of
> skipping zero-copy for down-conversion could be problematic especially
> for large deployments with large consumer fan-out. It is not only
> going to be memory overhead but CPU as well - since you need to
> decompress, write absolute offsets, then recompress for every v1
> fetch. i.e., it may be safer (but obviously more tedious) to have a
> multi-step upgrade process. For e.g.,:
>
> 1 - Upgrade brokers, but disable the feature. i.e., either reject
> producer requests v2 or down-convert to old message format (with
> absolute offsets)
> 2 - Upgrade clients, but they should only use v1 requests
> 3 - Switch (all or most) consumers to use v2 fetch format (which will
> use zero-copy).
> 4 - Turn on the feature on the brokers to allow producer requests v2
> 5 - Switch producers to use v2 produce format
>
> (You may want a v1 fetch rate metric and decide to proceed to step 4
> only when that comes down to a trickle)
>
> I'm not sure if the prolonged upgrade process is viable in every
> scenario. I think it should work at LinkedIn for e.g., but may not for
> other environments.
>
> Joel
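The serving decision implied by the plan above can be summarised in a short sketch. It is only one reading of the thread: the format numbers 1 (old, absolute offsets) and 2 (new, relative offsets) and the function `fetch_path` are hypothetical labels, not broker code or real configuration values.

```python
OLD_FORMAT = 1  # hypothetical label: absolute offsets in inner messages
NEW_FORMAT = 2  # hypothetical label: relative offsets in inner messages

def fetch_path(on_disk_format, consumer_max_format):
    """How a broker could serve a fetch, given what is stored and what the consumer parses."""
    if on_disk_format <= consumer_max_format:
        return "zero-copy"      # the consumer can read the bytes exactly as stored
    return "down-convert"       # the broker must rewrite the messages for this consumer

# While the broker stores the old format, every consumer is served with zero-copy.
interim_old = fetch_path(OLD_FORMAT, OLD_FORMAT)
interim_new = fetch_path(OLD_FORMAT, NEW_FORMAT)

# Once the broker stores the new format, only consumers limited to the old format
# pay for conversion.
final_old = fetch_path(NEW_FORMAT, OLD_FORMAT)
final_new = fetch_path(NEW_FORMAT, NEW_FORMAT)
```

Delaying the on-disk switch until most consumers are upgraded keeps the "down-convert" branch limited to the remaining old consumers.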
Re: [DISCUSS] KIP-31 - Message format change proposal
Hey Becket,

I do think we need the interim deployment phase: set message.format.version and down-convert for producer request v2. Down-conversion for v2 is no worse than what the broker is doing now. I don't think we want a prolonged phase where we down-convert for every v1 fetch. In fact, I'm less concerned about losing zero-copy for those fetch requests than about the overhead of decompress/recompress for those fetches, as that would increase your CPU usage by 4x, 5x or whatever the average consumer fan-out is. The decompression/recompression will put further memory pressure on the broker as well.

It is true that clients send the latest request version that they are compiled with, and that does not need to change. The broker can continue to send back with zero-copy for fetch request version 2 as well (even during the interim phase in which it down-converts producer request v2). The consumer iterator (for the old consumer) or the Fetcher (for the new consumer) needs to be able to handle messages that are in the original as well as the new (relative offset) format.

Thanks,

Joel

On Thu, Sep 24, 2015 at 7:56 PM, Jiangjie Qin wrote:
> Hi Joel,
>
> That is a valid concern. And that is actually why we had the
> message.format.version before.
>
> My original thinking was:
> 1. upgrade the broker to support both V1 and V2 for consumer/producer
> request.
> 2. configure broker to store V1 on the disk. (message.format.version = 1)
> 3. upgrade the consumer to support both V1 and V2 for consumer request.
> 4. Meanwhile some producer might also be upgraded to use producer request
> V2.
> 5. At this point, for producer request V2, broker will do down conversion.
> Regardless consumers are upgraded or not, broker will always use zero-copy
> transfer. Because supposedly both old and upgraded consumer should be able
> to understand that.
> 6. After most of the consumers are upgraded, We set message.format.version
> = 1 and only do down conversion for old consumers.
>
> This way we don't need to reject producer request V2. And we always to
> version conversion for the minority of the consumers. However I have a few
> concerns over this approach, not sure if they actually matters.
>
> A. (5) is not true for now. Today the clients only uses the highest
> version, i.e. a producer/consumer wouldn't parse a lower version of
> response even the code exist there. I think supposedly, consumer should
> stick to one version and broker should do the conversion.
> B. Let's say (A) is not a concern, we make all the clients support all the
> versions it knows. At step(6), there will be a transitional period that
> user will see both messages with new and old version. For KIP-31 only it
> might be OK because we are not adding anything into the message. But if the
> message has different fields (e.g. KIP-32), that means people will get
> those fields from some messages but not from some other messages. Would
> that be a problem?
>
> If (A) and (B) are not a problem. Is the above procedure able to address
> your concern?
>
> Thanks,
>
> Jiangjie (Becket) Qin
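Joel's "4x, 5x or whatever the average consumer fan-out is" follows from simple counting, sketched below. The model is an assumption for illustration: every consumer is an old (v1) consumer, each message set is fetched once per consumer, and the function name and rates are made up.

```python
def recompressions_per_second(produce_rate, fan_out, convert_on):
    """Recompressions per second under a simple counting model (illustrative only).

    produce_rate: compressed message sets produced per second
    fan_out:      number of consumers that each fetch every message set
    convert_on:   "produce" (convert once on append) or "fetch" (convert per fetch)
    """
    if convert_on == "produce":
        return produce_rate            # each set is rewritten once
    if convert_on == "fetch":
        return produce_rate * fan_out  # each set is rewritten once per consumer
    raise ValueError("convert_on must be 'produce' or 'fetch'")

on_produce = recompressions_per_second(1000, 5, "produce")
on_fetch = recompressions_per_second(1000, 5, "fetch")
ratio = on_fetch / on_produce
```

Under this model, converting on the fetch path multiplies the recompression work by the fan-out, while converting on the produce path keeps it where the broker already is today.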
Re: [DISCUSS] KIP-31 - Message format change proposal
Hi Joel and other folks,

I updated the KIP page with the two-phase rollout, which avoids the conversion for the majority of users.

To do that we need to add a message.format.version configuration to the broker. Other than that there is no interface change from the previous proposal. Please let me know if you have any concerns about the updated proposal.

Thanks,

Jiangjie (Becket) Qin

On Fri, Sep 25, 2015 at 11:26 AM, Joel Koshy wrote:
> Hey Becket,
>
> I do think we need the interim deployment phase, set
> message.format.version and down-convert for producer request v2.
> Down-conversion for v2 is no worse than what the broker is doing now.
> I don't think we want a prolonged phase where we down-convert for
> every v1 fetch - in fact I'm less concerned about losing zero-copy for
> those fetch requests than the overhead of decompress/recompress for
> those fetches as that would increase your CPU usage by 4x, 5x or
> whatever the average consumer fan-out is. The
> decompression/recompression will put further memory pressure as well.
>
> It is true that clients send the latest request version that it is
> compiled with and that does not need to change. The broker can
> continue to send back with zero-copy for fetch request version 2 as
> well (even if during the interim phase during which it down-converts
> producer request v2). The consumer iterator (for old consumer) or the
> Fetcher (for new consumer) needs to be able to handle messages that
> are in original as well as new (relative offset) format.
>
> Thanks,
>
> Joel
Re: [DISCUSS] KIP-31 - Message format change proposal
The BrokerMetadataRequest API could be used to propagate supported message.formats from broker to client. This is currently discussed in the KIP-35 thread.

/Magnus

2015-09-30 1:50 GMT+02:00 Jiangjie Qin:
> Hi Joel and other folks.
>
> I updated the KIP page with the two phase roll out, which avoids the
> conversion for majority of users.
>
> To do that we need to add a message.format.version configuration to broker.
> Other than that there is no interface change from the previous proposal.
> Please let me know if you have concern about the updated proposal.
>
> Thanks,
>
> Jiangjie (Becket) Qin
Re: [DISCUSS] KIP-31 - Message format change proposal
The Phase 2 2.* sub-steps don't seem to be right. Can you look over that carefully? Also, "definitive" - you mean "absolute", i.e., not relative offsets, right?

One more thing that may be worth mentioning is that it is technically possible to canary the new version format on at most one broker (or multiple if they host mutually disjoint partitions). Basically, turn on the new message format on one broker and leave it on for an extended period. If we hit some unanticipated bug and something goes terribly wrong with the feature, then just kill that broker, switch it to the v0 on-disk format and reseed it from the leaders. Most people may not want to have such a long deployment plan, but at least it is an option for those who want to tread very carefully given that it is backwards incompatible.

Joel

On Tue, Sep 29, 2015 at 4:50 PM, Jiangjie Qin wrote:
> Hi Joel and other folks.
>
> I updated the KIP page with the two phase roll out, which avoids the
> conversion for majority of users.
>
> To do that we need to add a message.format.version configuration to broker.
> Other than that there is no interface change from the previous proposal.
> Please let me know if you have concern about the updated proposal.
>
> Thanks,
>
> Jiangjie (Becket) Qin
Re: [DISCUSS] KIP-31 - Message format change proposal
Joel, Thanks for the comments. I updated the KIP page and added the canary procedure. Thanks, Jiangjie (Becket) Qin On Wed, Sep 30, 2015 at 6:26 PM, Joel Koshy wrote: > The Phase 2 2.* sub-steps don't seem to be right. Can you look over > that carefully? Also, "definitive" - you mean "absolute" i.e., not > relative offsets right? > > One more thing that may be worth mentioning is that it is technically > possible to canary the new version format on at most one broker (or > multiple if it hosts mutually disjoint partitions). Basically turn on > the new message format on one broker, leave it on for an extended > period - if we hit some unanticipated bug and something goes terribly > wrong with the feature then just kill that broker, switch it to the v0 > on-disk format and reseed it from the leaders. Most people may not > want to have such a long deployment plan but at least it is an option > for those who want to tread very carefully given that it is backwards > incompatible. > > Joel > > On Tue, Sep 29, 2015 at 4:50 PM, Jiangjie Qin > wrote: > > Hi Joel and other folks. > > > > I updated the KIP page with the two phase roll out, which avoids the > > conversion for majority of users. > > > > To do that we need to add a message.format.version configuration to > broker. > > Other than that there is no interface change from the previous proposal. > > Please let me know if you have concern about the updated proposal. > > > > Thanks, > > > > Jiangjie (Becket) Qin > > > > On Fri, Sep 25, 2015 at 11:26 AM, Joel Koshy > wrote: > > > >> Hey Becket, > >> > >> I do think we need the interim deployment phase, set > >> message.format.version and down-convert for producer request v2. > >> Down-conversion for v2 is no worse than what the broker is doing now. 
> >> I don't think we want a prolonged phase where we down-convert for every v1 fetch - in fact I'm less concerned about losing zero-copy for those fetch requests than the overhead of decompress/recompress for those fetches as that would increase your CPU usage by 4x, 5x or whatever the average consumer fan-out is. The decompression/recompression will put further memory pressure as well.
> >>
> >> It is true that clients send the latest request version that they are compiled with and that does not need to change. The broker can continue to send back with zero-copy for fetch request version 2 as well (even during the interim phase in which it down-converts producer request v2). The consumer iterator (for old consumer) or the Fetcher (for new consumer) needs to be able to handle messages that are in original as well as new (relative offset) format.
> >>
> >> Thanks,
> >>
> >> Joel
> >>
> >> On Thu, Sep 24, 2015 at 7:56 PM, Jiangjie Qin wrote:
> >> > Hi Joel,
> >> >
> >> > That is a valid concern. And that is actually why we had the message.format.version before.
> >> >
> >> > My original thinking was:
> >> > 1. upgrade the broker to support both V1 and V2 for consumer/producer request.
> >> > 2. configure broker to store V1 on the disk. (message.format.version = 1)
> >> > 3. upgrade the consumer to support both V1 and V2 for consumer request.
> >> > 4. Meanwhile some producers might also be upgraded to use producer request V2.
> >> > 5. At this point, for producer request V2, broker will do down conversion. Regardless of whether consumers are upgraded or not, broker will always use zero-copy transfer, because supposedly both old and upgraded consumers should be able to understand that.
> >> > 6. After most of the consumers are upgraded, we set message.format.version = 1 and only do down conversion for old consumers.
> >> >
> >> > This way we don't need to reject producer request V2. And we always do version conversion for the minority of the consumers. However I have a few concerns over this approach, not sure if they actually matter.
> >> >
> >> > A. (5) is not true for now. Today the clients only use the highest version, i.e. a producer/consumer wouldn't parse a lower version of response even if the code exists there. I think supposedly, consumer should stick to one version and broker should do the conversion.
> >> > B. Let's say (A) is not a concern, we make all the clients support all the versions they know. At step (6), there will be a transitional period in which users will see messages with both new and old versions. For KIP-31 only it might be OK because we are not adding anything into the message. But if the message has different fields (e.g. KIP-32), that means people will get those fields from some messages but not from some other messages. Would that be a problem?
> >> >
> >> > If (A) and (B) are not a problem, is the above procedure able to address your concern?
> >> >
> >> > Thanks,
> >> >
> >> > Jiangjie (Becket) Qin
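
For readers following the thread, the trade-off being discussed (zero-copy versus per-fetch down-conversion, and why consumer fan-out matters) can be sketched roughly as below. This is only an illustration under stated assumptions, not Kafka code: the names FORMAT_V0, FORMAT_V1, Consumer, fetch_path and down_conversions_per_batch are hypothetical, the format labels follow the "v0 on-disk format" wording used above, and the sketch assumes an upgraded consumer can parse both the original and the new (relative offset) format.

```python
from dataclasses import dataclass
from typing import Sequence

# Illustrative labels only; these are not Kafka constants.
FORMAT_V0 = 0  # original format: absolute offsets inside compressed message sets
FORMAT_V1 = 1  # proposed format: relative offsets inside compressed message sets


@dataclass(frozen=True)
class Consumer:
    name: str
    max_format: int  # newest message format this consumer can parse (assumed to parse older ones too)


def fetch_path(on_disk_format: int, consumer: Consumer) -> str:
    """Return how a broker could serve a fetch for this consumer."""
    if consumer.max_format >= on_disk_format:
        return "zero-copy"     # on-disk bytes can be sent unchanged
    return "down-convert"      # decompress, write absolute offsets, recompress


def down_conversions_per_batch(on_disk_format: int, consumers: Sequence[Consumer]) -> int:
    """Count how many times one stored batch is re-compressed across all consumers."""
    return sum(1 for c in consumers if fetch_path(on_disk_format, c) == "down-convert")


# Early in the rollout: one upgraded consumer, four old ones (fan-out of 5).
early = [Consumer("new-0", FORMAT_V1)] + [Consumer(f"old-{i}", FORMAT_V0) for i in range(4)]
# Late in the rollout: four upgraded consumers, one straggler.
late = [Consumer(f"new-{i}", FORMAT_V1) for i in range(4)] + [Consumer("old-0", FORMAT_V0)]

print(down_conversions_per_batch(FORMAT_V0, early))  # old format on disk, early rollout
print(down_conversions_per_batch(FORMAT_V1, early))  # new format on disk too early
print(down_conversions_per_batch(FORMAT_V1, late))   # new format on disk after most upgrades
```

Under these assumptions, keeping the old format on disk while consumers upgrade costs no fetch-side conversions, whereas switching the on-disk format before most consumers are upgraded multiplies the recompression work by the number of old consumers, which is the fan-out concern raised above.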