One more comment: I think there are really three proposals here:

1. Get a mechanism and policy in place for record format upgrades (we haven't done this, so we don't really have the infrastructure). This is kind of implicit. I suspect we'll need to do this multiple times in the future, so we should make it easy.
2. Add a timestamp to messages.
3. Move to relative offsets.
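(Editorially, the relative-offset idea in proposal 3 can be sketched roughly as follows. This is an illustrative sketch, not Kafka code: the class and method names are made up. The point is that the broker assigns one absolute offset to the compressed wrapper message, and inner messages carry small relative offsets, so the broker never has to decompress, rewrite offsets, and re-compress the batch.)

```java
import java.util.List;

// Hypothetical sketch of relative offsets inside a compressed wrapper
// message. Names are illustrative, not actual Kafka APIs.
public class RelativeOffsetSketch {

    // Inner messages carry offsets 0..n-1 relative to the wrapper's base
    // offset; the broker only assigns the wrapper's absolute offset.
    static long absoluteOffset(long wrapperBaseOffset, int relativeOffset) {
        return wrapperBaseOffset + relativeOffset;
    }

    public static void main(String[] args) {
        long wrapperBase = 1000L;               // assigned by the broker on append
        List<Integer> innerRelative = List.of(0, 1, 2); // written by the producer
        for (int r : innerRelative) {
            // The consumer reconstructs absolute offsets on read:
            System.out.println(absoluteOffset(wrapperBase, r)); // 1000, 1001, 1002
        }
    }
}
```

Because the relative offsets are fixed at produce time, the compressed payload can be appended to the log verbatim; only the cheap outer offset field changes.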
For sanity it might make sense to discuss these individually.

I think the relative offset proposal is pretty straightforward. It probably should have been done that way to begin with. I think you should get near-universal support on that one. Saving the re-compression on the server is a big win. I really wish we'd thought of that at the time.

The timestamp problems we have are definitely annoying, and I agree that time is really a first-class thing. But adding time has a ton of problems that need to be fully worked out before we pull the trigger.

First, I like the implementation plan you have for the time index--I think you are saying it would retain the same format as the existing OffsetIndex, although it would require some refactoring. You are correct that this should be a separate index file--this will allow the index to be less frequent (smaller) and also let it page out if it isn't used.

Now the bad bits about time!

1. Clock time isn't sequential. The whole point of NTP is to sync the clock, which I think means moving the time both forwards and backwards. Users can also change the time any time they want. Also, when the master fails it moves to a different machine; maybe its clock is sync'd, maybe it's not. If I mirror-maker two partitions into one, then surely there is skew, possibly hours or days of it (i.e. imagine cross-dc mirror maker where the network isn't available for a bit of time and then catches up). (Also, I'm not sure how having the leader do max(old_leader_time, current_time) works if we accept client times in the mirror-maker case.)

2. Nobody cares what time it is on the server. Consider cases where data is being copied from a database or from log files. In steady state the server time is very close to the client time if their clocks are sync'd (see 1), but there will be periods of large divergence when the copying process is stopped or falls behind.
When this occurs it is clear that the time the data arrived on the server is irrelevant; it is the source timestamp that matters. This is the problem you are trying to fix by retaining the mirror-maker timestamp, but really the client should always set the time, with server-side time as a fallback. It would be worth talking to the Samza folks and reading through this blog post (http://radar.oreilly.com/2015/08/the-world-beyond-batch-streaming-101.html) on this subject, since we went through similar learnings on the stream processing side.

I think the implication of these two points is that we need a proposal that handles potentially very out-of-order timestamps in some kind of sane-ish way (buggy clients will set something totally wrong as the time).

-Jay

On Sun, Sep 6, 2015 at 4:22 PM, Jay Kreps <j...@confluent.io> wrote:

> The magic byte is used to version the message format, so we'll need to
> make sure that check is in place--I actually don't see it in the current
> consumer code, which I think is a bug we should fix for the next release
> (filed KAFKA-2523). The purpose of that field is so there is a clear check
> on the format rather than the scrambled scenarios Becket describes.
>
> Also, Becket, I don't think just fixing the Java client is sufficient, as
> that would break other clients--i.e. if anyone writes a v1 message, even
> by accident, any non-v1-capable consumer will break. I think we probably
> need a way to have the server ensure a particular message format either
> at read or write time.
>
> -Jay
>
> On Thu, Sep 3, 2015 at 3:47 PM, Jiangjie Qin <j...@linkedin.com.invalid>
> wrote:
>
>> Hi Guozhang,
>>
>> I checked the code again. Actually the CRC check probably won't fail.
>> The newly added timestamp field might be treated as keyLength instead,
>> so we are likely to get an IllegalArgumentException when trying to read
>> the key. I'll update the KIP.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Thu, Sep 3, 2015 at 12:48 PM, Jiangjie Qin <j...@linkedin.com> wrote:
>>
>> > Hi, Guozhang,
>> >
>> > Thanks for reading the KIP. By "old consumer" I meant the
>> > ZookeeperConsumerConnector in trunk now, i.e. without this bug fixed.
>> > If we fix the ZookeeperConsumerConnector then it will throw an
>> > exception complaining about the unsupported version when it sees
>> > message format V1. What I was trying to say is that if we have some
>> > ZookeeperConsumerConnector running without the fix, the consumer will
>> > complain about a CRC mismatch instead of an unsupported version.
>> >
>> > Thanks,
>> >
>> > Jiangjie (Becket) Qin
>> >
>> > On Thu, Sep 3, 2015 at 12:15 PM, Guozhang Wang <wangg...@gmail.com>
>> > wrote:
>> >
>> >> Thanks for the write-up Jiangjie.
>> >>
>> >> One comment about the migration plan: "For old consumers, if they see
>> >> the new protocol the CRC check will fail."
>> >>
>> >> Do you mean this bug in the old consumer cannot be fixed in a
>> >> backward-compatible way?
>> >>
>> >> Guozhang
>> >>
>> >> On Thu, Sep 3, 2015 at 8:35 AM, Jiangjie Qin
>> >> <j...@linkedin.com.invalid> wrote:
>> >>
>> >> > Hi,
>> >> >
>> >> > We just created KIP-31 to propose a message format change in Kafka.
>> >> >
>> >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-31+-+Message+format+change+proposal
>> >> >
>> >> > As a summary, the motivations are:
>> >> > 1. Avoid server side message re-compression
>> >> > 2. Honor time-based log roll and retention
>> >> > 3. Enable offset search by timestamp at a finer granularity.
>> >> >
>> >> > Feedback and comments are welcome!
>> >> >
>> >> > Thanks,
>> >> >
>> >> > Jiangjie (Becket) Qin
>> >>
>> >> --
>> >> -- Guozhang
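(Editorial note: the timestamp semantics debated in this thread--prefer a client-set timestamp, fall back to server time when none is given, and optionally clamp the log's timestamp to be non-decreasing via max(old_leader_time, current_time)--can be sketched as below. This is a hypothetical illustration; the constant and method names are made up and are not Kafka APIs.)

```java
// Hypothetical sketch of the timestamp policy discussed in the thread.
// Not actual Kafka code; names are illustrative only.
public class TimestampPolicySketch {
    // Sentinel meaning "client did not set a timestamp" (an assumption here).
    static final long NO_TIMESTAMP = -1L;

    // Prefer the client-supplied timestamp; fall back to server wall-clock
    // time only when the client sent none.
    static long effectiveTimestamp(long clientTs, long serverTs) {
        return clientTs != NO_TIMESTAMP ? clientTs : serverTs;
    }

    // The monotonic variant Jay questions: never let the partition's max
    // timestamp move backwards, even across leader failover or clock skew.
    static long monotonicTimestamp(long previousMax, long candidate) {
        return Math.max(previousMax, candidate);
    }

    public static void main(String[] args) {
        System.out.println(effectiveTimestamp(NO_TIMESTAMP, 500L)); // 500
        System.out.println(effectiveTimestamp(123L, 500L));         // 123
        System.out.println(monotonicTimestamp(100L, 50L));          // 100
    }
}
```

Note the tension the thread raises: the max() clamp keeps server-side time monotonic, but if clients (e.g. mirror maker catching up after a network outage) legitimately send old timestamps, clamping discards exactly the source-time information that matters.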