Jay had mentioned the scenario of mirror-maker bootstrap which would
effectively reset the logAppendTimestamps for the bootstrapped data.
If we don't include logAppendTimestamps in each message there is a
similar scenario when rebuilding indexes during recovery. So it seems
it may be worth adding that timestamp to messages. The drawback to
that is exposing a server-side concept in the protocol (although we
already do that with offsets). logAppendTimestamp really should be
decided by the broker so I think the first scenario may have to be
written off as a gotcha, but the second may be worth addressing (by
adding it to the message format).

The other point that Jay raised which needs to be addressed (since we
require monotically increasing timestamps in the index) in the
proposal is changing time on the server (I'm a little less concerned
about NTP clock skews than a user explicitly changing the server's
time - i.e., big clock skews). We would at least want to "set back"
all the existing timestamps to guarantee non-decreasing timestamps
with future messages. I'm not sure at this point how best to handle
that, but we could perhaps have a epoch/base-time (or time-correction)
stored in the log directories and base all log index timestamps off
that base-time (or corrected). So if at any time you determine that
time has changed backwards you can adjust that base-time without
having to fix up all the entries. Without knowing the exact diff
between the previous clock and new clock we cannot adjust the times
exactly, but we can at least ensure increasing timestamps.

On Fri, Sep 11, 2015 at 10:52 AM, Jiangjie Qin
<j...@linkedin.com.invalid> wrote:
> Ewen and Jay,
>
> They way I see the LogAppendTime is another format of "offset". It serves
> the following purpose:
> 1. Locate messages not only by position, but also by time. The difference
> from offset is timestamp is not unique for all messags.
> 2. Allow broker to manage messages based on time, e.g. retention, rolling
> 3. Provide convenience for user to search message not only by offset, but
> also by timestamp.
>
> For purpose (2) we don't need per message server timestamp. We only need
> per log segment server timestamp and propagate it among brokers.
>
> For (1) and (3), we need per message timestamp. Then the question is
> whether we should use CreateTime or LogAppendTime?
>
> I completely agree that an application timestamp is very useful for many
> use cases. But it seems to me that having Kafka to understand and maintain
> application timestamp is a bit over demanding. So I think there is value to
> pass on CreateTime for application convenience, but I am not sure it can
> replace LogAppendTime. Managing out-of-order CreateTime is equivalent to
> allowing producer to send their own offset and ask broker to manage the
> offset for them, It is going to be very hard to maintain and could create
> huge performance/functional issue because of complicated logic.
>
> About whether we should expose LogAppendTime to broker, I agree that server
> timestamp is internal to broker, but isn't offset also an internal concept?
> Arguably it's not provided by producer so consumer application logic does
> not have to know offset. But user needs to know offset because they need to
> know "where is the message" in the log. LogAppendTime provides the answer
> of "When was the message appended" to the log. So personally I think it is
> reasonable to expose the LogAppendTime to consumers.
>
> I can see some use cases of exposing the LogAppendTime, to name some:
> 1. Let's say broker has 7 days of log retention, some application wants to
> reprocess the data in past 3 days. User can simply provide the timestamp
> and start consume.
> 2. User can easily know lag by time.
> 3. Cross cluster fail over. This is a more complicated use case, there are
> two goals: 1) Not lose message; and 2) do not reconsume tons of messages.
> Only knowing offset of cluster A won't help with finding fail over point in
> cluster B  because an offset of a cluster means nothing to another cluster.
> Timestamp however is a good cross cluster reference in this case.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Thu, Sep 10, 2015 at 9:28 PM, Ewen Cheslack-Postava <e...@confluent.io>
> wrote:
>
>> Re: MM preserving timestamps: Yes, this was how I interpreted the point in
>> the KIP and I only raised the issue because it restricts the usefulness of
>> timestamps anytime MM is involved. I agree it's not a deal breaker, but I
>> wanted to understand exact impact of the change. Some users seem to want to
>> be able to seek by application-defined timestamps (despite the many obvious
>> issues involved), and the proposal clearly would not support that unless
>> the timestamps submitted with the produce requests were respected. If we
>> ignore client submitted timestamps, then we probably want to try to hide
>> the timestamps as much as possible in any public interface (e.g. never
>> shows up in any public consumer APIs), but expose it just enough to be
>> useful for operational purposes.
>>
>> Sorry if my devil's advocate position / attempt to map the design space led
>> to some confusion!
>>
>> -Ewen
>>
>>
>> On Thu, Sep 10, 2015 at 5:48 PM, Jay Kreps <j...@confluent.io> wrote:
>>
>> > Ah, I see, I think I misunderstood about MM, it was called out in the
>> > proposal and I thought you were saying you'd retain the timestamp but I
>> > think you're calling out that you're not. In that case you do have the
>> > opposite problem, right? When you add mirroring for a topic all that data
>> > will have a timestamp of now and retention won't be right. Not a blocker
>> > but a bit of a gotcha.
>> >
>> > -Jay
>> >
>> >
>> >
>> > On Thu, Sep 10, 2015 at 5:40 PM, Joel Koshy <jjkosh...@gmail.com> wrote:
>> >
>> > > > Don't you see all the same issues you see with client-defined
>> > timestamp's
>> > > > if you let mm control the timestamp as you were proposing? That means
>> > > time
>> > >
>> > > Actually I don't think that was in the proposal (or was it?). i.e., I
>> > > think it was always supposed to be controlled by the broker (and not
>> > > MM).
>> > >
>> > > > Also, Joel, can you just confirm that you guys have talked through
>> the
>> > > > whole timestamp thing with the Samza folks at LI? The reason I ask
>> > about
>> > > > this is that Samza and Kafka Streams (KIP-28) are both trying to rely
>> > on
>> > >
>> > > We have not. This is a good point - we will follow-up.
>> > >
>> > > > WRT your idea of a FollowerFetchRequestI had thought of a similar
>> idea
>> > > > where we use the leader's timestamps to approximately set the
>> > follower's
>> > > > timestamps. I had thought of just adding a partition metadata request
>> > > that
>> > > > would subsume the current offset/time lookup and could be used by the
>> > > > follower to try to approximately keep their timestamps kosher. It's a
>> > > > little hacky and doesn't help with MM but it is also maybe less
>> > invasive
>> > > so
>> > > > that approach could be viable.
>> > >
>> > > That would also work, but perhaps responding with the actual leader
>> > > offset-timestamp entries (corresponding to the fetched portion) would
>> > > be exact and it should be small as well. Anyway, the main motivation
>> > > in this was to avoid leaking server-side timestamps to the
>> > > message-format if people think it is worth it so the alternatives are
>> > > implementation details. My original instinct was that it also avoids a
>> > > backwards incompatible change (but it does not because we also have
>> > > the relative offset change).
>> > >
>> > > Thanks,
>> > >
>> > > Joel
>> > >
>> > > >
>> > > >
>> > > >
>> > > > On Thu, Sep 10, 2015 at 3:36 PM, Joel Koshy <jjkosh...@gmail.com>
>> > wrote:
>> > > >
>> > > >> I just wanted to comment on a few points made earlier in this
>> thread:
>> > > >>
>> > > >> Concerns on clock skew: at least for the original proposal's scope
>> > > >> (which was more for honoring retention broker-side) this would only
>> be
>> > > >> an issue when spanning leader movements right? i.e., leader
>> migration
>> > > >> latency has to be much less than clock skew for this to be a real
>> > > >> issue wouldn’t it?
>> > > >>
>> > > >> Client timestamp vs broker timestamp: I’m not sure Kafka (brokers)
>> are
>> > > >> the right place to reason about client-side timestamps precisely due
>> > > >> to the nuances that have been discussed at length in this thread. My
>> > > >> preference would have been to the timestamp (now called
>> > > >> LogAppendTimestamp) have nothing to do with the applications. Ewen
>> > > >> raised a valid concern about leaking such “private/server-side”
>> > > >> timestamps into the protocol spec. i.e., it is fine to have the
>> > > >> CreateTime which is expressly client-provided and immutable
>> > > >> thereafter, but the LogAppendTime is also going part of the protocol
>> > > >> and it would be good to avoid exposure (to client developers) if
>> > > >> possible. Ok, so here is a slightly different approach that I was
>> just
>> > > >> thinking about (and did not think too far so it may not work): do
>> not
>> > > >> add the LogAppendTime to messages. Instead, build the time-based
>> index
>> > > >> on the server side on message arrival time alone. Introduce a new
>> > > >> ReplicaFetchRequest/Response pair. ReplicaFetchResponses will also
>> > > >> include the slice of the time-based index for the follower broker.
>> > > >> This way we can at least keep timestamps aligned across brokers for
>> > > >> retention purposes. We do lose the append timestamp for mirroring
>> > > >> pipelines (which appears to be the case in KIP-32 as well).
>> > > >>
>> > > >> Configurable index granularity: We can do this but I’m not sure it
>> is
>> > > >> very useful and as Jay noted, a major change from the old proposal
>> > > >> linked from the KIP is the sparse time-based index which we felt was
>> > > >> essential to bound memory usage (and having timestamps on each log
>> > > >> index entry was probably a big waste since in the common case
>> several
>> > > >> messages span the same timestamp). BTW another benefit of the second
>> > > >> index is that it makes it easier to roll-back or throw away if
>> > > >> necessary (vs. modifying the existing index format) - although that
>> > > >> obviously does not help with rolling back the timestamp change in
>> the
>> > > >> message format, but it is one less thing to worry about.
>> > > >>
>> > > >> Versioning: I’m not sure everyone is saying the same thing wrt the
>> > > >> scope of this. There is the record format change, but I also think
>> > > >> this ties into all of the API versioning that we already have in
>> > > >> Kafka. The current API versioning approach works fine for
>> > > >> upgrades/downgrades across official Kafka releases, but not so well
>> > > >> between releases. (We almost got bitten by this at LinkedIn with the
>> > > >> recent changes to various requests but were able to work around
>> > > >> these.) We can clarify this in the follow-up KIP.
>> > > >>
>> > > >> Thanks,
>> > > >>
>> > > >> Joel
>> > > >>
>> > > >>
>> > > >> On Thu, Sep 10, 2015 at 3:00 PM, Jiangjie Qin
>> > <j...@linkedin.com.invalid
>> > > >
>> > > >> wrote:
>> > > >> > Hi Jay,
>> > > >> >
>> > > >> > I just changed the KIP title and updated the KIP page.
>> > > >> >
>> > > >> > And yes, we are working on a general version control proposal to
>> > make
>> > > the
>> > > >> > protocol migration like this more smooth. I will also create a KIP
>> > for
>> > > >> that
>> > > >> > soon.
>> > > >> >
>> > > >> > Thanks,
>> > > >> >
>> > > >> > Jiangjie (Becket) Qin
>> > > >> >
>> > > >> >
>> > > >> > On Thu, Sep 10, 2015 at 2:21 PM, Jay Kreps <j...@confluent.io>
>> > wrote:
>> > > >> >
>> > > >> >> Great, can we change the name to something related to the
>> > > >> change--"KIP-31:
>> > > >> >> Move to relative offsets in compressed message sets".
>> > > >> >>
>> > > >> >> Also you had mentioned before you were going to expand on the
>> > > mechanics
>> > > >> of
>> > > >> >> handling these log format changes, right?
>> > > >> >>
>> > > >> >> -Jay
>> > > >> >>
>> > > >> >> On Thu, Sep 10, 2015 at 12:42 PM, Jiangjie Qin
>> > > >> <j...@linkedin.com.invalid>
>> > > >> >> wrote:
>> > > >> >>
>> > > >> >> > Neha and Jay,
>> > > >> >> >
>> > > >> >> > Thanks a lot for the feedback. Good point about splitting the
>> > > >> >> discussion. I
>> > > >> >> > have split the proposal to three KIPs and it does make each
>> > > discussion
>> > > >> >> more
>> > > >> >> > clear:
>> > > >> >> > KIP-31 - Message format change (Use relative offset)
>> > > >> >> > KIP-32 - Add CreateTime and LogAppendTime to Kafka message
>> > > >> >> > KIP-33 - Build a time-based log index
>> > > >> >> >
>> > > >> >> > KIP-33 can be a follow up KIP for KIP-32, so we can discuss
>> about
>> > > >> KIP-31
>> > > >> >> > and KIP-32 first for now. I will create a separate discussion
>> > > thread
>> > > >> for
>> > > >> >> > KIP-32 and reply the concerns you raised regarding the
>> timestamp.
>> > > >> >> >
>> > > >> >> > So far it looks there is no objection to KIP-31. Since I
>> removed
>> > a
>> > > few
>> > > >> >> part
>> > > >> >> > from previous KIP and only left the relative offset proposal,
>> it
>> > > >> would be
>> > > >> >> > great if people can take another look to see if there is any
>> > > concerns.
>> > > >> >> >
>> > > >> >> > Thanks,
>> > > >> >> >
>> > > >> >> > Jiangjie (Becket) Qin
>> > > >> >> >
>> > > >> >> >
>> > > >> >> > On Tue, Sep 8, 2015 at 1:28 PM, Neha Narkhede <
>> n...@confluent.io
>> > >
>> > > >> wrote:
>> > > >> >> >
>> > > >> >> > > Becket,
>> > > >> >> > >
>> > > >> >> > > Nice write-up. Few thoughts -
>> > > >> >> > >
>> > > >> >> > > I'd split up the discussion for simplicity. Note that you can
>> > > always
>> > > >> >> > group
>> > > >> >> > > several of these in one patch to reduce the protocol changes
>> > > people
>> > > >> >> have
>> > > >> >> > to
>> > > >> >> > > deal with.This is just a suggestion, but I think the
>> following
>> > > split
>> > > >> >> > might
>> > > >> >> > > make it easier to tackle the changes being proposed -
>> > > >> >> > >
>> > > >> >> > >    - Relative offsets
>> > > >> >> > >    - Introducing the concept of time
>> > > >> >> > >    - Time-based indexing (separate the usage of the timestamp
>> > > field
>> > > >> >> from
>> > > >> >> > >    how/whether we want to include a timestamp in the message)
>> > > >> >> > >
>> > > >> >> > > I'm a +1 on relative offsets, we should've done it back when
>> we
>> > > >> >> > introduced
>> > > >> >> > > it. Other than reducing the CPU overhead, this will also
>> reduce
>> > > the
>> > > >> >> > garbage
>> > > >> >> > > collection overhead on the brokers.
>> > > >> >> > >
>> > > >> >> > > On the timestamp field, I generally agree that we should add
>> a
>> > > >> >> timestamp
>> > > >> >> > to
>> > > >> >> > > a Kafka message but I'm not quite sold on how this KIP
>> suggests
>> > > the
>> > > >> >> > > timestamp be set. Will avoid repeating the downsides of a
>> > broker
>> > > >> side
>> > > >> >> > > timestamp mentioned previously in this thread. I think the
>> > topic
>> > > of
>> > > >> >> > > including a timestamp in a Kafka message requires a lot more
>> > > thought
>> > > >> >> and
>> > > >> >> > > details than what's in this KIP. I'd suggest we make it a
>> > > separate
>> > > >> KIP
>> > > >> >> > that
>> > > >> >> > > includes a list of all the different use cases for the
>> > timestamp
>> > > >> >> (beyond
>> > > >> >> > > log retention) including stream processing and discuss
>> > tradeoffs
>> > > of
>> > > >> >> > > including client and broker side timestamps.
>> > > >> >> > >
>> > > >> >> > > Agree with the benefit of time-based indexing, but haven't
>> had
>> > a
>> > > >> chance
>> > > >> >> > to
>> > > >> >> > > dive into the design details yet.
>> > > >> >> > >
>> > > >> >> > > Thanks,
>> > > >> >> > > Neha
>> > > >> >> > >
>> > > >> >> > > On Tue, Sep 8, 2015 at 10:57 AM, Jay Kreps <j...@confluent.io
>> >
>> > > >> wrote:
>> > > >> >> > >
>> > > >> >> > > > Hey Beckett,
>> > > >> >> > > >
>> > > >> >> > > > I was proposing splitting up the KIP just for simplicity of
>> > > >> >> discussion.
>> > > >> >> > > You
>> > > >> >> > > > can still implement them in one patch. I think otherwise it
>> > > will
>> > > >> be
>> > > >> >> > hard
>> > > >> >> > > to
>> > > >> >> > > > discuss/vote on them since if you like the offset proposal
>> > but
>> > > not
>> > > >> >> the
>> > > >> >> > > time
>> > > >> >> > > > proposal what do you do?
>> > > >> >> > > >
>> > > >> >> > > > Introducing a second notion of time into Kafka is a pretty
>> > > massive
>> > > >> >> > > > philosophical change so it kind of warrants it's own KIP I
>> > > think
>> > > >> it
>> > > >> >> > isn't
>> > > >> >> > > > just "Change message format".
>> > > >> >> > > >
>> > > >> >> > > > WRT time I think one thing to clarify in the proposal is
>> how
>> > MM
>> > > >> will
>> > > >> >> > have
>> > > >> >> > > > access to set the timestamp? Presumably this will be a new
>> > > field
>> > > >> in
>> > > >> >> > > > ProducerRecord, right? If so then any user can set the
>> > > timestamp,
>> > > >> >> > right?
>> > > >> >> > > > I'm not sure you answered the questions around how this
>> will
>> > > work
>> > > >> for
>> > > >> >> > MM
>> > > >> >> > > > since when MM retains timestamps from multiple partitions
>> > they
>> > > >> will
>> > > >> >> > then
>> > > >> >> > > be
>> > > >> >> > > > out of order and in the past (so the
>> > max(lastAppendedTimestamp,
>> > > >> >> > > > currentTimeMillis) override you proposed will not work,
>> > > right?).
>> > > >> If
>> > > >> >> we
>> > > >> >> > > > don't do this then when you set up mirroring the data will
>> > all
>> > > be
>> > > >> new
>> > > >> >> > and
>> > > >> >> > > > you have the same retention problem you described. Maybe I
>> > > missed
>> > > >> >> > > > something...?
>> > > >> >> > > >
>> > > >> >> > > > My main motivation is that given that both Samza and Kafka
>> > > streams
>> > > >> >> are
>> > > >> >> > > > doing work that implies a mandatory client-defined notion
>> of
>> > > >> time, I
>> > > >> >> > > really
>> > > >> >> > > > think introducing a different mandatory notion of time in
>> > > Kafka is
>> > > >> >> > going
>> > > >> >> > > to
>> > > >> >> > > > be quite odd. We should think hard about how client-defined
>> > > time
>> > > >> >> could
>> > > >> >> > > > work. I'm not sure if it can, but I'm also not sure that it
>> > > can't.
>> > > >> >> > Having
>> > > >> >> > > > both will be odd. Did you chat about this with Yi/Kartik on
>> > the
>> > > >> Samza
>> > > >> >> > > side?
>> > > >> >> > > >
>> > > >> >> > > > When you are saying it won't work you are assuming some
>> > > particular
>> > > >> >> > > > implementation? Maybe that the index is a monotonically
>> > > increasing
>> > > >> >> set
>> > > >> >> > of
>> > > >> >> > > > pointers to the least record with a timestamp larger than
>> the
>> > > >> index
>> > > >> >> > time?
>> > > >> >> > > > In other words a search for time X gives the largest offset
>> > at
>> > > >> which
>> > > >> >> > all
>> > > >> >> > > > records are <= X?
>> > > >> >> > > >
>> > > >> >> > > > For retention, I agree with the problem you point out, but
>> I
>> > > think
>> > > >> >> what
>> > > >> >> > > you
>> > > >> >> > > > are saying in that case is that you want a size limit too.
>> If
>> > > you
>> > > >> use
>> > > >> >> > > > system time you actually hit the same problem: say you do a
>> > > full
>> > > >> dump
>> > > >> >> > of
>> > > >> >> > > a
>> > > >> >> > > > DB table with a setting of 7 days retention, your retention
>> > > will
>> > > >> >> > actually
>> > > >> >> > > > not get enforced for the first 7 days because the data is
>> > "new
>> > > to
>> > > >> >> > Kafka".
>> > > >> >> > > >
>> > > >> >> > > > -Jay
>> > > >> >> > > >
>> > > >> >> > > >
>> > > >> >> > > > On Mon, Sep 7, 2015 at 10:44 AM, Jiangjie Qin
>> > > >> >> > <j...@linkedin.com.invalid
>> > > >> >> > > >
>> > > >> >> > > > wrote:
>> > > >> >> > > >
>> > > >> >> > > > > Jay,
>> > > >> >> > > > >
>> > > >> >> > > > > Thanks for the comments. Yes, there are actually three
>> > > >> proposals as
>> > > >> >> > you
>> > > >> >> > > > > pointed out.
>> > > >> >> > > > >
>> > > >> >> > > > > We will have a separate proposal for (1) - version
>> control
>> > > >> >> mechanism.
>> > > >> >> > > We
>> > > >> >> > > > > actually thought about whether we want to separate 2 and
>> 3
>> > > >> >> internally
>> > > >> >> > > > > before creating the KIP. The reason we put 2 and 3
>> together
>> > > is
>> > > >> it
>> > > >> >> > will
>> > > >> >> > > > > saves us another cross board wire protocol change. Like
>> you
>> > > >> said,
>> > > >> >> we
>> > > >> >> > > have
>> > > >> >> > > > > to migrate all the clients in all languages. To some
>> > extent,
>> > > the
>> > > >> >> > effort
>> > > >> >> > > > to
>> > > >> >> > > > > spend on upgrading the clients can be even bigger than
>> > > >> implementing
>> > > >> >> > the
>> > > >> >> > > > new
>> > > >> >> > > > > feature itself. So there are some attractions if we can
>> do
>> > 2
>> > > >> and 3
>> > > >> >> > > > together
>> > > >> >> > > > > instead of separately. Maybe after (1) is done it will be
>> > > >> easier to
>> > > >> >> > do
>> > > >> >> > > > > protocol migration. But if we are able to come to an
>> > > agreement
>> > > >> on
>> > > >> >> the
>> > > >> >> > > > > timestamp solution, I would prefer to have it together
>> with
>> > > >> >> relative
>> > > >> >> > > > offset
>> > > >> >> > > > > in the interest of avoiding another wire protocol change
>> > (the
>> > > >> >> process
>> > > >> >> > > to
>> > > >> >> > > > > migrate to relative offset is exactly the same as migrate
>> > to
>> > > >> >> message
>> > > >> >> > > with
>> > > >> >> > > > > timestamp).
>> > > >> >> > > > >
>> > > >> >> > > > > In terms of timestamp. I completely agree that having
>> > client
>> > > >> >> > timestamp
>> > > >> >> > > is
>> > > >> >> > > > > more useful if we can make sure the timestamp is good.
>> But
>> > in
>> > > >> >> reality
>> > > >> >> > > > that
>> > > >> >> > > > > can be a really big *IF*. I think the problem is exactly
>> as
>> > > Ewen
>> > > >> >> > > > mentioned,
>> > > >> >> > > > > if we let the client to set the timestamp, it would be
>> very
>> > > hard
>> > > >> >> for
>> > > >> >> > > the
>> > > >> >> > > > > broker to utilize it. If broker apply retention policy
>> > based
>> > > on
>> > > >> the
>> > > >> >> > > > client
>> > > >> >> > > > > timestamp. One misbehave producer can potentially
>> > completely
>> > > >> mess
>> > > >> >> up
>> > > >> >> > > the
>> > > >> >> > > > > retention policy on the broker. Although people don't
>> care
>> > > about
>> > > >> >> > server
>> > > >> >> > > > > side timestamp. People do care a lot when timestamp
>> breaks.
>> > > >> >> Searching
>> > > >> >> > > by
>> > > >> >> > > > > timestamp is a really important use case even though it
>> is
>> > > not
>> > > >> used
>> > > >> >> > as
>> > > >> >> > > > > often as searching by offset. It has significant direct
>> > > impact
>> > > >> on
>> > > >> >> RTO
>> > > >> >> > > > when
>> > > >> >> > > > > there is a cross cluster failover as Todd mentioned.
>> > > >> >> > > > >
>> > > >> >> > > > > The trick using max(lastAppendedTimestamp,
>> > currentTimeMillis)
>> > > >> is to
>> > > >> >> > > > > guarantee monotonic increase of the timestamp. Many
>> > > commercial
>> > > >> >> system
>> > > >> >> > > > > actually do something similar to this to solve the time
>> > skew.
>> > > >> About
>> > > >> >> > > > > changing the time, I am not sure if people use NTP like
>> > > using a
>> > > >> >> watch
>> > > >> >> > > to
>> > > >> >> > > > > just set it forward/backward by an hour or so. The time
>> > > >> adjustment
>> > > >> >> I
>> > > >> >> > > used
>> > > >> >> > > > > to do is typically to adjust something like a minute  /
>> > > week. So
>> > > >> >> for
>> > > >> >> > > each
>> > > >> >> > > > > second, there might be a few mircoseconds slower/faster
>> but
>> > > >> should
>> > > >> >> > not
>> > > >> >> > > > > break the clock completely to make sure all the
>> time-based
>> > > >> >> > transactions
>> > > >> >> > > > are
>> > > >> >> > > > > not affected. The one minute change will be done within a
>> > > week
>> > > >> but
>> > > >> >> > not
>> > > >> >> > > > > instantly.
>> > > >> >> > > > >
>> > > >> >> > > > > Personally, I think having client side timestamp will be
>> > > useful
>> > > >> if
>> > > >> >> we
>> > > >> >> > > > don't
>> > > >> >> > > > > need to put the broker and data integrity under risk. If
>> we
>> > > >> have to
>> > > >> >> > > > choose
>> > > >> >> > > > > from one of them but not both. I would prefer server side
>> > > >> timestamp
>> > > >> >> > > > because
>> > > >> >> > > > > for client side timestamp there is always a plan B which
>> is
>> > > >> putting
>> > > >> >> > the
>> > > >> >> > > > > timestamp into payload.
>> > > >> >> > > > >
>> > > >> >> > > > > Another reason I am reluctant to use the client side
>> > > timestamp
>> > > >> is
>> > > >> >> > that
>> > > >> >> > > it
>> > > >> >> > > > > is always dangerous to mix the control plane with data
>> > > plane. IP
>> > > >> >> did
>> > > >> >> > > this
>> > > >> >> > > > > and it has caused so many different breaches so people
>> are
>> > > >> >> migrating
>> > > >> >> > to
>> > > >> >> > > > > something like MPLS. An example in Kafka is that any
>> client
>> > > can
>> > > >> >> > > > construct a
>> > > >> >> > > > >
>> > > >> LeaderAndIsrRequest/UpdateMetadataRequest/ContorlledShutdownRequest
>> > > >> >> > > (you
>> > > >> >> > > > > name it) and send it to the broker to mess up the entire
>> > > >> cluster,
>> > > >> >> > also
>> > > >> >> > > as
>> > > >> >> > > > > we already noticed a busy cluster can respond quite slow
>> to
>> > > >> >> > controller
>> > > >> >> > > > > messages. So it would really be nice if we can avoid
>> giving
>> > > the
>> > > >> >> power
>> > > >> >> > > to
>> > > >> >> > > > > clients to control the log retention.
>> > > >> >> > > > >
>> > > >> >> > > > > Thanks,
>> > > >> >> > > > >
>> > > >> >> > > > > Jiangjie (Becket) Qin
>> > > >> >> > > > >
>> > > >> >> > > > >
>> > > >> >> > > > > On Sun, Sep 6, 2015 at 9:54 PM, Todd Palino <
>> > > tpal...@gmail.com>
>> > > >> >> > wrote:
>> > > >> >> > > > >
>> > > >> >> > > > > > So, with regards to why you want to search by
>> timestamp,
>> > > the
>> > > >> >> > biggest
>> > > >> >> > > > > > problem I've seen is with consumers who want to reset
>> > their
>> > > >> >> > > timestamps
>> > > >> >> > > > > to a
>> > > >> >> > > > > > specific point, whether it is to replay a certain
>> amount
>> > of
>> > > >> >> > messages,
>> > > >> >> > > > or
>> > > >> >> > > > > to
>> > > >> >> > > > > > rewind to before some problem state existed. This
>> happens
>> > > more
>> > > >> >> > often
>> > > >> >> > > > than
>> > > >> >> > > > > > anyone would like.
>> > > >> >> > > > > >
>> > > >> >> > > > > > To handle this now we need to constantly export the
>> > > broker's
>> > > >> >> offset
>> > > >> >> > > for
>> > > >> >> > > > > > every partition to a time-series database and then use
>> > > >> external
>> > > >> >> > > > processes
>> > > >> >> > > > > > to query this. I know we're not the only ones doing
>> this.
>> > > The
>> > > >> way
>> > > >> >> > the
>> > > >> >> > > > > > broker handles requests for offsets by timestamp is a
>> > > little
>> > > >> >> obtuse
>> > > >> >> > > > > > (explain it to anyone without intimate knowledge of the
>> > > >> internal
>> > > >> >> > > > workings
>> > > >> >> > > > > > of the broker - every time I do I see this). In
>> addition,
>> > > as
>> > > >> >> Becket
>> > > >> >> > > > > pointed
>> > > >> >> > > > > > out, it causes problems specifically with retention of
>> > > >> messages
>> > > >> >> by
>> > > >> >> > > time
>> > > >> >> > > > > > when you move partitions around.
>> > > >> >> > > > > >
>> > > >> >> > > > > > I'm deliberately avoiding the discussion of what
>> > timestamp
>> > > to
>> > > >> >> use.
>> > > >> >> > I
>> > > >> >> > > > can
>> > > >> >> > > > > > see the argument either way, though I tend to lean
>> > towards
>> > > the
>> > > >> >> idea
>> > > >> >> > > > that
>> > > >> >> > > > > > the broker timestamp is the only viable source of truth
>> > in
>> > > >> this
>> > > >> >> > > > > situation.
>> > > >> >> > > > > >
>> > > >> >> > > > > > -Todd
>> > > >> >> > > > > >
>> > > >> >> > > > > >
>> > > >> >> > > > > > On Sun, Sep 6, 2015 at 7:08 PM, Ewen Cheslack-Postava <
>> > > >> >> > > > e...@confluent.io
>> > > >> >> > > > > >
>> > > >> >> > > > > > wrote:
>> > > >> >> > > > > >
>> > > >> >> > > > > > > On Sun, Sep 6, 2015 at 4:57 PM, Jay Kreps <
>> > > j...@confluent.io
>> > > >> >
>> > > >> >> > > wrote:
>> > > >> >> > > > > > >
>> > > >> >> > > > > > > >
>> > > >> >> > > > > > > > 2. Nobody cares what time it is on the server.
>> > > >> >> > > > > > > >
>> > > >> >> > > > > > >
>> > > >> >> > > > > > > This is a good way of summarizing the issue I was
>> > trying
>> > > to
>> > > >> get
>> > > >> >> > at,
>> > > >> >> > > > > from
>> > > >> >> > > > > > an
>> > > >> >> > > > > > > app's perspective. Of the 3 stated goals of the KIP,
>> #2
>> > > (lot
>> > > >> >> > > > retention)
>> > > >> >> > > > > > is
>> > > >> >> > > > > > > reasonably handled by a server-side timestamp. I
>> really
>> > > just
>> > > >> >> care
>> > > >> >> > > > that
>> > > >> >> > > > > a
>> > > >> >> > > > > > > message is there long enough that I have a chance to
>> > > process
>> > > >> >> it.
>> > > >> >> > #3
>> > > >> >> > > > > > > (searching by timestamp) only seems useful if we can
>> > > >> guarantee
>> > > >> >> > the
>> > > >> >> > > > > > > server-side timestamp is close enough to the original
>> > > >> >> client-side
>> > > >> >> > > > > > > timestamp, and any mirror maker step seems to break
>> > that
>> > > >> (even
>> > > >> >> > > > ignoring
>> > > >> >> > > > > > any
>> > > >> >> > > > > > > issues with broker availability).
>> > > >> >> > > > > > >
>> > > >> >> > > > > > > I'm also wondering whether optimizing for
>> > > >> search-by-timestamp
>> > > >> >> on
>> > > >> >> > > the
>> > > >> >> > > > > > broker
>> > > >> >> > > > > > > is really something we want to do given that messages
>> > > aren't
>> > > >> >> > really
>> > > >> >> > > > > > > guaranteed to be ordered by application-level
>> > timestamps
>> > > on
>> > > >> the
>> > > >> >> > > > broker.
>> > > >> >> > > > > > Is
>> > > >> >> > > > > > > part of the need for this just due to the current
>> > > consumer
>> > > >> APIs
>> > > >> >> > > being
>> > > >> >> > > > > > > difficult to work with? For example, could you
>> > implement
>> > > >> this
>> > > >> >> > > pretty
>> > > >> >> > > > > > easily
>> > > >> >> > > > > > > client side just the way you would broker-side? I'd
>> > > imagine
>> > > >> a
>> > > >> >> > > couple
>> > > >> >> > > > of
>> > > >> >> > > > > > > random seeks + reads during very rare occasions (i.e.
>> > > when
>> > > >> the
>> > > >> >> > app
>> > > >> >> > > > > starts
>> > > >> >> > > > > > > up) wouldn't be a problem performance-wise. Or is it
>> > also
>> > > >> that
>> > > >> >> > you
>> > > >> >> > > > need
>> > > >> >> > > > > > the
>> > > >> >> > > > > > > broker to enforce things like monotonically
>> increasing
>> > > >> >> timestamps
>> > > >> >> > > > since
>> > > >> >> > > > > > you
>> > > >> >> > > > > > > can't do the query properly and efficiently without
>> > that
>> > > >> >> > guarantee,
>> > > >> >> > > > and
>> > > >> >> > > > > > > therefore what applications are actually looking for
>> > *is*
>> > > >> >> > > broker-side
>> > > >> >> > > > > > > timestamps?
>> > > >> >> > > > > > >
>> > > >> >> > > > > > > -Ewen
>> > > >> >> > > > > > >
>> > > >> >> > > > > > >
>> > > >> >> > > > > > >
>> > > >> >> > > > > > > > Consider cases where data is being copied from a
>> > > database
>> > > >> or
>> > > >> >> > from
>> > > >> >> > > > log
>> > > >> >> > > > > > > > files. In steady-state the server time is very
>> close
>> > to
>> > > >> the
>> > > >> >> > > client
>> > > >> >> > > > > time
>> > > >> >> > > > > > > if
>> > > >> >> > > > > > > > their clocks are sync'd (see 1) but there will be
>> > > times of
>> > > >> >> > large
>> > > >> >> > > > > > > divergence
>> > > >> >> > > > > > > > when the copying process is stopped or falls
>> behind.
>> > > When
>> > > >> >> this
>> > > >> >> > > > occurs
>> > > >> >> > > > > > it
>> > > >> >> > > > > > > is
>> > > >> >> > > > > > > > clear that the time the data arrived on the server
>> is
>> > > >> >> > irrelevant,
>> > > >> >> > > > it
>> > > >> >> > > > > is
>> > > >> >> > > > > > > the
>> > > >> >> > > > > > > > source timestamp that matters. This is the problem
>> > you
>> > > are
>> > > >> >> > trying
>> > > >> >> > > > to
>> > > >> >> > > > > > fix
>> > > >> >> > > > > > > by
>> > > >> >> > > > > > > > retaining the mm timestamp but really the client
>> > should
>> > > >> >> always
>> > > >> >> > > set
>> > > >> >> > > > > the
>> > > >> >> > > > > > > time
>> > > >> >> > > > > > > > with the use of server-side time as a fallback. It
>> > > would
>> > > >> be
>> > > >> >> > worth
>> > > >> >> > > > > > talking
>> > > >> >> > > > > > > > to the Samza folks and reading through this blog
>> > post (
>> > > >> >> > > > > > > >
>> > > >> >> > > > > > >
>> > > >> >> > > > > >
>> > > >> >> > > > >
>> > > >> >> > > >
>> > > >> >> > >
>> > > >> >> >
>> > > >> >>
>> > > >>
>> > >
>> >
>> http://radar.oreilly.com/2015/08/the-world-beyond-batch-streaming-101.html
>> > > >> >> > > > > > > > )
>> > > >> >> > > > > > > > on this subject since we went through similar
>> > > learnings on
>> > > >> >> the
>> > > >> >> > > > stream
>> > > >> >> > > > > > > > processing side.
>> > > >> >> > > > > > > >
>> > > >> >> > > > > > > > I think the implication of these two is that we
>> need
>> > a
>> > > >> >> proposal
>> > > >> >> > > > that
>> > > >> >> > > > > > > > handles potentially very out-of-order timestamps in
>> > > some
>> > > >> kind
>> > > >> >> > of
>> > > >> >> > > > > sanish
>> > > >> >> > > > > > > way
>> > > >> >> > > > > > > > (buggy clients will set something totally wrong as
>> > the
>> > > >> time).
>> > > >> >> > > > > > > >
>> > > >> >> > > > > > > > -Jay
>> > > >> >> > > > > > > >
>> > > >> >> > > > > > > > On Sun, Sep 6, 2015 at 4:22 PM, Jay Kreps <
>> > > >> j...@confluent.io>
>> > > >> >> > > > wrote:
>> > > >> >> > > > > > > >
>> > > >> >> > > > > > > > > The magic byte is used to version message format
>> so
>> > > >> we'll
>> > > >> >> > need
>> > > >> >> > > to
>> > > >> >> > > > > > make
>> > > >> >> > > > > > > > > sure that check is in place--I actually don't see
>> > it
>> > > in
>> > > >> the
>> > > >> >> > > > current
>> > > >> >> > > > > > > > > consumer code which I think is a bug we should
>> fix
>> > > for
>> > > >> the
>> > > >> >> > next
>> > > >> >> > > > > > release
>> > > >> >> > > > > > > > > (filed KAFKA-2523). The purpose of that field is
>> so
>> > > >> there
>> > > >> >> is
>> > > >> >> > a
>> > > >> >> > > > > clear
>> > > >> >> > > > > > > > check
>> > > >> >> > > > > > > > > on the format rather than the scrambled scenarios
>> > > Becket
>> > > >> >> > > > describes.
>> > > >> >> > > > > > > > >
>> > > >> >> > > > > > > > > Also, Becket, I don't think just fixing the java
>> > > client
>> > > >> is
>> > > >> >> > > > > sufficient
>> > > >> >> > > > > > > as
>> > > >> >> > > > > > > > > that would break other clients--i.e. if anyone
>> > > writes a
>> > > >> v1
>> > > >> >> > > > > messages,
>> > > >> >> > > > > > > even
>> > > >> >> > > > > > > > > by accident, any non-v1-capable consumer will
>> > break.
>> > > I
>> > > >> >> think
>> > > >> >> > we
>> > > >> >> > > > > > > probably
>> > > >> >> > > > > > > > > need a way to have the server ensure a particular
>> > > >> message
>> > > >> >> > > format
>> > > >> >> > > > > > either
>> > > >> >> > > > > > > > at
>> > > >> >> > > > > > > > > read or write time.
>> > > >> >> > > > > > > > >
>> > > >> >> > > > > > > > > -Jay
>> > > >> >> > > > > > > > >
>> > > >> >> > > > > > > > > On Thu, Sep 3, 2015 at 3:47 PM, Jiangjie Qin
>> > > >> >> > > > > > <j...@linkedin.com.invalid
>> > > >> >> > > > > > > >
>> > > >> >> > > > > > > > > wrote:
>> > > >> >> > > > > > > > >
>> > > >> >> > > > > > > > >> Hi Guozhang,
>> > > >> >> > > > > > > > >>
>> > > >> >> > > > > > > > >> I checked the code again. Actually CRC check
>> > > probably
>> > > >> >> won't
>> > > >> >> > > > fail.
>> > > >> >> > > > > > The
>> > > >> >> > > > > > > > >> newly
>> > > >> >> > > > > > > > >> added timestamp field might be treated as
>> > keyLength
>> > > >> >> instead,
>> > > >> >> > > so
>> > > >> >> > > > we
>> > > >> >> > > > > > are
>> > > >> >> > > > > > > > >> likely to receive an IllegalArgumentException
>> when
>> > > try
>> > > >> to
>> > > >> >> > read
>> > > >> >> > > > the
>> > > >> >> > > > > > > key.
>> > > >> >> > > > > > > > >> I'll update the KIP.
>> > > >> >> > > > > > > > >>
>> > > >> >> > > > > > > > >> Thanks,
>> > > >> >> > > > > > > > >>
>> > > >> >> > > > > > > > >> Jiangjie (Becket) Qin
>> > > >> >> > > > > > > > >>
>> > > >> >> > > > > > > > >> On Thu, Sep 3, 2015 at 12:48 PM, Jiangjie Qin <
>> > > >> >> > > > j...@linkedin.com>
>> > > >> >> > > > > > > > wrote:
>> > > >> >> > > > > > > > >>
>> > > >> >> > > > > > > > >> > Hi, Guozhang,
>> > > >> >> > > > > > > > >> >
>> > > >> >> > > > > > > > >> > Thanks for reading the KIP. By "old
>> consumer", I
>> > > >> meant
>> > > >> >> the
>> > > >> >> > > > > > > > >> > ZookeeperConsumerConnector in trunk now, i.e.
>> > > without
>> > > >> >> this
>> > > >> >> > > bug
>> > > >> >> > > > > > > fixed.
>> > > >> >> > > > > > > > >> If we
>> > > >> >> > > > > > > > >> > fix the ZookeeperConsumerConnector then it
>> will
>> > > throw
>> > > >> >> > > > exception
>> > > >> >> > > > > > > > >> complaining
>> > > >> >> > > > > > > > >> > about the unsupported version when it sees
>> > message
>> > > >> >> format
>> > > >> >> > > V1.
>> > > >> >> > > > > > What I
>> > > >> >> > > > > > > > was
>> > > >> >> > > > > > > > >> > trying to say is that if we have some
>> > > >> >> > > > ZookeeperConsumerConnector
>> > > >> >> > > > > > > > running
>> > > >> >> > > > > > > > >> > without the fix, the consumer will complain
>> > about
>> > > CRC
>> > > >> >> > > mismatch
>> > > >> >> > > > > > > instead
>> > > >> >> > > > > > > > >> of
>> > > >> >> > > > > > > > >> > unsupported version.
>> > > >> >> > > > > > > > >> >
>> > > >> >> > > > > > > > >> > Thanks,
>> > > >> >> > > > > > > > >> >
>> > > >> >> > > > > > > > >> > Jiangjie (Becket) Qin
>> > > >> >> > > > > > > > >> >
>> > > >> >> > > > > > > > >> > On Thu, Sep 3, 2015 at 12:15 PM, Guozhang
>> Wang <
>> > > >> >> > > > > > wangg...@gmail.com>
>> > > >> >> > > > > > > > >> wrote:
>> > > >> >> > > > > > > > >> >
>> > > >> >> > > > > > > > >> >> Thanks for the write-up Jiangjie.
>> > > >> >> > > > > > > > >> >>
>> > > >> >> > > > > > > > >> >> One comment about migration plan: "For old
>> > > >> consumers,
>> > > >> >> if
>> > > >> >> > > they
>> > > >> >> > > > > see
>> > > >> >> > > > > > > the
>> > > >> >> > > > > > > > >> new
>> > > >> >> > > > > > > > >> >> protocol the CRC check will fail"..
>> > > >> >> > > > > > > > >> >>
>> > > >> >> > > > > > > > >> >> Do you mean this bug in the old consumer
>> cannot
>> > > be
>> > > >> >> fixed
>> > > >> >> > > in a
>> > > >> >> > > > > > > > >> >> backward-compatible way?
>> > > >> >> > > > > > > > >> >>
>> > > >> >> > > > > > > > >> >> Guozhang
>> > > >> >> > > > > > > > >> >>
>> > > >> >> > > > > > > > >> >>
>> > > >> >> > > > > > > > >> >> On Thu, Sep 3, 2015 at 8:35 AM, Jiangjie Qin
>> > > >> >> > > > > > > > <j...@linkedin.com.invalid
>> > > >> >> > > > > > > > >> >
>> > > >> >> > > > > > > > >> >> wrote:
>> > > >> >> > > > > > > > >> >>
>> > > >> >> > > > > > > > >> >> > Hi,
>> > > >> >> > > > > > > > >> >> >
>> > > >> >> > > > > > > > >> >> > We just created KIP-31 to propose a message
>> > > format
>> > > >> >> > change
>> > > >> >> > > > in
>> > > >> >> > > > > > > Kafka.
>> > > >> >> > > > > > > > >> >> >
>> > > >> >> > > > > > > > >> >> >
>> > > >> >> > > > > > > > >> >> >
>> > > >> >> > > > > > > > >> >>
>> > > >> >> > > > > > > > >>
>> > > >> >> > > > > > > >
>> > > >> >> > > > > > >
>> > > >> >> > > > > >
>> > > >> >> > > > >
>> > > >> >> > > >
>> > > >> >> > >
>> > > >> >> >
>> > > >> >>
>> > > >>
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-31+-+Message+format+change+proposal
>> > > >> >> > > > > > > > >> >> >
>> > > >> >> > > > > > > > >> >> > As a summary, the motivations are:
>> > > >> >> > > > > > > > >> >> > 1. Avoid server side message re-compression
>> > > >> >> > > > > > > > >> >> > 2. Honor time-based log roll and retention
>> > > >> >> > > > > > > > >> >> > 3. Enable offset search by timestamp at a
>> > finer
>> > > >> >> > > > granularity.
>> > > >> >> > > > > > > > >> >> >
>> > > >> >> > > > > > > > >> >> > Feedback and comments are welcome!
>> > > >> >> > > > > > > > >> >> >
>> > > >> >> > > > > > > > >> >> > Thanks,
>> > > >> >> > > > > > > > >> >> >
>> > > >> >> > > > > > > > >> >> > Jiangjie (Becket) Qin
>> > > >> >> > > > > > > > >> >> >
>> > > >> >> > > > > > > > >> >>
>> > > >> >> > > > > > > > >> >>
>> > > >> >> > > > > > > > >> >>
>> > > >> >> > > > > > > > >> >> --
>> > > >> >> > > > > > > > >> >> -- Guozhang
>> > > >> >> > > > > > > > >> >>
>> > > >> >> > > > > > > > >> >
>> > > >> >> > > > > > > > >> >
>> > > >> >> > > > > > > > >>
>> > > >> >> > > > > > > > >
>> > > >> >> > > > > > > > >
>> > > >> >> > > > > > > >
>> > > >> >> > > > > > >
>> > > >> >> > > > > > >
>> > > >> >> > > > > > >
>> > > >> >> > > > > > > --
>> > > >> >> > > > > > > Thanks,
>> > > >> >> > > > > > > Ewen
>> > > >> >> > > > > > >
>> > > >> >> > > > > >
>> > > >> >> > > > >
>> > > >> >> > > >
>> > > >> >> > >
>> > > >> >> > >
>> > > >> >> > >
>> > > >> >> > > --
>> > > >> >> > > Thanks,
>> > > >> >> > > Neha
>> > > >> >> > >
>> > > >> >> >
>> > > >> >>
>> > > >>
>> > >
>> >
>>
>>
>>
>> --
>> Thanks,
>> Ewen
>>

Reply via email to