Hey Ewen,

Thanks for the comments and they are really good questions. Please inline
replies.

On Thu, Sep 3, 2015 at 9:35 PM, Ewen Cheslack-Postava <e...@confluent.io>
wrote:

> A few questions:
>
> 1. If we update the producers to only support V1, doesn't that mean people
> get stuck on the current version of the producer until they can be sure all
> their consumers have been upgraded? Is that going to be a problem for
> anyone, and does it potentially keep important fixes/enhancements (e.g. the
> upcoming client-side network request timeouts) because they have to wait
> for all the consumers to upgrade first?
>
This is a good point. I thought about this before, I and my initial
thinking is that we might need to add a config on producer to specify which
version you want to use to produce. But this seems to be a pretty ad-hoc
approach and I don't really like it. We are working on some general
protocol version control mechanism proposal and will have a separate KIP
for that.

>
> 2. Why minute granularity specifically? That's seems fairly specific to
> LI's example workload (which, by the way, is a very useful reference point
> to include in the KIP, thanks for that). If someone has the memory and can
> support it, why not let them go all the way to per-record timestamps (by
> making the interval configurable and they can set it really small/to 0)? It
> seems to me that some people might be willing to pay that cost for the
> application-side simplicity if they want to consume from specific
> timestamps. With the proposal as is, seeking to a timestamp still requires
> client-side logic to filter out messages that might have earlier timestamps
> due to the granularity chosen. The obvious tradeoff here is yet another
> config option -- I'm loath to add yet more configs, although this is one
> that we can choose a reasonable default for (like 1 minute) and people can
> easily change with no impact if they need to adjust it.
>
The searching granularity will be actually millisecond. The index
granularity only determines how close you will be to the actually message
with the timestamp you are looking for. For example, if you are looking for
a message with timestamp 10:00:15, a minute granularity will give you the
offset at 10:00:00, and it needs to go through the records from 10:00:00 to
10:00:15 to find the message. But with a second level granularity, it might
only need to go through the message produced in one second. So minute level
granularity index will take longer for search, but the precision will be
the same as second level index. That said, I am not objecting to adding the
granularity configuration but I am not sure how useful it would be to have
second level index because I think typically a consumer will be
long-running and only search for the timestamp at startup.
I will update the KIP page to clarify the precision.

>

3. Why not respect the timestamp passed in as long as it's not some
> sentinel value that indicates the broker should fill it in? When you do
> know they will be ordered, as they should be with mirror maker (which is
> specifically mentioned), this seems really useful. (More about this in
> questions below...)
>
Like what you mentioned in (4), having a log without monotonically
increasing timestamp is weird. To me it is even worse than having an empty
timestamp field in the inner message that will not be used except for log
compacted topic. I think the only way to solve this issue is to add another
CreateTime to the message. So far I am not sure how useful it is though
because arguably people can always put this timestamp in side the payload.
So I think this timestamp is more for server side usage instead of
application / client side usage.

>
> 4. I think one obvious answer to (3) is that accepting client's timestamps
> could break seeking-by-timestamp since they may not be properly ordered.
> However, I think this can break anyway under normal operations -- any
> imperfections in clock synchronization could result in older timestamps
> being applied to new messages on a new leader broker compared messages
> stored previously by the old leader. I think it's probably too common to
> just think that NTP will take care of it and then run into weird bugs
> because NTP isn't always perfect, sometimes does some unexpected things,
> and, of course, does what you tell it to and so is subject to user error.

This is a good point. Yes, NTP only guarantee limited synchronization
precision (several microseconds if you have low stratum and appropriate
PPS). My experience is that it actually is good and stable enough even for
some mission critical system such a core banking. Maybe this is more of an
implementation detail. The simple solution is that when leader append
messages to the log, it always take the max(lastAppendedTimestamp,
currentTimeMillis). Arguably we can play the same trick even if we let the
producer to fill in the timestamp. But that means the timestamp producer
set may ore may not be honored, which is a bit ugly. Also, if some producer
produced a super large timestamp by mistake, it will ruin timestamps of all
the messages.

>
> It would be reasonable to argue that the leader change issue is much less
> likely of an issue than if you respect timestamps from producers, where, if
> applications actually filled it in, you'd receive a jumbled mess of
> timestamps and trying to do the binary search in the index wouldn't
> necessarily give you correct results. However, a) we could allow clients to
> fill that info in but discourage it where it might cause issues (i.e.
> normal applications) and it seems like a significant win for mirrormaker.


> I actually think not accepting timestamps is probably the better choice but
> a) it seems awkward in the protocol spec because we have to include the
> field in the produce requests since we don't want to have to fully decode
> them on the broker and b) losing that info during mirroring seems like it
> breaks the goal of fixing log retention (at least for mirrors) as well as
> the goal of improving searching by timestamp (at least for mirrors).
>
In terms of log retention, people might have different requirement for
cross cluster log retention. It looks simpler to me that we simply honor
the retention for each cluster instead of the entire pipeline. Can you
provide some use cases that cross-cluster retention is critical?

>
> 5. You never actually specified the granularity (or format, but I assume
> unix epoch) of the timestamp. Milliseconds? Microseconds? Nanoseconds? This
> definitely needs to eventually make it into the protocol docs.
>
It should be millisecond. I'll add it to the KIP.

>
> 6. Re: the rejected alternative. Are there any other options in changing
> the config format that might make it a bit lighter weight? For example, do
> we need a full int64? Could we do something relative instead that wouldn't
> require as many bytes? Without (5) being specified, it's actually difficult
> to evaluate some of these options.
>
Even if we use int32, it will still be a 50% memory footprint increase. I
just feel not worth spending so much memory on a very infrequent operation
especially when Kafka largely depends on the OS caching.

>
> 7. Any plan to expose this in the client APIs? This is related to (4). If
> they are not exposed anywhere in the API as being associated with messages,
> then we can reasonably treat them as an internal implementation detail and
> be very clear about what looking up an offset for a timestamp means. If
> they are exposed, then they're more like message metadata that applications
> are probably going to want preserved, and thus will want the broker to
> respect the timestamp. For example, if I saw that consumers could get the
> timestamp, I'd want to be able to assign it at the producer so that even if
> I have significant batching I still get an accurate timestamp -- basically
> I might replace some cases where I use a timestamp embedded in the message
> with the timestamp provided by Kafka. (That said, we should consider the
> impact that including it in the protocol can have; non-Java clients are
> likely to expose it if it is available, whether it's actually a good idea
> to or not.)
>
Yes, searching by timestamp will be a Client API. Actually it is currently
a client API, OffsetRequest will search the offset by timestamp. While the
application timestamp is always important, the application level timestamp
can always be put into a payload. I think the timestamp in this case is
pretty much the same as offset, i.e. assigned by server but will be
provided to consumer for reference.

>
> -Ewen
>
>
> On Thu, Sep 3, 2015 at 3:47 PM, Jiangjie Qin <j...@linkedin.com.invalid>
> wrote:
>
> > Hi Guozhang,
> >
> > I checked the code again. Actually CRC check probably won't fail. The
> newly
> > added timestamp field might be treated as keyLength instead, so we are
> > likely to receive an IllegalArgumentException when try to read the key.
> > I'll update the KIP.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Thu, Sep 3, 2015 at 12:48 PM, Jiangjie Qin <j...@linkedin.com> wrote:
> >
> > > Hi, Guozhang,
> > >
> > > Thanks for reading the KIP. By "old consumer", I meant the
> > > ZookeeperConsumerConnector in trunk now, i.e. without this bug fixed.
> If
> > we
> > > fix the ZookeeperConsumerConnector then it will throw exception
> > complaining
> > > about the unsupported version when it sees message format V1. What I
> was
> > > trying to say is that if we have some ZookeeperConsumerConnector
> running
> > > without the fix, the consumer will complain about CRC mismatch instead
> of
> > > unsupported version.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Thu, Sep 3, 2015 at 12:15 PM, Guozhang Wang <wangg...@gmail.com>
> > wrote:
> > >
> > >> Thanks for the write-up Jiangjie.
> > >>
> > >> One comment about migration plan: "For old consumers, if they see the
> > new
> > >> protocol the CRC check will fail"..
> > >>
> > >> Do you mean this bug in the old consumer cannot be fixed in a
> > >> backward-compatible way?
> > >>
> > >> Guozhang
> > >>
> > >>
> > >> On Thu, Sep 3, 2015 at 8:35 AM, Jiangjie Qin
> <j...@linkedin.com.invalid
> > >
> > >> wrote:
> > >>
> > >> > Hi,
> > >> >
> > >> > We just created KIP-31 to propose a message format change in Kafka.
> > >> >
> > >> >
> > >> >
> > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-31+-+Message+format+change+proposal
> > >> >
> > >> > As a summary, the motivations are:
> > >> > 1. Avoid server side message re-compression
> > >> > 2. Honor time-based log roll and retention
> > >> > 3. Enable offset search by timestamp at a finer granularity.
> > >> >
> > >> > Feedback and comments are welcome!
> > >> >
> > >> > Thanks,
> > >> >
> > >> > Jiangjie (Becket) Qin
> > >> >
> > >>
> > >>
> > >>
> > >> --
> > >> -- Guozhang
> > >>
> > >
> > >
> >
>
>
>
> --
> Thanks,
> Ewen
>

Reply via email to