Hey Jay and Guozhang,

Thanks a lot for the reply. So if I understand correctly, Jay's proposal is:

1. Let the client stamp the message create time.
2. The broker builds the index based on the client-stamped message create time.
3. The broker only accepts a message whose create time is within the current
time plus/minus T (T is a configuration, *max.append.delay*, which could be a
topic level configuration). If the timestamp is out of this range, the broker
rejects the message.
4. Because the create time of messages can be out of order, when the broker
builds the time based index it only provides the guarantee that if a
consumer starts consuming from the offset returned by searching by
timestamp t, they will not miss any message created after t, but might see
some messages created before t.
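
To make step 3 above concrete, here is a minimal sketch of the acceptance
check. The function name and the millisecond units are my own assumptions
for illustration; this is not actual broker code.

```python
def accept_message(create_time_ms, broker_time_ms, max_append_delay_ms):
    """Return True if the client-stamped create time lies within
    broker time +/- max.append.delay (all values in milliseconds).

    Hypothetical helper: the name and signature are illustrative only.
    """
    return abs(broker_time_ms - create_time_ms) <= max_append_delay_ms


# A message stamped 5 minutes ago passes a 10 minute bound,
# while a message stamped 2 hours ago is rejected.
now_ms = 1_444_777_000_000
ten_minutes_ms = 10 * 60 * 1000
print(accept_message(now_ms - 5 * 60 * 1000, now_ms, ten_minutes_ms))
print(accept_message(now_ms - 2 * 60 * 60 * 1000, now_ms, ten_minutes_ms))
```

The check is symmetric, so timestamps too far in the future are rejected as
well as timestamps too far in the past.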

To build the time based index, every time a broker needs to insert a
new time index entry, the entry would be {Largest_Timestamp_Ever_Seen ->
Current_Offset}. This basically means any message with a timestamp larger than
Largest_Timestamp_Ever_Seen must come after this offset, because the broker has
never seen such a timestamp before. So we don't miss any message with a larger
timestamp.
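
A rough sketch of such an index, under my own assumptions: the class name,
the fixed entry interval (counted in messages rather than bytes) and the
in-memory lists are all hypothetical, and a real index would be a file on
disk. Each entry (L, o) records that every message before offset o has a
timestamp <= L, so a search for t can start from the offset of the last
entry whose timestamp is < t.

```python
import bisect


class SparseTimeIndex:
    """Illustrative in-memory time index (not the real on-disk format)."""

    def __init__(self, entry_interval=2):
        self.entry_interval = entry_interval  # add an entry every N messages
        self.largest_seen = -1                # largest timestamp ever seen
        self.next_offset = 0                  # offset of the next message
        self.timestamps = []                  # non-decreasing entry timestamps
        self.offsets = []                     # offsets paired with timestamps

    def append(self, create_time):
        # Before appending, periodically record
        # {largest timestamp seen so far -> current offset}.
        if self.next_offset > 0 and self.next_offset % self.entry_interval == 0:
            self.timestamps.append(self.largest_seen)
            self.offsets.append(self.next_offset)
        self.largest_seen = max(self.largest_seen, create_time)
        self.next_offset += 1

    def lookup(self, t):
        """Return an offset o such that no message before o has timestamp >= t."""
        i = bisect.bisect_left(self.timestamps, t)  # first entry with ts >= t
        return self.offsets[i - 1] if i > 0 else 0


index = SparseTimeIndex(entry_interval=2)
log = [10, 30, 20, 25, 40, 35]  # out-of-order create times
for ts in log:
    index.append(ts)
print(index.lookup(35))
```

Reading from the returned offset may still yield messages older than t,
which is the looser guarantee described in point 4.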

(@Guozhang, in this case, for log retention we only need to take a look at
the last time index entry, because it must be the largest timestamp ever seen.
If that timestamp is past the retention time, we can safely delete any log
segment before that. So we don't need to scan the log segment file for log
retention.)
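
As a sketch of that retention check (hypothetical names; it assumes each
segment's last time index entry is written when the segment is closed, so
that it really covers every message in the segment):

```python
def expired_segments(segments, now_ms, retention_ms):
    """Return the names of segments whose last time index entry is older
    than the retention cutoff.

    `segments` is a list of (name, last_index_timestamp_ms) pairs, oldest
    first. Illustrative only: no log files are read or deleted here.
    """
    cutoff_ms = now_ms - retention_ms
    return [name for name, last_ts in segments if last_ts < cutoff_ms]


segments = [("seg-0", 100), ("seg-1", 500), ("seg-2", 900)]
print(expired_segments(segments, now_ms=1000, retention_ms=600))
```

Only one index entry per segment is inspected, so no scan of the log
segment itself is needed.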

I assume that we are still going to have the new FetchRequest to allow the
time index replication for replicas.

I think Jay's main point here is that we don't want to have two timestamp
concepts in Kafka, which I agree is a reasonable concern. And I also agree
that create time is more meaningful than LogAppendTime for users. But I am
not sure if basing everything on CreateTime would work in all cases.
Here are my questions about this approach:

1. Let's say we have two source clusters that are mirrored to the same
target cluster. For some reason the mirror maker for one of the clusters dies,
and after fixing the issue we want to resume mirroring. In this case it is
possible that when the mirror maker resumes mirroring, the timestamps of the
messages have already gone beyond the acceptable timestamp range on the broker.
In order to let those messages go through, we have to bump up
*max.append.delay* for all the topics on the target broker. This could be
painful.

2. Let's say in the above scenario we let the messages in. At that point
some log segments in the target cluster might have a wide range of
timestamps. As Guozhang mentioned, the log rolling could be tricky because
the first time index entry does not necessarily have the smallest timestamp
of all the messages in the log segment. Instead, it is the largest
timestamp ever seen. We have to scan the entire log segment to find the
message with the smallest timestamp to see if we should roll.

3. Theoretically it is possible that an older log segment contains
timestamps that are newer than all the messages in a newer log segment. It
would be weird that we are supposed to delete the newer log segment before
we delete the older log segment.

4. In the bootstrap case, if we reload the data to a Kafka cluster, we have to
make sure we configure the topic correctly before we load the data.
Otherwise a message might either be rejected because its timestamp is too
old, or it might be deleted immediately because the retention time has
already been reached.

I am very concerned about the operational overhead and the ambiguity of
guarantees we introduce if we purely rely on CreateTime.

It looks to me that the biggest issue of adopting CreateTime everywhere is
that CreateTime can have big gaps. These gaps could be caused by several cases:
[1]. Faulty clients
[2]. Natural delays from different sources
[3]. Bootstrap
[4]. Failure recovery

Jay's alternative proposal solves [1], and perhaps solves [2] as well if we are
able to set a reasonable max.append.delay. But it does not seem to address [3]
and [4]. I actually doubt whether [3] and [4] are solvable, because it looks
like the CreateTime gap is unavoidable in those two cases.

Thanks,

Jiangjie (Becket) Qin


On Tue, Oct 13, 2015 at 3:23 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Just to complete Jay's option, here is my understanding:
>
> 1. For log retention: if we want to remove data before time t, we look into
> the index file of each segment and find the largest timestamp t' < t, find
> the corresponding offset and start scanning to the end of the segment,
> if there is no entry with timestamp >= t, we can delete this segment; if a
> segment's index smallest timestamp is larger than t, we can skip that
> segment.
>
> 2. For log rolling: if we want to start a new segment after time t, we look
> into the active segment's index file, if the largest timestamp is already >
> t, we can roll a new segment immediately; if it is < t, we read its
> corresponding offset and start scanning to the end of the segment, if we
> find a record whose timestamp > t, we can roll a new segment.
>
> For log rolling we only need to possibly scan a small portion of the active
> segment, which should be fine; for log retention we may in the worst case
> end up scanning all segments, but in practice we may skip most of them
> since their smallest timestamp in the index file is larger than t.
>
> Guozhang
>
>
> On Tue, Oct 13, 2015 at 12:52 AM, Jay Kreps <j...@confluent.io> wrote:
>
> > I think it should be possible to index out-of-order timestamps. The
> > timestamp index would be similar to the offset index, a memory mapped file
> > appended to as part of the log append, but would have the format
> >   timestamp offset
> > The timestamp entries would be monotonic and as with the offset index would
> > be no more often than every 4k (or some configurable threshold to keep the
> > index small--actually for timestamp it could probably be much more sparse
> > than 4k).
> >
> > A search for a timestamp t yields an offset o before which no prior message
> > has a timestamp >= t. In other words if you read the log starting with o
> > you are guaranteed not to miss any messages occurring at t or later though
> > you may get many before t (due to out-of-orderness). Unlike the offset
> > index this bound doesn't really have to be tight (i.e. probably no need to
> > go search the log itself, though you could).
> >
> > -Jay
> >
> > On Tue, Oct 13, 2015 at 12:32 AM, Jay Kreps <j...@confluent.io> wrote:
> >
> > > Here's my basic take:
> > > - I agree it would be nice to have a notion of time baked in if it were
> > > done right
> > > - All the proposals so far seem pretty complex--I think they might make
> > > things worse rather than better overall
> > > - I think adding 2x8 byte timestamps to the message is probably a
> > > non-starter from a size perspective
> > > - Even if it isn't in the message, having two notions of time that control
> > > different things is a bit confusing
> > > - The mechanics of basing retention etc on log append time when that's not
> > > in the log seem complicated
> > >
> > > To that end here is a possible 4th option. Let me know what you think.
> > >
> > > The basic idea is that the message creation time is closest to what the
> > > user actually cares about but is dangerous if set wrong. So rather than
> > > substitute another notion of time, let's try to ensure the correctness of
> > > message creation time by preventing arbitrarily bad message creation times.
> > >
> > > First, let's see if we can agree that log append time is not something
> > > anyone really cares about but rather an implementation detail. The
> > > timestamp that matters to the user is when the message occurred (the
> > > creation time). The log append time is basically just an approximation to
> > > this on the assumption that the message creation and the message receive on
> > > the server occur pretty close together and the reason to prefer .
> > >
> > > But as these values diverge the issue starts to become apparent. Say you
> > > set the retention to one week and then mirror data from a topic containing
> > > two years of retention. Your intention is clearly to keep the last week,
> > > but because the mirroring is appending right now you will keep two years.
> > >
> > > The reason we are liking log append time is because we are (justifiably)
> > > concerned that in certain situations the creation time may not be
> > > trustworthy. This same problem exists on the servers but there are fewer
> > > servers and they just run the kafka code so it is less of an issue.
> > >
> > > There are two possible ways to handle this:
> > >
> > >    1. Just tell people to add size based retention. I think this is not
> > >    entirely unreasonable, we're basically saying we retain data based on the
> > >    timestamp you give us in the data. If you give us bad data we will retain
> > >    it for a bad amount of time. If you want to ensure we don't retain "too
> > >    much" data, define "too much" by setting a size-based retention setting.
> > >    This is not entirely unreasonable but kind of suffers from a "one bad
> > >    apple" problem in a very large environment.
> > >    2. Prevent bad timestamps. In general we can't say a timestamp is bad.
> > >    However the definition we're implicitly using is that we think there are a
> > >    set of topics/clusters where the creation timestamp should always be "very
> > >    close" to the log append timestamp. This is true for data sources that have
> > >    no buffering capability (which at LinkedIn is very common, but is more rare
> > >    elsewhere). The solution in this case would be to allow a setting along the
> > >    lines of max.append.delay which checks the creation timestamp against the
> > >    server time to look for too large a divergence. The solution would either
> > >    be to reject the message or to override it with the server time.
> > >
> > > So in LI's environment you would configure the clusters used for direct,
> > > unbuffered, message production (e.g. tracking and metrics local) to enforce
> > > a reasonably aggressive timestamp bound (say 10 mins), and all other
> > > clusters would just inherit these.
> > >
> > > The downside of this approach is requiring the special configuration.
> > > However I think in 99% of environments this could be skipped entirely, it's
> > > only when the ratio of clients to servers gets so massive that you need to
> > > do this. The primary upside is that you have a single authoritative notion
> > > of time which is closest to what a user would want and is stored directly
> > > in the message.
> > >
> > > I'm also assuming there is a workable approach for indexing non-monotonic
> > > timestamps, though I haven't actually worked that out.
> > >
> > > -Jay
> > >
> > > On Mon, Oct 5, 2015 at 8:52 PM, Jiangjie Qin <j...@linkedin.com.invalid>
> > > wrote:
> > >
> > >> Bumping up this thread although most of the discussion was on the
> > >> discussion thread of KIP-31 :)
> > >>
> > >> I just updated the KIP page to add a detailed solution for the option
> > >> (Option 3) that does not expose the LogAppendTime to the user.
> > >>
> > >>
> > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+CreateTime+and+LogAppendTime+to+Kafka+message
> > >>
> > >> The option has a minor change to the fetch request to allow fetching the
> > >> time index entry as well. I kind of like this solution because it's just
> > >> doing what we need without introducing other things.
> > >>
> > >> It would be great to hear the feedback. I can explain more during
> > >> tomorrow's KIP hangout.
> > >>
> > >> Thanks,
> > >>
> > >> Jiangjie (Becket) Qin
> > >>
> > >> On Thu, Sep 10, 2015 at 2:47 PM, Jiangjie Qin <j...@linkedin.com> wrote:
> > >>
> > >> > Hi Jay,
> > >> >
> > >> > I just copy/pasted here your feedback on the timestamp proposal that was
> > >> > in the discussion thread of KIP-31. Please see the replies inline.
> > >> > The main change I made compared with the previous proposal is to add both
> > >> > CreateTime and LogAppendTime to the message.
> > >> >
> > >> > On Tue, Sep 8, 2015 at 10:57 AM, Jay Kreps <j...@confluent.io> wrote:
> > >> >
> > >> > > Hey Beckett,
> > >> > >
> > >> > > I was proposing splitting up the KIP just for simplicity of discussion.
> > >> > > You can still implement them in one patch. I think otherwise it will be
> > >> > > hard to discuss/vote on them since if you like the offset proposal but
> > >> > > not the time proposal what do you do?
> > >> > >
> > >> > > Introducing a second notion of time into Kafka is a pretty massive
> > >> > > philosophical change so it kind of warrants its own KIP, I think; it
> > >> > > isn't just "Change message format".
> > >> > >
> > >> > > WRT time I think one thing to clarify in the proposal is how MM will have
> > >> > > access to set the timestamp? Presumably this will be a new field in
> > >> > > ProducerRecord, right? If so then any user can set the timestamp, right?
> > >> > > I'm not sure you answered the questions around how this will work for MM
> > >> > > since when MM retains timestamps from multiple partitions they will then
> > >> > > be out of order and in the past (so the max(lastAppendedTimestamp,
> > >> > > currentTimeMillis) override you proposed will not work, right?). If we
> > >> > > don't do this then when you set up mirroring the data will all be new and
> > >> > > you have the same retention problem you described. Maybe I missed
> > >> > > something...?
> > >> > lastAppendedTimestamp means the timestamp of the message that was last
> > >> > appended to the log.
> > >> > If a broker is a leader, since it will assign the timestamp by itself, the
> > >> > lastAppendedTimestamp will be its local clock when it appended the last
> > >> > message. So if there is no leader migration, max(lastAppendedTimestamp,
> > >> > currentTimeMillis) = currentTimeMillis.
> > >> > If a broker is a follower, because it will keep the leader's timestamp
> > >> > unchanged, the lastAppendedTime would be the leader's clock when it
> > >> > appended that message. It keeps track of the lastAppendedTime only in
> > >> > case it becomes leader later on. At that point, it is possible that the
> > >> > timestamp of the last appended message was stamped by the old leader, but
> > >> > the new leader's currentTimeMillis < lastAppendedTime. If a new message
> > >> > comes, instead of stamping it with the new leader's currentTimeMillis, we
> > >> > have to stamp it with lastAppendedTime to avoid the timestamp in the log
> > >> > going backward.
> > >> > The max(lastAppendedTimestamp, currentTimeMillis) is purely based on the
> > >> > broker side clock. If MM produces messages with different LogAppendTime
> > >> > in source clusters to the same target cluster, the LogAppendTime will be
> > >> > ignored and re-stamped by the target cluster.
> > >> > I added a use case example for mirror maker in KIP-32. Also there is a
> > >> > corner case discussion about when we need the max(lastAppendedTime,
> > >> > currentTimeMillis) trick. Could you take a look and see if that answers
> > >> > your question?
> > >> >
> > >> > >
> > >> > > My main motivation is that given that both Samza and Kafka streams are
> > >> > > doing work that implies a mandatory client-defined notion of time, I
> > >> > > really think introducing a different mandatory notion of time in Kafka
> > >> > > is going to be quite odd. We should think hard about how client-defined
> > >> > > time could work. I'm not sure if it can, but I'm also not sure that it
> > >> > > can't. Having both will be odd. Did you chat about this with Yi/Kartik
> > >> > > on the Samza side?
> > >> > I talked with Kartik and realized that it would be useful to have a client
> > >> > timestamp to facilitate use cases like stream processing.
> > >> > I was trying to figure out if we can simply use the client timestamp
> > >> > without introducing the server time. There is some discussion in the KIP.
> > >> > The key problems we want to solve here are:
> > >> > 1. We want log retention and rolling to depend on the server clock.
> > >> > 2. We want to make sure the log-associated timestamp is retained when
> > >> > replicas move.
> > >> > 3. We want to use the timestamp in some way that can allow searching by
> > >> > timestamp.
> > >> > For 1 and 2, an alternative is to pass the log-associated timestamp
> > >> > through replication, which means we need to have a different protocol for
> > >> > replica fetching to pass the log-associated timestamp. It is actually
> > >> > complicated and there could be a lot of corner cases to handle, e.g. what
> > >> > if an old leader started to fetch from the new leader, should it also
> > >> > update all of its old log segment timestamps?
> > >> > I think actually the client side timestamp would be better for 3 if we can
> > >> > find a way to make it work.
> > >> > So far I am not able to convince myself that only having the client side
> > >> > timestamp would work, mainly because of 1 and 2. There are a few
> > >> > situations I mentioned in the KIP.
> > >> > >
> > >> > > When you are saying it won't work you are assuming some particular
> > >> > > implementation? Maybe that the index is a monotonically increasing set of
> > >> > > pointers to the least record with a timestamp larger than the index time?
> > >> > > In other words a search for time X gives the largest offset at which all
> > >> > > records are <= X?
> > >> > It is a promising idea. We probably can have an in-memory index like that,
> > >> > but it might be complicated to have a file on disk like that. Imagine
> > >> > there are two timestamps T0 < T1. We see message Y created at T1 and
> > >> > create an index like [T1->Y], then we see message X created at T0.
> > >> > Supposedly we should have the index look like [T0->X, T1->Y]. It is easy
> > >> > to do in memory, but we might have to rewrite the index file completely.
> > >> > Maybe we can have the first entry with timestamp 0, and only update the
> > >> > first pointer for any out of range timestamp, so the index will be
> > >> > [0->X, T1->Y]. Also, the range of timestamps in the log segments can
> > >> > overlap with each other. That means we either need to keep a
> > >> > cross-segment index file or we need to check all the index files for
> > >> > each log segment.
> > >> > I separated out the time based log index to KIP-33 because it can be an
> > >> > independent follow up feature as Neha suggested. I will try to make the
> > >> > time based index work with the client side timestamp.
> > >> > >
> > >> > > For retention, I agree with the problem you point out, but I think what
> > >> > > you are saying in that case is that you want a size limit too. If you use
> > >> > > system time you actually hit the same problem: say you do a full dump of
> > >> > > a DB table with a setting of 7 days retention, your retention will
> > >> > > actually not get enforced for the first 7 days because the data is "new
> > >> > > to Kafka".
> > >> > I kind of think the size limit here is orthogonal. It is a valid use case
> > >> > where people want to use time based retention only. In your example,
> > >> > depending on the client timestamp might break the functionality - say it
> > >> > is a bootstrap case where people actually need to read all the data. If
> > >> > we depend on the client timestamp, the data might be deleted instantly
> > >> > when it comes to the broker. It might be too demanding to expect the
> > >> > broker to understand what people actually want to do with the data coming
> > >> > in. So the guarantee of using the server side timestamp is that "after
> > >> > being appended to the log, all messages will be available on the broker
> > >> > for the retention time", which is not changeable by clients.
> > >> > >
> > >> > > -Jay
> > >> >
> > >> > On Thu, Sep 10, 2015 at 12:55 PM, Jiangjie Qin <j...@linkedin.com> wrote:
> > >> >
> > >> >> Hi folks,
> > >> >>
> > >> >> This proposal was previously in KIP-31 and we separated it to KIP-32 per
> > >> >> Neha and Jay's suggestion.
> > >> >>
> > >> >> The proposal is to add the following two timestamps to the Kafka message.
> > >> >> - CreateTime
> > >> >> - LogAppendTime
> > >> >>
> > >> >> The CreateTime will be set by the producer and will not change after that.
> > >> >> The LogAppendTime will be set by the broker for purposes such as enforcing
> > >> >> log retention and log rolling.
> > >> >>
> > >> >> Thanks,
> > >> >>
> > >> >> Jiangjie (Becket) Qin
> > >> >>
> > >> >>
> > >> >
> > >>
> > >
> > >
> >
>
>
>
> --
> -- Guozhang
>
