Hi Jay,

Thanks for the comments. Replied inline.


On Mon, Oct 13, 2014 at 11:11 AM, Jay Kreps <jay.kr...@gmail.com> wrote:

> I need to take more time to think about this. Here are a few off-the-cuff
> remarks:
> - To date we have tried really, really hard to keep the data model for
> message simple since after all you can always add whatever you like inside
> the message body.
> - For system tags, why not just make these fields first class fields in
> message? The purpose of a system tag is presumably that Why have a bunch of
> key-value pairs versus first-class fields?

Yes, alternatively we can make the system tags first-class fields in the
message header, which would simplify the format and the processing logic.

The main reasons I made them system tags are: 1) of the possible system
tags I can think of, some apply to all types of messages (e.g. timestamps),
but others apply only to a specific type of message (compressed, control
message), and even for those types not every tag is required all the time,
so encoding them as compact tags may save some space when only some of them
are present; 2) with tags we do not need to bump the protocol version every
time we change the set of fields, which would otherwise mean keeping the
logic to handle all versions on the broker until the old ones are officially
retired. Instead, the broker can just ignore a tag whose id it does not
recognize (the client is on a newer version), or use a default value / throw
an exception if a required tag is missing (the client is on an older
version).
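
To make point 2) concrete, here is a rough sketch of how a broker might decode such tags. The wire layout (1-byte id, 2-byte length, value), the tag ids and all names below are assumptions made up for illustration; they are not taken from the wiki proposal.

```python
import struct

# Hypothetical tag ids, for illustration only.
TAG_TIMESTAMP = 1      # treated as required in this sketch
TAG_COMPRESSION = 2    # optional; defaults to 0 ("none") in this sketch

KNOWN_TAGS = {TAG_TIMESTAMP, TAG_COMPRESSION}
REQUIRED_TAGS = {TAG_TIMESTAMP}
DEFAULTS = {TAG_COMPRESSION: b"\x00"}


def encode_tags(tags):
    """Encode {tag_id: bytes} as repeated (1-byte id, 2-byte length, value)."""
    out = bytearray()
    for tag_id, value in tags.items():
        out += struct.pack(">BH", tag_id, len(value)) + value
    return bytes(out)


def decode_tags(buf):
    """Decode tags, ignoring unknown ids and handling missing ones."""
    tags = {}
    pos = 0
    while pos < len(buf):
        tag_id, length = struct.unpack_from(">BH", buf, pos)
        pos += 3
        if pos + length > len(buf):
            raise ValueError("truncated tag %d" % tag_id)
        value = buf[pos:pos + length]
        pos += length
        # An unknown id means the client is on a newer version: skip it.
        if tag_id in KNOWN_TAGS:
            tags[tag_id] = value
    # A missing required tag means the client is on an older version.
    missing = REQUIRED_TAGS - set(tags)
    if missing:
        raise ValueError("missing required tags: %s" % sorted(missing))
    # Missing optional tags fall back to a default value.
    for tag_id, default in DEFAULTS.items():
        tags.setdefault(tag_id, default)
    return tags
```

With this shape, adding a new optional tag only touches KNOWN_TAGS and DEFAULTS; older brokers keep working because they skip ids they do not know.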

> - You don't necessarily need application-level tags explicitly represented
> in the message format for efficiency. The application can define their own
> header (e.g. their message could be a size delimited header followed by a
> size delimited body). But actually if you use Avro you don't even need this
> I don't think. Avro has the ability to just deserialize the "header" fields
> in your message. Avro has a notion of reader and writer schemas. The writer
> schema is whatever the message was written with. If the reader schema is
> just the header, avro will skip any fields it doesn't need and just
> deserialize the fields it does need. This is actually a much more usable
> and flexible way to define a header since you get all the types avro allows
> instead of just bytes.

I agree that we can use a reader schema to read just the header without
de-serializing the full message, and for compressed messages we could
probably also add an Avro (or similar) header to the compressed wrapper
message. But that would force these applications (MM, auditor, clients) to
be schema-aware, which would usually require the people who manage the data
pipeline to also manage the schemas, whereas ideally Kafka itself should
only deal with bytes-in and bytes-out (and maybe a little more, like
timestamps). The purpose here is to avoid introducing an extra dependency
while still allowing applications to do simple processing based on metadata
only, without fully de-serializing / de-compressing the message.
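
As an illustration of metadata-only processing, here is a small sketch of an auditor that counts messages per time bucket by reading wrapper headers and skipping the compressed payloads. The header layout (8-byte timestamp in ms, 4-byte message count, 4-byte payload size) and the function names are hypothetical, chosen only to keep the example short.

```python
import struct
import zlib

# Hypothetical fixed header: timestamp (ms), message count, payload size.
HEADER = struct.Struct(">qII")


def wrap(timestamp_ms, messages):
    """Build a wrapper: header followed by a zlib-compressed payload."""
    body = b"".join(struct.pack(">I", len(m)) + m for m in messages)
    payload = zlib.compress(body)
    return HEADER.pack(timestamp_ms, len(messages), len(payload)) + payload


def audit_counts(log, bucket_ms=60_000):
    """Count messages per time bucket by reading headers only."""
    counts = {}
    pos = 0
    while pos < len(log):
        timestamp_ms, count, size = HEADER.unpack_from(log, pos)
        bucket = timestamp_ms // bucket_ms
        counts[bucket] = counts.get(bucket, 0) + count
        # Jump over the payload; it is never decompressed or parsed.
        pos += HEADER.size + size
    return counts
```

The auditor here never needs a schema or a decompression codec; it only needs to agree with the producer on the header bytes.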

> - We will need to think carefully about what to do with timestamps if we
> end up including them. There are actually several timestamps
>   - The time the producer created the message
>   - The time the leader received the message
>   - The time the current broker received the message
> The producer timestamps won't be at all increasing. The leader timestamp
> will be mostly increasing except when the clock changes or leadership
> moves. This somewhat complicates the use of these timestamps, though. From
> the point of view of the producer the only time that matters is the time
> the message was created. However since the producer sets this it can be
> arbitrarily bad (remember all the ntp issues and 1970 timestamps we would
> get). Say that the heuristic was to use the timestamp of the first message
> in a file for retention, the problem would be that the timestamps for the
> segments need not even be sequential and a single bad producer could send
> data with time in the distant past or future causing data to be deleted or
> retained forever. Using the broker timestamp at write time is better,
> though obviously that would be overwritten when data is mirrored between
> clusters (the mirror would then have a different time--and if the mirroring
> ever stopped that gap could be large). One approach would be to use the
> client timestamp but have the broker overwrite it if it is too bad (e.g.
> off by more than a minute, say).

We would need the reception timestamp (i.e. the third one) for log
cleaning. As for the first and second ones, I originally put them as app
tags since they are likely to be used by applications other than the
brokers themselves (e.g. the auditor).
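
For reference, the overwrite heuristic you describe (keep the client timestamp unless it is off by more than a minute) could look roughly like the following. This is only a sketch: the function name, the millisecond units and the symmetric treatment of past and future skew are my assumptions.

```python
MAX_SKEW_MS = 60_000  # "off by more than a minute", as in the example above


def effective_timestamp(client_ms, broker_ms, max_skew_ms=MAX_SKEW_MS):
    """Return the client timestamp unless it is too far from the broker clock."""
    if abs(client_ms - broker_ms) > max_skew_ms:
        # Too far in the past or future: fall back to the broker's clock.
        return broker_ms
    return client_ms
```

Note that after mirroring, the comparison would be made against the mirror cluster's clock, so a long mirroring gap would still cause the original timestamp to be replaced.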

> -Jay
> On Fri, Oct 10, 2014 at 11:21 PM, Joel Koshy <jjkosh...@gmail.com> wrote:
> > Thanks Guozhang! This is an excellent write-up and the approach nicely
> > consolidates a number of long-standing issues. It would be great if
> > everyone can review this carefully and give feedback.
> >
> > Also, wrt discussion in the past we have used a mix of wiki comments
> > and the mailing list. Personally, I think it is better to discuss on
> > the mailing list (for more visibility) and just post a bold link to
> > the (archived) mailing list thread on the wiki.
> >
> > Joel
> >
> > On Fri, Oct 10, 2014 at 05:33:52PM -0700, Guozhang Wang wrote:
> > > Hello all,
> > >
> > > I put some thoughts on enhancing our current message metadata format to
> > > solve a bunch of existing issues:
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata
> > >
> > > This wiki page is for kicking off some discussions about the feasibility
> > > of adding more info into the message header, and if possible how we would
> > > add them.
> > >
> > > -- Guozhang
> >
> >

-- Guozhang
