Becket,

Nice write-up. Few thoughts -

I'd split up the discussion for simplicity. Note that you can always group
several of these in one patch to reduce the protocol changes people have to
deal with.This is just a suggestion, but I think the following split might
make it easier to tackle the changes being proposed -

   - Relative offsets
   - Introducing the concept of time
   - Time-based indexing (separate the usage of the timestamp field from
   how/whether we want to include a timestamp in the message)

I'm a +1 on relative offsets, we should've done it back when we introduced
it. Other than reducing the CPU overhead, this will also reduce the garbage
collection overhead on the brokers.

On the timestamp field, I generally agree that we should add a timestamp to
a Kafka message but I'm not quite sold on how this KIP suggests the
timestamp be set. Will avoid repeating the downsides of a broker side
timestamp mentioned previously in this thread. I think the topic of
including a timestamp in a Kafka message requires a lot more thought and
details than what's in this KIP. I'd suggest we make it a separate KIP that
includes a list of all the different use cases for the timestamp (beyond
log retention) including stream processing and discuss tradeoffs of
including client and broker side timestamps.

Agree with the benefit of time-based indexing, but haven't had a chance to
dive into the design details yet.

Thanks,
Neha

On Tue, Sep 8, 2015 at 10:57 AM, Jay Kreps <j...@confluent.io> wrote:

> Hey Beckett,
>
> I was proposing splitting up the KIP just for simplicity of discussion. You
> can still implement them in one patch. I think otherwise it will be hard to
> discuss/vote on them since if you like the offset proposal but not the time
> proposal what do you do?
>
> Introducing a second notion of time into Kafka is a pretty massive
> philosophical change so it kind of warrants it's own KIP I think it isn't
> just "Change message format".
>
> WRT time I think one thing to clarify in the proposal is how MM will have
> access to set the timestamp? Presumably this will be a new field in
> ProducerRecord, right? If so then any user can set the timestamp, right?
> I'm not sure you answered the questions around how this will work for MM
> since when MM retains timestamps from multiple partitions they will then be
> out of order and in the past (so the max(lastAppendedTimestamp,
> currentTimeMillis) override you proposed will not work, right?). If we
> don't do this then when you set up mirroring the data will all be new and
> you have the same retention problem you described. Maybe I missed
> something...?
>
> My main motivation is that given that both Samza and Kafka streams are
> doing work that implies a mandatory client-defined notion of time, I really
> think introducing a different mandatory notion of time in Kafka is going to
> be quite odd. We should think hard about how client-defined time could
> work. I'm not sure if it can, but I'm also not sure that it can't. Having
> both will be odd. Did you chat about this with Yi/Kartik on the Samza side?
>
> When you are saying it won't work you are assuming some particular
> implementation? Maybe that the index is a monotonically increasing set of
> pointers to the least record with a timestamp larger than the index time?
> In other words a search for time X gives the largest offset at which all
> records are <= X?
>
> For retention, I agree with the problem you point out, but I think what you
> are saying in that case is that you want a size limit too. If you use
> system time you actually hit the same problem: say you do a full dump of a
> DB table with a setting of 7 days retention, your retention will actually
> not get enforced for the first 7 days because the data is "new to Kafka".
>
> -Jay
>
>
> On Mon, Sep 7, 2015 at 10:44 AM, Jiangjie Qin <j...@linkedin.com.invalid>
> wrote:
>
> > Jay,
> >
> > Thanks for the comments. Yes, there are actually three proposals as you
> > pointed out.
> >
> > We will have a separate proposal for (1) - version control mechanism. We
> > actually thought about whether we want to separate 2 and 3 internally
> > before creating the KIP. The reason we put 2 and 3 together is it will
> > saves us another cross board wire protocol change. Like you said, we have
> > to migrate all the clients in all languages. To some extent, the effort
> to
> > spend on upgrading the clients can be even bigger than implementing the
> new
> > feature itself. So there are some attractions if we can do 2 and 3
> together
> > instead of separately. Maybe after (1) is done it will be easier to do
> > protocol migration. But if we are able to come to an agreement on the
> > timestamp solution, I would prefer to have it together with relative
> offset
> > in the interest of avoiding another wire protocol change (the process to
> > migrate to relative offset is exactly the same as migrate to message with
> > timestamp).
> >
> > In terms of timestamp. I completely agree that having client timestamp is
> > more useful if we can make sure the timestamp is good. But in reality
> that
> > can be a really big *IF*. I think the problem is exactly as Ewen
> mentioned,
> > if we let the client to set the timestamp, it would be very hard for the
> > broker to utilize it. If broker apply retention policy based on the
> client
> > timestamp. One misbehave producer can potentially completely mess up the
> > retention policy on the broker. Although people don't care about server
> > side timestamp. People do care a lot when timestamp breaks. Searching by
> > timestamp is a really important use case even though it is not used as
> > often as searching by offset. It has significant direct impact on RTO
> when
> > there is a cross cluster failover as Todd mentioned.
> >
> > The trick using max(lastAppendedTimestamp, currentTimeMillis) is to
> > guarantee monotonic increase of the timestamp. Many commercial system
> > actually do something similar to this to solve the time skew. About
> > changing the time, I am not sure if people use NTP like using a watch to
> > just set it forward/backward by an hour or so. The time adjustment I used
> > to do is typically to adjust something like a minute  / week. So for each
> > second, there might be a few mircoseconds slower/faster but should not
> > break the clock completely to make sure all the time-based transactions
> are
> > not affected. The one minute change will be done within a week but not
> > instantly.
> >
> > Personally, I think having client side timestamp will be useful if we
> don't
> > need to put the broker and data integrity under risk. If we have to
> choose
> > from one of them but not both. I would prefer server side timestamp
> because
> > for client side timestamp there is always a plan B which is putting the
> > timestamp into payload.
> >
> > Another reason I am reluctant to use the client side timestamp is that it
> > is always dangerous to mix the control plane with data plane. IP did this
> > and it has caused so many different breaches so people are migrating to
> > something like MPLS. An example in Kafka is that any client can
> construct a
> > LeaderAndIsrRequest/UpdateMetadataRequest/ContorlledShutdownRequest (you
> > name it) and send it to the broker to mess up the entire cluster, also as
> > we already noticed a busy cluster can respond quite slow to controller
> > messages. So it would really be nice if we can avoid giving the power to
> > clients to control the log retention.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> > On Sun, Sep 6, 2015 at 9:54 PM, Todd Palino <tpal...@gmail.com> wrote:
> >
> > > So, with regards to why you want to search by timestamp, the biggest
> > > problem I've seen is with consumers who want to reset their timestamps
> > to a
> > > specific point, whether it is to replay a certain amount of messages,
> or
> > to
> > > rewind to before some problem state existed. This happens more often
> than
> > > anyone would like.
> > >
> > > To handle this now we need to constantly export the broker's offset for
> > > every partition to a time-series database and then use external
> processes
> > > to query this. I know we're not the only ones doing this. The way the
> > > broker handles requests for offsets by timestamp is a little obtuse
> > > (explain it to anyone without intimate knowledge of the internal
> workings
> > > of the broker - every time I do I see this). In addition, as Becket
> > pointed
> > > out, it causes problems specifically with retention of messages by time
> > > when you move partitions around.
> > >
> > > I'm deliberately avoiding the discussion of what timestamp to use. I
> can
> > > see the argument either way, though I tend to lean towards the idea
> that
> > > the broker timestamp is the only viable source of truth in this
> > situation.
> > >
> > > -Todd
> > >
> > >
> > > On Sun, Sep 6, 2015 at 7:08 PM, Ewen Cheslack-Postava <
> e...@confluent.io
> > >
> > > wrote:
> > >
> > > > On Sun, Sep 6, 2015 at 4:57 PM, Jay Kreps <j...@confluent.io> wrote:
> > > >
> > > > >
> > > > > 2. Nobody cares what time it is on the server.
> > > > >
> > > >
> > > > This is a good way of summarizing the issue I was trying to get at,
> > from
> > > an
> > > > app's perspective. Of the 3 stated goals of the KIP, #2 (lot
> retention)
> > > is
> > > > reasonably handled by a server-side timestamp. I really just care
> that
> > a
> > > > message is there long enough that I have a chance to process it. #3
> > > > (searching by timestamp) only seems useful if we can guarantee the
> > > > server-side timestamp is close enough to the original client-side
> > > > timestamp, and any mirror maker step seems to break that (even
> ignoring
> > > any
> > > > issues with broker availability).
> > > >
> > > > I'm also wondering whether optimizing for search-by-timestamp on the
> > > broker
> > > > is really something we want to do given that messages aren't really
> > > > guaranteed to be ordered by application-level timestamps on the
> broker.
> > > Is
> > > > part of the need for this just due to the current consumer APIs being
> > > > difficult to work with? For example, could you implement this pretty
> > > easily
> > > > client side just the way you would broker-side? I'd imagine a couple
> of
> > > > random seeks + reads during very rare occasions (i.e. when the app
> > starts
> > > > up) wouldn't be a problem performance-wise. Or is it also that you
> need
> > > the
> > > > broker to enforce things like monotonically increasing timestamps
> since
> > > you
> > > > can't do the query properly and efficiently without that guarantee,
> and
> > > > therefore what applications are actually looking for *is* broker-side
> > > > timestamps?
> > > >
> > > > -Ewen
> > > >
> > > >
> > > >
> > > > > Consider cases where data is being copied from a database or from
> log
> > > > > files. In steady-state the server time is very close to the client
> > time
> > > > if
> > > > > their clocks are sync'd (see 1) but there will be times of large
> > > > divergence
> > > > > when the copying process is stopped or falls behind. When this
> occurs
> > > it
> > > > is
> > > > > clear that the time the data arrived on the server is irrelevant,
> it
> > is
> > > > the
> > > > > source timestamp that matters. This is the problem you are trying
> to
> > > fix
> > > > by
> > > > > retaining the mm timestamp but really the client should always set
> > the
> > > > time
> > > > > with the use of server-side time as a fallback. It would be worth
> > > talking
> > > > > to the Samza folks and reading through this blog post (
> > > > >
> > > >
> > >
> >
> http://radar.oreilly.com/2015/08/the-world-beyond-batch-streaming-101.html
> > > > > )
> > > > > on this subject since we went through similar learnings on the
> stream
> > > > > processing side.
> > > > >
> > > > > I think the implication of these two is that we need a proposal
> that
> > > > > handles potentially very out-of-order timestamps in some kind of
> > sanish
> > > > way
> > > > > (buggy clients will set something totally wrong as the time).
> > > > >
> > > > > -Jay
> > > > >
> > > > > On Sun, Sep 6, 2015 at 4:22 PM, Jay Kreps <j...@confluent.io>
> wrote:
> > > > >
> > > > > > The magic byte is used to version message format so we'll need to
> > > make
> > > > > > sure that check is in place--I actually don't see it in the
> current
> > > > > > consumer code which I think is a bug we should fix for the next
> > > release
> > > > > > (filed KAFKA-2523). The purpose of that field is so there is a
> > clear
> > > > > check
> > > > > > on the format rather than the scrambled scenarios Becket
> describes.
> > > > > >
> > > > > > Also, Becket, I don't think just fixing the java client is
> > sufficient
> > > > as
> > > > > > that would break other clients--i.e. if anyone writes a v1
> > messages,
> > > > even
> > > > > > by accident, any non-v1-capable consumer will break. I think we
> > > > probably
> > > > > > need a way to have the server ensure a particular message format
> > > either
> > > > > at
> > > > > > read or write time.
> > > > > >
> > > > > > -Jay
> > > > > >
> > > > > > On Thu, Sep 3, 2015 at 3:47 PM, Jiangjie Qin
> > > <j...@linkedin.com.invalid
> > > > >
> > > > > > wrote:
> > > > > >
> > > > > >> Hi Guozhang,
> > > > > >>
> > > > > >> I checked the code again. Actually CRC check probably won't
> fail.
> > > The
> > > > > >> newly
> > > > > >> added timestamp field might be treated as keyLength instead, so
> we
> > > are
> > > > > >> likely to receive an IllegalArgumentException when try to read
> the
> > > > key.
> > > > > >> I'll update the KIP.
> > > > > >>
> > > > > >> Thanks,
> > > > > >>
> > > > > >> Jiangjie (Becket) Qin
> > > > > >>
> > > > > >> On Thu, Sep 3, 2015 at 12:48 PM, Jiangjie Qin <
> j...@linkedin.com>
> > > > > wrote:
> > > > > >>
> > > > > >> > Hi, Guozhang,
> > > > > >> >
> > > > > >> > Thanks for reading the KIP. By "old consumer", I meant the
> > > > > >> > ZookeeperConsumerConnector in trunk now, i.e. without this bug
> > > > fixed.
> > > > > >> If we
> > > > > >> > fix the ZookeeperConsumerConnector then it will throw
> exception
> > > > > >> complaining
> > > > > >> > about the unsupported version when it sees message format V1.
> > > What I
> > > > > was
> > > > > >> > trying to say is that if we have some
> ZookeeperConsumerConnector
> > > > > running
> > > > > >> > without the fix, the consumer will complain about CRC mismatch
> > > > instead
> > > > > >> of
> > > > > >> > unsupported version.
> > > > > >> >
> > > > > >> > Thanks,
> > > > > >> >
> > > > > >> > Jiangjie (Becket) Qin
> > > > > >> >
> > > > > >> > On Thu, Sep 3, 2015 at 12:15 PM, Guozhang Wang <
> > > wangg...@gmail.com>
> > > > > >> wrote:
> > > > > >> >
> > > > > >> >> Thanks for the write-up Jiangjie.
> > > > > >> >>
> > > > > >> >> One comment about migration plan: "For old consumers, if they
> > see
> > > > the
> > > > > >> new
> > > > > >> >> protocol the CRC check will fail"..
> > > > > >> >>
> > > > > >> >> Do you mean this bug in the old consumer cannot be fixed in a
> > > > > >> >> backward-compatible way?
> > > > > >> >>
> > > > > >> >> Guozhang
> > > > > >> >>
> > > > > >> >>
> > > > > >> >> On Thu, Sep 3, 2015 at 8:35 AM, Jiangjie Qin
> > > > > <j...@linkedin.com.invalid
> > > > > >> >
> > > > > >> >> wrote:
> > > > > >> >>
> > > > > >> >> > Hi,
> > > > > >> >> >
> > > > > >> >> > We just created KIP-31 to propose a message format change
> in
> > > > Kafka.
> > > > > >> >> >
> > > > > >> >> >
> > > > > >> >> >
> > > > > >> >>
> > > > > >>
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-31+-+Message+format+change+proposal
> > > > > >> >> >
> > > > > >> >> > As a summary, the motivations are:
> > > > > >> >> > 1. Avoid server side message re-compression
> > > > > >> >> > 2. Honor time-based log roll and retention
> > > > > >> >> > 3. Enable offset search by timestamp at a finer
> granularity.
> > > > > >> >> >
> > > > > >> >> > Feedback and comments are welcome!
> > > > > >> >> >
> > > > > >> >> > Thanks,
> > > > > >> >> >
> > > > > >> >> > Jiangjie (Becket) Qin
> > > > > >> >> >
> > > > > >> >>
> > > > > >> >>
> > > > > >> >>
> > > > > >> >> --
> > > > > >> >> -- Guozhang
> > > > > >> >>
> > > > > >> >
> > > > > >> >
> > > > > >>
> > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > Thanks,
> > > > Ewen
> > > >
> > >
> >
>



-- 
Thanks,
Neha

Reply via email to