[DISCUSS] KIP-31 - Message format change proposal

2015-09-03 Thread Jiangjie Qin
Hi,

We just created KIP-31 to propose a message format change in Kafka.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-31+-+Message+format+change+proposal

As a summary, the motivations are:
1. Avoid server side message re-compression
2. Honor time-based log roll and retention
3. Enable offset search by timestamp at a finer granularity.

Feedback and comments are welcome!

Thanks,

Jiangjie (Becket) Qin


Re: [DISCUSS] KIP-31 - Message format change proposal

2015-09-03 Thread Guozhang Wang
Thanks for the write-up Jiangjie.

One comment about migration plan: "For old consumers, if they see the new
protocol the CRC check will fail"..

Do you mean this bug in the old consumer cannot be fixed in a
backward-compatible way?

Guozhang


On Thu, Sep 3, 2015 at 8:35 AM, Jiangjie Qin 
wrote:

> Hi,
>
> We just created KIP-31 to propose a message format change in Kafka.
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-31+-+Message+format+change+proposal
>
> As a summary, the motivations are:
> 1. Avoid server side message re-compression
> 2. Honor time-based log roll and retention
> 3. Enable offset search by timestamp at a finer granularity.
>
> Feedback and comments are welcome!
>
> Thanks,
>
> Jiangjie (Becket) Qin
>



-- 
-- Guozhang


Re: [DISCUSS] KIP-31 - Message format change proposal

2015-09-03 Thread Jiangjie Qin
Hi, Guozhang,

Thanks for reading the KIP. By "old consumer", I meant the
ZookeeperConsumerConnector in trunk now, i.e. without this bug fixed. If we
fix the ZookeeperConsumerConnector then it will throw an exception complaining
about the unsupported version when it sees message format V1. What I was
trying to say is that if we have some ZookeeperConsumerConnector running
without the fix, the consumer will complain about CRC mismatch instead of
unsupported version.

Thanks,

Jiangjie (Becket) Qin

On Thu, Sep 3, 2015 at 12:15 PM, Guozhang Wang  wrote:

> Thanks for the write-up Jiangjie.
>
> One comment about migration plan: "For old consumers, if they see the new
> protocol the CRC check will fail"..
>
> Do you mean this bug in the old consumer cannot be fixed in a
> backward-compatible way?
>
> Guozhang
>
>
> On Thu, Sep 3, 2015 at 8:35 AM, Jiangjie Qin 
> wrote:
>
> > Hi,
> >
> > We just created KIP-31 to propose a message format change in Kafka.
> >
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-31+-+Message+format+change+proposal
> >
> > As a summary, the motivations are:
> > 1. Avoid server side message re-compression
> > 2. Honor time-based log roll and retention
> > 3. Enable offset search by timestamp at a finer granularity.
> >
> > Feedback and comments are welcome!
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
>
>
>
> --
> -- Guozhang
>


Re: [DISCUSS] KIP-31 - Message format change proposal

2015-09-03 Thread Jiangjie Qin
Hi Guozhang,

I checked the code again. Actually the CRC check probably won't fail. The newly
added timestamp field might be treated as keyLength instead, so we are
likely to receive an IllegalArgumentException when trying to read the key.
I'll update the KIP.
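
To make the failure mode concrete, here is a rough sketch (illustrative only;
it assumes a V0 layout of [crc(4)][magic(1)][attributes(1)][keyLength(4)][key]
[valueLength(4)][value] and a V1 layout that inserts an 8-byte timestamp right
after the attributes byte -- the class name is made up):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class OldParserMisreadSketch {
    public static void main(String[] args) {
        // Build something shaped like a hypothetical V1 message.
        ByteBuffer v1 = ByteBuffer.allocate(64);
        v1.putInt(0);                                      // crc (placeholder)
        v1.put((byte) 1);                                  // magic = 1 (new format)
        v1.put((byte) 0);                                  // attributes
        v1.putLong(System.currentTimeMillis());            // new timestamp field
        byte[] key = "key".getBytes(StandardCharsets.UTF_8);
        v1.putInt(key.length);                             // real key length
        v1.put(key);
        v1.flip();

        // An old V0 parser skips crc/magic/attributes and then reads keyLength,
        // which is actually the high 32 bits of the timestamp.
        v1.position(6);
        int misreadKeyLength = v1.getInt();
        System.out.println("old parser thinks keyLength = " + misreadKeyLength);
        // Trying to read that many bytes as the key then blows up with an
        // exception instead of a clean "unsupported version" error.
    }
}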

Thanks,

Jiangjie (Becket) Qin

On Thu, Sep 3, 2015 at 12:48 PM, Jiangjie Qin  wrote:

> Hi, Guozhang,
>
> Thanks for reading the KIP. By "old consumer", I meant the
> ZookeeperConsumerConnector in trunk now, i.e. without this bug fixed. If we
> fix the ZookeeperConsumerConnector then it will throw exception complaining
> about the unsupported version when it sees message format V1. What I was
> trying to say is that if we have some ZookeeperConsumerConnector running
> without the fix, the consumer will complain about CRC mismatch instead of
> unsupported version.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Thu, Sep 3, 2015 at 12:15 PM, Guozhang Wang  wrote:
>
>> Thanks for the write-up Jiangjie.
>>
>> One comment about migration plan: "For old consumers, if they see the new
>> protocol the CRC check will fail"..
>>
>> Do you mean this bug in the old consumer cannot be fixed in a
>> backward-compatible way?
>>
>> Guozhang
>>
>>
>> On Thu, Sep 3, 2015 at 8:35 AM, Jiangjie Qin 
>> wrote:
>>
>> > Hi,
>> >
>> > We just created KIP-31 to propose a message format change in Kafka.
>> >
>> >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-31+-+Message+format+change+proposal
>> >
>> > As a summary, the motivations are:
>> > 1. Avoid server side message re-compression
>> > 2. Honor time-based log roll and retention
>> > 3. Enable offset search by timestamp at a finer granularity.
>> >
>> > Feedback and comments are welcome!
>> >
>> > Thanks,
>> >
>> > Jiangjie (Becket) Qin
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>
>
>


Re: [DISCUSS] KIP-31 - Message format change proposal

2015-09-03 Thread Ewen Cheslack-Postava
A few questions:

1. If we update the producers to only support V1, doesn't that mean people
get stuck on the current version of the producer until they can be sure all
their consumers have been upgraded? Is that going to be a problem for
anyone, and does it potentially keep important fixes/enhancements (e.g. the
upcoming client-side network request timeouts) because they have to wait
for all the consumers to upgrade first?

2. Why minute granularity specifically? That seems fairly specific to
LI's example workload (which, by the way, is a very useful reference point
to include in the KIP, thanks for that). If someone has the memory and can
support it, why not let them go all the way to per-record timestamps (by
making the interval configurable and they can set it really small/to 0)? It
seems to me that some people might be willing to pay that cost for the
application-side simplicity if they want to consume from specific
timestamps. With the proposal as is, seeking to a timestamp still requires
client-side logic to filter out messages that might have earlier timestamps
due to the granularity chosen. The obvious tradeoff here is yet another
config option -- I'm loath to add yet more configs, although this is one
that we can choose a reasonable default for (like 1 minute) and people can
easily change with no impact if they need to adjust it.

3. Why not respect the timestamp passed in as long as it's not some
sentinel value that indicates the broker should fill it in? When you do
know they will be ordered, as they should be with mirror maker (which is
specifically mentioned), this seems really useful. (More about this in
questions below...)

4. I think one obvious answer to (3) is that accepting client's timestamps
could break seeking-by-timestamp since they may not be properly ordered.
However, I think this can break anyway under normal operations -- any
imperfections in clock synchronization could result in older timestamps
being applied to new messages on a new leader broker compared to messages
stored previously by the old leader. I think it's probably too common to
just think that NTP will take care of it and then run into weird bugs
because NTP isn't always perfect, sometimes does some unexpected things,
and, of course, does what you tell it to and so is subject to user error.

It would be reasonable to argue that the leader change issue is much less
likely of an issue than if you respect timestamps from producers, where, if
applications actually filled it in, you'd receive a jumbled mess of
timestamps and trying to do the binary search in the index wouldn't
necessarily give you correct results. However, we could allow clients to
fill that info in but discourage it where it might cause issues (i.e.
normal applications), and it seems like a significant win for mirror maker.

I actually think not accepting timestamps is probably the better choice but
a) it seems awkward in the protocol spec because we have to include the
field in the produce requests since we don't want to have to fully decode
them on the broker and b) losing that info during mirroring seems like it
breaks the goal of fixing log retention (at least for mirrors) as well as
the goal of improving searching by timestamp (at least for mirrors).

5. You never actually specified the granularity (or format, but I assume
unix epoch) of the timestamp. Milliseconds? Microseconds? Nanoseconds? This
definitely needs to eventually make it into the protocol docs.

6. Re: the rejected alternative. Are there any other options in changing
the config format that might make it a bit lighter weight? For example, do
we need a full int64? Could we do something relative instead that wouldn't
require as many bytes? Without (5) being specified, it's actually difficult
to evaluate some of these options.

7. Any plan to expose this in the client APIs? This is related to (4). If
they are not exposed anywhere in the API as being associated with messages,
then we can reasonably treat them as an internal implementation detail and
be very clear about what looking up an offset for a timestamp means. If
they are exposed, then they're more like message metadata that applications
are probably going to want preserved, and thus will want the broker to
respect the timestamp. For example, if I saw that consumers could get the
timestamp, I'd want to be able to assign it at the producer so that even if
I have significant batching I still get an accurate timestamp -- basically
I might replace some cases where I use a timestamp embedded in the message
with the timestamp provided by Kafka. (That said, we should consider the
impact that including it in the protocol can have; non-Java clients are
likely to expose it if it is available, whether it's actually a good idea
to or not.)

-Ewen


On Thu, Sep 3, 2015 at 3:47 PM, Jiangjie Qin 
wrote:

> Hi Guozhang,
>
> I checked the code again. Actually CRC check probably won't fail. The newly
> added timestamp field might be treated as keyLe

Re: [DISCUSS] KIP-31 - Message format change proposal

2015-09-03 Thread Ewen Cheslack-Postava
Heh, I guess in addition to my wall of text of questions, I should also say
that I think this provides useful functionality and fixes issues that we've
seen a bunch of questions and complaints about, so I'm in favor of a fix
and this looks like a pretty good approach :)

It might also be useful to say which release you're hoping to get this into.

-Ewen

On Thu, Sep 3, 2015 at 9:35 PM, Ewen Cheslack-Postava 
wrote:

> A few questions:
>
> 1. If we update the producers to only support V1, doesn't that mean people
> get stuck on the current version of the producer until they can be sure all
> their consumers have been upgraded? Is that going to be a problem for
> anyone, and does it potentially keep important fixes/enhancements (e.g. the
> upcoming client-side network request timeouts) because they have to wait
> for all the consumers to upgrade first?
>
> 2. Why minute granularity specifically? That's seems fairly specific to
> LI's example workload (which, by the way, is a very useful reference point
> to include in the KIP, thanks for that). If someone has the memory and can
> support it, why not let them go all the way to per-record timestamps (by
> making the interval configurable and they can set it really small/to 0)? It
> seems to me that some people might be willing to pay that cost for the
> application-side simplicity if they want to consume from specific
> timestamps. With the proposal as is, seeking to a timestamp still requires
> client-side logic to filter out messages that might have earlier timestamps
> due to the granularity chosen. The obvious tradeoff here is yet another
> config option -- I'm loath to add yet more configs, although this is one
> that we can choose a reasonable default for (like 1 minute) and people can
> easily change with no impact if they need to adjust it.
>
> 3. Why not respect the timestamp passed in as long as it's not some
> sentinel value that indicates the broker should fill it in? When you do
> know they will be ordered, as they should be with mirror maker (which is
> specifically mentioned), this seems really useful. (More about this in
> questions below...)
>
> 4. I think one obvious answer to (3) is that accepting client's timestamps
> could break seeking-by-timestamp since they may not be properly ordered.
> However, I think this can break anyway under normal operations -- any
> imperfections in clock synchronization could result in older timestamps
> being applied to new messages on a new leader broker compared messages
> stored previously by the old leader. I think it's probably too common to
> just think that NTP will take care of it and then run into weird bugs
> because NTP isn't always perfect, sometimes does some unexpected things,
> and, of course, does what you tell it to and so is subject to user error.
>
> It would be reasonable to argue that the leader change issue is much less
> likely of an issue than if you respect timestamps from producers, where, if
> applications actually filled it in, you'd receive a jumbled mess of
> timestamps and trying to do the binary search in the index wouldn't
> necessarily give you correct results. However, a) we could allow clients to
> fill that info in but discourage it where it might cause issues (i.e.
> normal applications) and it seems like a significant win for mirrormaker.
>
> I actually think not accepting timestamps is probably the better choice
> but a) it seems awkward in the protocol spec because we have to include the
> field in the produce requests since we don't want to have to fully decode
> them on the broker and b) losing that info during mirroring seems like it
> breaks the goal of fixing log retention (at least for mirrors) as well as
> the goal of improving searching by timestamp (at least for mirrors).
>
> 5. You never actually specified the granularity (or format, but I assume
> unix epoch) of the timestamp. Milliseconds? Microseconds? Nanoseconds? This
> definitely needs to eventually make it into the protocol docs.
>
> 6. Re: the rejected alternative. Are there any other options in changing
> the config format that might make it a bit lighter weight? For example, do
> we need a full int64? Could we do something relative instead that wouldn't
> require as many bytes? Without (5) being specified, it's actually difficult
> to evaluate some of these options.
>
> 7. Any plan to expose this in the client APIs? This is related to (4). If
> they are not exposed anywhere in the API as being associated with messages,
> then we can reasonably treat them as an internal implementation detail and
> be very clear about what looking up an offset for a timestamp means. If
> they are exposed, then they're more like message metadata that applications
> are probably going to want preserved, and thus will want the broker to
> respect the timestamp. For example, if I saw that consumers could get the
> timestamp, I'd want to be able to assign it at the producer so that even if
> I have significant batching I s

Re: [DISCUSS] KIP-31 - Message format change proposal

2015-09-03 Thread Jiangjie Qin
Hey Ewen,

Thanks for the comments; they are really good questions. Please see the
inline replies.

On Thu, Sep 3, 2015 at 9:35 PM, Ewen Cheslack-Postava 
wrote:

> A few questions:
>
> 1. If we update the producers to only support V1, doesn't that mean people
> get stuck on the current version of the producer until they can be sure all
> their consumers have been upgraded? Is that going to be a problem for
> anyone, and does it potentially keep important fixes/enhancements (e.g. the
> upcoming client-side network request timeouts) because they have to wait
> for all the consumers to upgrade first?
>
This is a good point. I thought about this before, and my initial thinking
is that we might need to add a config on the producer to specify which
version you want to use to produce. But this seems to be a pretty ad-hoc
approach and I don't really like it. We are working on some general
protocol version control mechanism proposal and will have a separate KIP
for that.

>
> 2. Why minute granularity specifically? That's seems fairly specific to
> LI's example workload (which, by the way, is a very useful reference point
> to include in the KIP, thanks for that). If someone has the memory and can
> support it, why not let them go all the way to per-record timestamps (by
> making the interval configurable and they can set it really small/to 0)? It
> seems to me that some people might be willing to pay that cost for the
> application-side simplicity if they want to consume from specific
> timestamps. With the proposal as is, seeking to a timestamp still requires
> client-side logic to filter out messages that might have earlier timestamps
> due to the granularity chosen. The obvious tradeoff here is yet another
> config option -- I'm loath to add yet more configs, although this is one
> that we can choose a reasonable default for (like 1 minute) and people can
> easily change with no impact if they need to adjust it.
>
The searching granularity will actually be millisecond. The index
granularity only determines how close you land to the actual message
with the timestamp you are looking for. For example, if you are looking for
a message with timestamp 10:00:15, a minute-granularity index will give you
the offset at 10:00:00, and the search then needs to go through the records
from 10:00:00 to 10:00:15 to find the message. With second-level granularity,
it might only need to go through the messages produced in one second. So a
minute-level index will take longer to search, but the precision will be the
same as with a second-level index. That said, I am not objecting to adding a
granularity configuration, but I am not sure how useful a second-level index
would be, because typically a consumer will be long-running and only search
for the timestamp at startup.
I will update the KIP page to clarify the precision.
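
As a tiny sketch of that lookup flow (illustrative only -- the class and
method names below are made up, not Kafka code):

import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// One entry per index interval (e.g. per minute): indexed timestamp (ms) -> offset.
class CoarseTimeIndexSketch {
    private final NavigableMap<Long, Long> entries = new TreeMap<>();

    void addEntry(long timestampMs, long offset) {
        entries.put(timestampMs, offset);
    }

    // The offset to start the linear scan from when searching for targetTimestampMs.
    long startOffsetFor(long targetTimestampMs) {
        Map.Entry<Long, Long> floor = entries.floorEntry(targetTimestampMs);
        return floor == null ? 0L : floor.getValue();
    }
}

With a minute-granularity index the scan from startOffsetFor(...) covers at
most one minute of records; a finer index only shortens that scan, while the
final precision is millisecond either way.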

>

3. Why not respect the timestamp passed in as long as it's not some
> sentinel value that indicates the broker should fill it in? When you do
> know they will be ordered, as they should be with mirror maker (which is
> specifically mentioned), this seems really useful. (More about this in
> questions below...)
>
Like what you mentioned in (4), having a log without monotonically
increasing timestamps is weird. To me it is even worse than having an empty
timestamp field in the inner message that will not be used except for
log-compacted topics. I think the only way to solve this issue is to add
another CreateTime to the message. So far I am not sure how useful that is,
though, because arguably people can always put this timestamp inside the
payload. So I think this timestamp is more for server-side usage instead of
application / client-side usage.

>
> 4. I think one obvious answer to (3) is that accepting client's timestamps
> could break seeking-by-timestamp since they may not be properly ordered.
> However, I think this can break anyway under normal operations -- any
> imperfections in clock synchronization could result in older timestamps
> being applied to new messages on a new leader broker compared messages
> stored previously by the old leader. I think it's probably too common to
> just think that NTP will take care of it and then run into weird bugs
> because NTP isn't always perfect, sometimes does some unexpected things,
> and, of course, does what you tell it to and so is subject to user error.

This is a good point. Yes, NTP only guarantees limited synchronization
precision (several microseconds if you have a low stratum and appropriate
PPS). My experience is that it actually is good and stable enough even for
some mission-critical systems such as core banking. Maybe this is more of an
implementation detail. The simple solution is that when the leader appends
messages to the log, it always takes max(lastAppendedTimestamp,
currentTimeMillis). Arguably we can play the same trick even if we let the
producer fill in the timestamp. But that means the timestamp the producer
set may or may not be honored, which

Re: [DISCUSS] KIP-31 - Message format change proposal

2015-09-04 Thread Ewen Cheslack-Postava
On Thu, Sep 3, 2015 at 11:45 PM, Jiangjie Qin 
wrote:

> Hey Ewen,
>
> Thanks for the comments and they are really good questions. Please inline
> replies.
>
> On Thu, Sep 3, 2015 at 9:35 PM, Ewen Cheslack-Postava 
> wrote:
>
> > A few questions:
> >
> > 1. If we update the producers to only support V1, doesn't that mean
> people
> > get stuck on the current version of the producer until they can be sure
> all
> > their consumers have been upgraded? Is that going to be a problem for
> > anyone, and does it potentially keep important fixes/enhancements (e.g.
> the
> > upcoming client-side network request timeouts) because they have to wait
> > for all the consumers to upgrade first?
> >
> This is a good point. I thought about this before, I and my initial
> thinking is that we might need to add a config on producer to specify which
> version you want to use to produce. But this seems to be a pretty ad-hoc
> approach and I don't really like it. We are working on some general
> protocol version control mechanism proposal and will have a separate KIP
> for that.
>

The configs are annoying, but also can provide a way for us to guarantee a
smooth upgrade for users that use the defaults and upgrade their entire
infrastructure one version at a time:

v0.8.3:
no support for v1

v0.9.0:
broker: accept.format=v0 (v1 is rejected)
producer: produce.format=v0
consumer: include support for v0,v1

v0.9.1:
broker: accept.format=v1
producer: produce.format=v1
consumer: v0,v1



> >
> > 2. Why minute granularity specifically? That's seems fairly specific to
> > LI's example workload (which, by the way, is a very useful reference
> point
> > to include in the KIP, thanks for that). If someone has the memory and
> can
> > support it, why not let them go all the way to per-record timestamps (by
> > making the interval configurable and they can set it really small/to 0)?
> It
> > seems to me that some people might be willing to pay that cost for the
> > application-side simplicity if they want to consume from specific
> > timestamps. With the proposal as is, seeking to a timestamp still
> requires
> > client-side logic to filter out messages that might have earlier
> timestamps
> > due to the granularity chosen. The obvious tradeoff here is yet another
> > config option -- I'm loath to add yet more configs, although this is one
> > that we can choose a reasonable default for (like 1 minute) and people
> can
> > easily change with no impact if they need to adjust it.
> >
> The searching granularity will be actually millisecond. The index
> granularity only determines how close you will be to the actually message
> with the timestamp you are looking for. For example, if you are looking for
> a message with timestamp 10:00:15, a minute granularity will give you the
> offset at 10:00:00, and it needs to go through the records from 10:00:00 to
> 10:00:15 to find the message. But with a second level granularity, it might
> only need to go through the message produced in one second. So minute level
> granularity index will take longer for search, but the precision will be
> the same as second level index. That said, I am not objecting to adding the
> granularity configuration but I am not sure how useful it would be to have
> second level index because I think typically a consumer will be
> long-running and only search for the timestamp at startup.
> I will update the KIP page to clarify the precision.
>
>
Ok, this makes sense.


> >
>
> 3. Why not respect the timestamp passed in as long as it's not some
> > sentinel value that indicates the broker should fill it in? When you do
> > know they will be ordered, as they should be with mirror maker (which is
> > specifically mentioned), this seems really useful. (More about this in
> > questions below...)
> >
> Like what you mentioned in (4), having a log without monotonically
> increasing timestamp is weird. To me it is even worse than having an empty
> timestamp field in the inner message that will not be used except for log
> compacted topic. I think the only way to solve this issue is to add another
> CreateTime to the message. So far I am not sure how useful it is though
> because arguably people can always put this timestamp in side the payload.
> So I think this timestamp is more for server side usage instead of
> application / client side usage.
>

I think setting it broker-side is fine, I just want to make sure user
expectations are clear. The existing search-by-timestamp has obvious
limitations. This proposal makes it much more accurate, so it needs to be
clear who is responsible for assigning the timestamps and what they mean
for an application. Applications that care about when the message was
actually created will still need to do some additional work.


>
> >
> > 4. I think one obvious answer to (3) is that accepting client's
> timestamps
> > could break seeking-by-timestamp since they may not be properly ordered.
> > However, I think this can break anyway under normal operat

Re: [DISCUSS] KIP-31 - Message format change proposal

2015-09-06 Thread Jay Kreps
The magic byte is used to version the message format, so we'll need to make sure
that check is in place--I actually don't see it in the current consumer
code which I think is a bug we should fix for the next release (filed
KAFKA-2523). The purpose of that field is so there is a clear check on the
format rather than the scrambled scenarios Becket describes.

Also, Becket, I don't think just fixing the Java client is sufficient, as
that would break other clients--i.e. if anyone writes a v1 message, even
by accident, any non-v1-capable consumer will break. I think we probably
need a way to have the server ensure a particular message format either at
read or write time.

-Jay

On Thu, Sep 3, 2015 at 3:47 PM, Jiangjie Qin 
wrote:

> Hi Guozhang,
>
> I checked the code again. Actually CRC check probably won't fail. The newly
> added timestamp field might be treated as keyLength instead, so we are
> likely to receive an IllegalArgumentException when try to read the key.
> I'll update the KIP.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Thu, Sep 3, 2015 at 12:48 PM, Jiangjie Qin  wrote:
>
> > Hi, Guozhang,
> >
> > Thanks for reading the KIP. By "old consumer", I meant the
> > ZookeeperConsumerConnector in trunk now, i.e. without this bug fixed. If
> we
> > fix the ZookeeperConsumerConnector then it will throw exception
> complaining
> > about the unsupported version when it sees message format V1. What I was
> > trying to say is that if we have some ZookeeperConsumerConnector running
> > without the fix, the consumer will complain about CRC mismatch instead of
> > unsupported version.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Thu, Sep 3, 2015 at 12:15 PM, Guozhang Wang 
> wrote:
> >
> >> Thanks for the write-up Jiangjie.
> >>
> >> One comment about migration plan: "For old consumers, if they see the
> new
> >> protocol the CRC check will fail"..
> >>
> >> Do you mean this bug in the old consumer cannot be fixed in a
> >> backward-compatible way?
> >>
> >> Guozhang
> >>
> >>
> >> On Thu, Sep 3, 2015 at 8:35 AM, Jiangjie Qin  >
> >> wrote:
> >>
> >> > Hi,
> >> >
> >> > We just created KIP-31 to propose a message format change in Kafka.
> >> >
> >> >
> >> >
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-31+-+Message+format+change+proposal
> >> >
> >> > As a summary, the motivations are:
> >> > 1. Avoid server side message re-compression
> >> > 2. Honor time-based log roll and retention
> >> > 3. Enable offset search by timestamp at a finer granularity.
> >> >
> >> > Feedback and comments are welcome!
> >> >
> >> > Thanks,
> >> >
> >> > Jiangjie (Becket) Qin
> >> >
> >>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
> >
>


Re: [DISCUSS] KIP-31 - Message format change proposal

2015-09-06 Thread Jay Kreps
One more comment, I think there are really three proposals here:
1. Get a mechanism and policy in place for record format upgrade (we
haven't done this so we don't really have the infra). This is kind of
implicit. I suspect we'll need to do this multiple times in the future so
we should make it easy.
2. Add a timestamp to messages.
3. Move to relative offsets

For sanity it might make sense to discuss these individually.

I think the relative offset proposal is pretty straightforward. It
probably should have been done that way to begin with. I think you should
get near-universal support on that one. Saving the re-compression on the
server is a big win. I really wish we'd thought of that at the time.
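
To make the relative-offset idea concrete, a minimal sketch (assuming the
compressed wrapper message carries the absolute offset of the last inner
message and the inner messages carry offsets 0..N-1 relative to the start of
the set -- one way to lay it out):

// The broker only stamps the wrapper at append time; inner messages are untouched.
long wrapperOffset = 5002L;   // absolute offset of the last inner message
int innerCount = 3;           // inner messages carry relative offsets 0, 1, 2

for (int rel = 0; rel < innerCount; rel++) {
    long absolute = wrapperOffset - (innerCount - 1) + rel;   // 5000, 5001, 5002
    // The consumer reconstructs absolute offsets itself, so the broker never
    // has to decompress and rewrite the inner messages when assigning offsets.
}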

The timestamp problems we have are definitely annoying, and I agree that
time is really a first-class thing. But adding time has a ton of problems
that need to be fully worked out before we pull the trigger.

First, I like the implementation plan you have for the time index--I think
you are saying that would retain the same format as the existing
OffsetIndex, although it would require some refactoring. You are correct
that this should be a separate index file--this will allow the index to be
less frequent (smaller) and also let it page out if it isn't used.
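
For concreteness, a fixed-width entry in such a time index might look roughly
like this (an assumption about the layout, mirroring the offset index style,
not something the KIP has pinned down here):

class TimeIndexEntrySketch {
    // Each entry: 8-byte indexed timestamp (Unix ms, assumed) followed by a
    // 4-byte offset relative to the segment's base offset. Entries are
    // appended in increasing timestamp order, so a lookup is a binary search
    // over the (memory-mapped) index file.
    static java.nio.ByteBuffer encode(long timestampMs, int relativeOffset) {
        java.nio.ByteBuffer entry = java.nio.ByteBuffer.allocate(12);
        entry.putLong(timestampMs);
        entry.putInt(relativeOffset);
        entry.flip();
        return entry;
    }
}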

Now the bad bits about time!

1. Clock time isn't sequential.
The whole point of NTP is to sync the clock. That means changing the time
forwards and backwards, I think. Also, users can change the time any time they
want! Also, when the master fails it moves to a different machine; maybe
its clock is sync'd, maybe it's not. If I use mirror maker to copy two partitions into
one then surely there is skew, possibly hours or days of skew (i.e. imagine
cross-dc mirror maker where the network isn't available for a bit of time
and then catches up). (Also not sure how having the leader do
max(old_leader_time, current_time) works if we accept client times in the
mm case?)

2. Nobody cares what time it is on the server.
Consider cases where data is being copied from a database or from log
files. In steady-state the server time is very close to the client time if
their clocks are sync'd (see 1) but there will be times of large divergence
when the copying process is stopped or falls behind. When this occurs it is
clear that the time the data arrived on the server is irrelevant, it is the
source timestamp that matters. This is the problem you are trying to fix by
retaining the mm timestamp but really the client should always set the time
with the use of server-side time as a fallback. It would be worth talking
to the Samza folks and reading through this blog post (
http://radar.oreilly.com/2015/08/the-world-beyond-batch-streaming-101.html)
on this subject since we went through similar learnings on the stream
processing side.

I think the implication of these two is that we need a proposal that
handles potentially very out-of-order timestamps in some kind of sane-ish way
(buggy clients will set something totally wrong as the time).

-Jay

On Sun, Sep 6, 2015 at 4:22 PM, Jay Kreps  wrote:

> The magic byte is used to version message format so we'll need to make
> sure that check is in place--I actually don't see it in the current
> consumer code which I think is a bug we should fix for the next release
> (filed KAFKA-2523). The purpose of that field is so there is a clear check
> on the format rather than the scrambled scenarios Becket describes.
>
> Also, Becket, I don't think just fixing the java client is sufficient as
> that would break other clients--i.e. if anyone writes a v1 messages, even
> by accident, any non-v1-capable consumer will break. I think we probably
> need a way to have the server ensure a particular message format either at
> read or write time.
>
> -Jay
>
> On Thu, Sep 3, 2015 at 3:47 PM, Jiangjie Qin 
> wrote:
>
>> Hi Guozhang,
>>
>> I checked the code again. Actually CRC check probably won't fail. The
>> newly
>> added timestamp field might be treated as keyLength instead, so we are
>> likely to receive an IllegalArgumentException when try to read the key.
>> I'll update the KIP.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Thu, Sep 3, 2015 at 12:48 PM, Jiangjie Qin  wrote:
>>
>> > Hi, Guozhang,
>> >
>> > Thanks for reading the KIP. By "old consumer", I meant the
>> > ZookeeperConsumerConnector in trunk now, i.e. without this bug fixed.
>> If we
>> > fix the ZookeeperConsumerConnector then it will throw exception
>> complaining
>> > about the unsupported version when it sees message format V1. What I was
>> > trying to say is that if we have some ZookeeperConsumerConnector running
>> > without the fix, the consumer will complain about CRC mismatch instead
>> of
>> > unsupported version.
>> >
>> > Thanks,
>> >
>> > Jiangjie (Becket) Qin
>> >
>> > On Thu, Sep 3, 2015 at 12:15 PM, Guozhang Wang 
>> wrote:
>> >
>> >> Thanks for the write-up Jiangjie.
>> >>
>> >> One comment about migration plan: "For old consumers, if they see the
>> new
>> >> protocol the CRC 

Re: [DISCUSS] KIP-31 - Message format change proposal

2015-09-06 Thread Ewen Cheslack-Postava
On Sun, Sep 6, 2015 at 4:57 PM, Jay Kreps  wrote:

>
> 2. Nobody cares what time it is on the server.
>

This is a good way of summarizing the issue I was trying to get at, from an
app's perspective. Of the 3 stated goals of the KIP, #2 (log retention) is
reasonably handled by a server-side timestamp. I really just care that a
message is there long enough that I have a chance to process it. #3
(searching by timestamp) only seems useful if we can guarantee the
server-side timestamp is close enough to the original client-side
timestamp, and any mirror maker step seems to break that (even ignoring any
issues with broker availability).

I'm also wondering whether optimizing for search-by-timestamp on the broker
is really something we want to do given that messages aren't really
guaranteed to be ordered by application-level timestamps on the broker. Is
part of the need for this just due to the current consumer APIs being
difficult to work with? For example, could you implement this pretty easily
client side just the way you would broker-side? I'd imagine a couple of
random seeks + reads during very rare occasions (i.e. when the app starts
up) wouldn't be a problem performance-wise. Or is it also that you need the
broker to enforce things like monotonically increasing timestamps since you
can't do the query properly and efficiently without that guarantee, and
therefore what applications are actually looking for *is* broker-side
timestamps?
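
As a rough sketch of that client-side variant (timestampAt below is a stand-in
for "fetch one record at this offset and pull the application's own embedded
timestamp out of the payload" -- not a real Kafka API -- and it assumes those
timestamps are monotonically non-decreasing, which is exactly the guarantee in
question):

// Lower-bound binary search: O(log n) single-record fetches, done once at startup.
static long firstOffsetAtOrAfter(long targetTimestamp, long earliestOffset,
        long latestOffset, java.util.function.LongUnaryOperator timestampAt) {
    long lo = earliestOffset;
    long hi = latestOffset;
    while (lo < hi) {
        long mid = lo + (hi - lo) / 2;
        if (timestampAt.applyAsLong(mid) < targetTimestamp) {
            lo = mid + 1;
        } else {
            hi = mid;
        }
    }
    return lo;   // the offset to seek the consumer to
}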

-Ewen



> Consider cases where data is being copied from a database or from log
> files. In steady-state the server time is very close to the client time if
> their clocks are sync'd (see 1) but there will be times of large divergence
> when the copying process is stopped or falls behind. When this occurs it is
> clear that the time the data arrived on the server is irrelevant, it is the
> source timestamp that matters. This is the problem you are trying to fix by
> retaining the mm timestamp but really the client should always set the time
> with the use of server-side time as a fallback. It would be worth talking
> to the Samza folks and reading through this blog post (
> http://radar.oreilly.com/2015/08/the-world-beyond-batch-streaming-101.html
> )
> on this subject since we went through similar learnings on the stream
> processing side.
>
> I think the implication of these two is that we need a proposal that
> handles potentially very out-of-order timestamps in some kind of sanish way
> (buggy clients will set something totally wrong as the time).
>
> -Jay
>
> On Sun, Sep 6, 2015 at 4:22 PM, Jay Kreps  wrote:
>
> > The magic byte is used to version message format so we'll need to make
> > sure that check is in place--I actually don't see it in the current
> > consumer code which I think is a bug we should fix for the next release
> > (filed KAFKA-2523). The purpose of that field is so there is a clear
> check
> > on the format rather than the scrambled scenarios Becket describes.
> >
> > Also, Becket, I don't think just fixing the java client is sufficient as
> > that would break other clients--i.e. if anyone writes a v1 messages, even
> > by accident, any non-v1-capable consumer will break. I think we probably
> > need a way to have the server ensure a particular message format either
> at
> > read or write time.
> >
> > -Jay
> >
> > On Thu, Sep 3, 2015 at 3:47 PM, Jiangjie Qin 
> > wrote:
> >
> >> Hi Guozhang,
> >>
> >> I checked the code again. Actually CRC check probably won't fail. The
> >> newly
> >> added timestamp field might be treated as keyLength instead, so we are
> >> likely to receive an IllegalArgumentException when try to read the key.
> >> I'll update the KIP.
> >>
> >> Thanks,
> >>
> >> Jiangjie (Becket) Qin
> >>
> >> On Thu, Sep 3, 2015 at 12:48 PM, Jiangjie Qin 
> wrote:
> >>
> >> > Hi, Guozhang,
> >> >
> >> > Thanks for reading the KIP. By "old consumer", I meant the
> >> > ZookeeperConsumerConnector in trunk now, i.e. without this bug fixed.
> >> If we
> >> > fix the ZookeeperConsumerConnector then it will throw exception
> >> complaining
> >> > about the unsupported version when it sees message format V1. What I
> was
> >> > trying to say is that if we have some ZookeeperConsumerConnector
> running
> >> > without the fix, the consumer will complain about CRC mismatch instead
> >> of
> >> > unsupported version.
> >> >
> >> > Thanks,
> >> >
> >> > Jiangjie (Becket) Qin
> >> >
> >> > On Thu, Sep 3, 2015 at 12:15 PM, Guozhang Wang 
> >> wrote:
> >> >
> >> >> Thanks for the write-up Jiangjie.
> >> >>
> >> >> One comment about migration plan: "For old consumers, if they see the
> >> new
> >> >> protocol the CRC check will fail"..
> >> >>
> >> >> Do you mean this bug in the old consumer cannot be fixed in a
> >> >> backward-compatible way?
> >> >>
> >> >> Guozhang
> >> >>
> >> >>
> >> >> On Thu, Sep 3, 2015 at 8:35 AM, Jiangjie Qin
>  >> >
> >> >> wrote:
> >> >>
> >> >> > Hi,
> >> >> >
> >> >> > We just created KIP-31 to propose a message format change in

Re: [DISCUSS] KIP-31 - Message format change proposal

2015-09-06 Thread Todd Palino
So, with regards to why you want to search by timestamp, the biggest
problem I've seen is with consumers who want to reset their offsets to a
specific point in time, whether it is to replay a certain amount of messages, or to
rewind to before some problem state existed. This happens more often than
anyone would like.

To handle this now we need to constantly export the broker's offset for
every partition to a time-series database and then use external processes
to query this. I know we're not the only ones doing this. The way the
broker handles requests for offsets by timestamp is a little obtuse
(explain it to anyone without intimate knowledge of the internal workings
of the broker - every time I do, I see this). In addition, as Becket pointed
out, it causes problems specifically with retention of messages by time
when you move partitions around.

I'm deliberately avoiding the discussion of what timestamp to use. I can
see the argument either way, though I tend to lean towards the idea that
the broker timestamp is the only viable source of truth in this situation.

-Todd


On Sun, Sep 6, 2015 at 7:08 PM, Ewen Cheslack-Postava 
wrote:

> On Sun, Sep 6, 2015 at 4:57 PM, Jay Kreps  wrote:
>
> >
> > 2. Nobody cares what time it is on the server.
> >
>
> This is a good way of summarizing the issue I was trying to get at, from an
> app's perspective. Of the 3 stated goals of the KIP, #2 (lot retention) is
> reasonably handled by a server-side timestamp. I really just care that a
> message is there long enough that I have a chance to process it. #3
> (searching by timestamp) only seems useful if we can guarantee the
> server-side timestamp is close enough to the original client-side
> timestamp, and any mirror maker step seems to break that (even ignoring any
> issues with broker availability).
>
> I'm also wondering whether optimizing for search-by-timestamp on the broker
> is really something we want to do given that messages aren't really
> guaranteed to be ordered by application-level timestamps on the broker. Is
> part of the need for this just due to the current consumer APIs being
> difficult to work with? For example, could you implement this pretty easily
> client side just the way you would broker-side? I'd imagine a couple of
> random seeks + reads during very rare occasions (i.e. when the app starts
> up) wouldn't be a problem performance-wise. Or is it also that you need the
> broker to enforce things like monotonically increasing timestamps since you
> can't do the query properly and efficiently without that guarantee, and
> therefore what applications are actually looking for *is* broker-side
> timestamps?
>
> -Ewen
>
>
>
> > Consider cases where data is being copied from a database or from log
> > files. In steady-state the server time is very close to the client time
> if
> > their clocks are sync'd (see 1) but there will be times of large
> divergence
> > when the copying process is stopped or falls behind. When this occurs it
> is
> > clear that the time the data arrived on the server is irrelevant, it is
> the
> > source timestamp that matters. This is the problem you are trying to fix
> by
> > retaining the mm timestamp but really the client should always set the
> time
> > with the use of server-side time as a fallback. It would be worth talking
> > to the Samza folks and reading through this blog post (
> >
> http://radar.oreilly.com/2015/08/the-world-beyond-batch-streaming-101.html
> > )
> > on this subject since we went through similar learnings on the stream
> > processing side.
> >
> > I think the implication of these two is that we need a proposal that
> > handles potentially very out-of-order timestamps in some kind of sanish
> way
> > (buggy clients will set something totally wrong as the time).
> >
> > -Jay
> >
> > On Sun, Sep 6, 2015 at 4:22 PM, Jay Kreps  wrote:
> >
> > > The magic byte is used to version message format so we'll need to make
> > > sure that check is in place--I actually don't see it in the current
> > > consumer code which I think is a bug we should fix for the next release
> > > (filed KAFKA-2523). The purpose of that field is so there is a clear
> > check
> > > on the format rather than the scrambled scenarios Becket describes.
> > >
> > > Also, Becket, I don't think just fixing the java client is sufficient
> as
> > > that would break other clients--i.e. if anyone writes a v1 messages,
> even
> > > by accident, any non-v1-capable consumer will break. I think we
> probably
> > > need a way to have the server ensure a particular message format either
> > at
> > > read or write time.
> > >
> > > -Jay
> > >
> > > On Thu, Sep 3, 2015 at 3:47 PM, Jiangjie Qin  >
> > > wrote:
> > >
> > >> Hi Guozhang,
> > >>
> > >> I checked the code again. Actually CRC check probably won't fail. The
> > >> newly
> > >> added timestamp field might be treated as keyLength instead, so we are
> > >> likely to receive an IllegalArgumentException when try to read the
> key.
> > >> I'll update t

Re: [DISCUSS] KIP-31 - Message format change proposal

2015-09-07 Thread Jiangjie Qin
Jay,

Thanks for the comments. Yes, there are actually three proposals as you
pointed out.

We will have a separate proposal for (1) - version control mechanism. We
actually thought about whether we want to separate 2 and 3 internally
before creating the KIP. The reason we put 2 and 3 together is that it
saves us another across-the-board wire protocol change. Like you said, we have
to migrate all the clients in all languages. To some extent, the effort
spent on upgrading the clients can be even bigger than implementing the new
feature itself. So there is some attraction in doing 2 and 3 together
instead of separately. Maybe after (1) is done it will be easier to do
protocol migration. But if we are able to come to an agreement on the
timestamp solution, I would prefer to have it together with relative offsets,
in the interest of avoiding another wire protocol change (the process of
migrating to relative offsets is exactly the same as migrating to messages
with timestamps).

In terms of the timestamp, I completely agree that having a client timestamp
is more useful if we can make sure the timestamp is good. But in reality that
can be a really big *IF*. I think the problem is exactly as Ewen mentioned:
if we let the client set the timestamp, it would be very hard for the
broker to utilize it. If the broker applies the retention policy based on the
client timestamp, one misbehaving producer can potentially completely mess up
the retention policy on the broker. Although people don't care about the
server-side timestamp, people do care a lot when the timestamp breaks.
Searching by timestamp is a really important use case even though it is not
used as often as searching by offset. It has a significant direct impact on
RTO when there is a cross-cluster failover, as Todd mentioned.

The trick of using max(lastAppendedTimestamp, currentTimeMillis) is to
guarantee a monotonic increase of the timestamp. Many commercial systems
actually do something similar to solve time skew. As for changing the time,
I am not sure people use NTP like a watch, just setting it forward/backward
by an hour or so. The time adjustment I used to do is typically something
like a minute per week, so each second might be a few microseconds
slower/faster, which should not break the clock, and all the time-based
transactions are not affected. The one-minute change is applied over the
week, not instantly.
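
Concretely, the leader-side logic is essentially just this (a minimal sketch
with made-up variable names, not actual broker code):

long lastAppendedTimestamp = 0L;   // carried over from the previous append
long appendTimestamp = Math.max(lastAppendedTimestamp, System.currentTimeMillis());
lastAppendedTimestamp = appendTimestamp;
// Even if the wall clock is stepped backwards, the timestamps written to the
// log never decrease, so a time-based index stays monotonic.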

Personally, I think having a client-side timestamp will be useful if we don't
need to put the broker and data integrity at risk. If we have to choose
one of them but not both, I would prefer the server-side timestamp, because
for the client-side timestamp there is always a plan B, which is putting the
timestamp into the payload.

Another reason I am reluctant to use the client-side timestamp is that it
is always dangerous to mix the control plane with the data plane. IP did
this, and it has caused so many different breaches that people are migrating
to something like MPLS. An example in Kafka is that any client can construct
a LeaderAndIsrRequest/UpdateMetadataRequest/ControlledShutdownRequest (you
name it) and send it to the broker to mess up the entire cluster; also, as
we have already noticed, a busy cluster can respond quite slowly to controller
messages. So it would really be nice if we can avoid giving clients the power
to control log retention.

Thanks,

Jiangjie (Becket) Qin


On Sun, Sep 6, 2015 at 9:54 PM, Todd Palino  wrote:

> So, with regards to why you want to search by timestamp, the biggest
> problem I've seen is with consumers who want to reset their timestamps to a
> specific point, whether it is to replay a certain amount of messages, or to
> rewind to before some problem state existed. This happens more often than
> anyone would like.
>
> To handle this now we need to constantly export the broker's offset for
> every partition to a time-series database and then use external processes
> to query this. I know we're not the only ones doing this. The way the
> broker handles requests for offsets by timestamp is a little obtuse
> (explain it to anyone without intimate knowledge of the internal workings
> of the broker - every time I do I see this). In addition, as Becket pointed
> out, it causes problems specifically with retention of messages by time
> when you move partitions around.
>
> I'm deliberately avoiding the discussion of what timestamp to use. I can
> see the argument either way, though I tend to lean towards the idea that
> the broker timestamp is the only viable source of truth in this situation.
>
> -Todd
>
>
> On Sun, Sep 6, 2015 at 7:08 PM, Ewen Cheslack-Postava 
> wrote:
>
> > On Sun, Sep 6, 2015 at 4:57 PM, Jay Kreps  wrote:
> >
> > >
> > > 2. Nobody cares what time it is on the server.
> > >
> >
> > This is a good way of summarizing the issue I was trying to get at, from
> an
> > app's perspective. Of the 3 stated goals of the KIP, #2 (lot retention)
> is
> > reasonably handled by a server-side timestamp

Re: [DISCUSS] KIP-31 - Message format change proposal

2015-09-08 Thread Jay Kreps
Hey Todd,

Yeah, totally agree the use case is important. I think there are
potentially also other uses that having access by time opens up too.

-Jay

On Sun, Sep 6, 2015 at 9:54 PM, Todd Palino  wrote:

> So, with regards to why you want to search by timestamp, the biggest
> problem I've seen is with consumers who want to reset their timestamps to a
> specific point, whether it is to replay a certain amount of messages, or to
> rewind to before some problem state existed. This happens more often than
> anyone would like.
>
> To handle this now we need to constantly export the broker's offset for
> every partition to a time-series database and then use external processes
> to query this. I know we're not the only ones doing this. The way the
> broker handles requests for offsets by timestamp is a little obtuse
> (explain it to anyone without intimate knowledge of the internal workings
> of the broker - every time I do I see this). In addition, as Becket pointed
> out, it causes problems specifically with retention of messages by time
> when you move partitions around.
>
> I'm deliberately avoiding the discussion of what timestamp to use. I can
> see the argument either way, though I tend to lean towards the idea that
> the broker timestamp is the only viable source of truth in this situation.
>
> -Todd
>
>
> On Sun, Sep 6, 2015 at 7:08 PM, Ewen Cheslack-Postava 
> wrote:
>
> > On Sun, Sep 6, 2015 at 4:57 PM, Jay Kreps  wrote:
> >
> > >
> > > 2. Nobody cares what time it is on the server.
> > >
> >
> > This is a good way of summarizing the issue I was trying to get at, from
> an
> > app's perspective. Of the 3 stated goals of the KIP, #2 (lot retention)
> is
> > reasonably handled by a server-side timestamp. I really just care that a
> > message is there long enough that I have a chance to process it. #3
> > (searching by timestamp) only seems useful if we can guarantee the
> > server-side timestamp is close enough to the original client-side
> > timestamp, and any mirror maker step seems to break that (even ignoring
> any
> > issues with broker availability).
> >
> > I'm also wondering whether optimizing for search-by-timestamp on the
> broker
> > is really something we want to do given that messages aren't really
> > guaranteed to be ordered by application-level timestamps on the broker.
> Is
> > part of the need for this just due to the current consumer APIs being
> > difficult to work with? For example, could you implement this pretty
> easily
> > client side just the way you would broker-side? I'd imagine a couple of
> > random seeks + reads during very rare occasions (i.e. when the app starts
> > up) wouldn't be a problem performance-wise. Or is it also that you need
> the
> > broker to enforce things like monotonically increasing timestamps since
> you
> > can't do the query properly and efficiently without that guarantee, and
> > therefore what applications are actually looking for *is* broker-side
> > timestamps?
> >
> > -Ewen
> >
> >
> >
> > > Consider cases where data is being copied from a database or from log
> > > files. In steady-state the server time is very close to the client time
> > if
> > > their clocks are sync'd (see 1) but there will be times of large
> > divergence
> > > when the copying process is stopped or falls behind. When this occurs
> it
> > is
> > > clear that the time the data arrived on the server is irrelevant, it is
> > the
> > > source timestamp that matters. This is the problem you are trying to
> fix
> > by
> > > retaining the mm timestamp but really the client should always set the
> > time
> > > with the use of server-side time as a fallback. It would be worth
> talking
> > > to the Samza folks and reading through this blog post (
> > >
> >
> http://radar.oreilly.com/2015/08/the-world-beyond-batch-streaming-101.html
> > > )
> > > on this subject since we went through similar learnings on the stream
> > > processing side.
> > >
> > > I think the implication of these two is that we need a proposal that
> > > handles potentially very out-of-order timestamps in some kind of sanish
> > way
> > > (buggy clients will set something totally wrong as the time).
> > >
> > > -Jay
> > >
> > > On Sun, Sep 6, 2015 at 4:22 PM, Jay Kreps  wrote:
> > >
> > > > The magic byte is used to version message format so we'll need to
> make
> > > > sure that check is in place--I actually don't see it in the current
> > > > consumer code which I think is a bug we should fix for the next
> release
> > > > (filed KAFKA-2523). The purpose of that field is so there is a clear
> > > check
> > > > on the format rather than the scrambled scenarios Becket describes.
> > > >
> > > > Also, Becket, I don't think just fixing the java client is sufficient
> > as
> > > > that would break other clients--i.e. if anyone writes a v1 messages,
> > even
> > > > by accident, any non-v1-capable consumer will break. I think we
> > probably
> > > > need a way to have the server ensure a particular message format
> e

Re: [DISCUSS] KIP-31 - Message format change proposal

2015-09-08 Thread Jay Kreps
Hey Becket,

I was proposing splitting up the KIP just for simplicity of discussion. You
can still implement them in one patch. I think otherwise it will be hard to
discuss/vote on them since if you like the offset proposal but not the time
proposal what do you do?

Introducing a second notion of time into Kafka is a pretty massive
philosophical change, so it kind of warrants its own KIP; I think it isn't
just "Change message format".

WRT time I think one thing to clarify in the proposal is how MM will have
access to set the timestamp? Presumably this will be a new field in
ProducerRecord, right? If so then any user can set the timestamp, right?
I'm not sure you answered the questions around how this will work for MM
since when MM retains timestamps from multiple partitions they will then be
out of order and in the past (so the max(lastAppendedTimestamp,
currentTimeMillis) override you proposed will not work, right?). If we
don't do this then when you set up mirroring the data will all be new and
you have the same retention problem you described. Maybe I missed
something...?

My main motivation is that given that both Samza and Kafka streams are
doing work that implies a mandatory client-defined notion of time, I really
think introducing a different mandatory notion of time in Kafka is going to
be quite odd. We should think hard about how client-defined time could
work. I'm not sure if it can, but I'm also not sure that it can't. Having
both will be odd. Did you chat about this with Yi/Kartik on the Samza side?

When you are saying it won't work you are assuming some particular
implementation? Maybe that the index is a monotonically increasing set of
pointers to the least record with a timestamp larger than the index time?
In other words a search for time X gives the largest offset at which all
records are <= X?

For retention, I agree with the problem you point out, but I think what you
are saying in that case is that you want a size limit too. If you use
system time you actually hit the same problem: say you do a full dump of a
DB table with a setting of 7 days retention, your retention will actually
not get enforced for the first 7 days because the data is "new to Kafka".

-Jay


On Mon, Sep 7, 2015 at 10:44 AM, Jiangjie Qin 
wrote:

> Jay,
>
> Thanks for the comments. Yes, there are actually three proposals as you
> pointed out.
>
> We will have a separate proposal for (1) - version control mechanism. We
> actually thought about whether we want to separate 2 and 3 internally
> before creating the KIP. The reason we put 2 and 3 together is it will
> saves us another cross board wire protocol change. Like you said, we have
> to migrate all the clients in all languages. To some extent, the effort to
> spend on upgrading the clients can be even bigger than implementing the new
> feature itself. So there are some attractions if we can do 2 and 3 together
> instead of separately. Maybe after (1) is done it will be easier to do
> protocol migration. But if we are able to come to an agreement on the
> timestamp solution, I would prefer to have it together with relative offset
> in the interest of avoiding another wire protocol change (the process to
> migrate to relative offset is exactly the same as migrate to message with
> timestamp).
>
> In terms of timestamp. I completely agree that having client timestamp is
> more useful if we can make sure the timestamp is good. But in reality that
> can be a really big *IF*. I think the problem is exactly as Ewen mentioned,
> if we let the client to set the timestamp, it would be very hard for the
> broker to utilize it. If broker apply retention policy based on the client
> timestamp. One misbehave producer can potentially completely mess up the
> retention policy on the broker. Although people don't care about server
> side timestamp. People do care a lot when timestamp breaks. Searching by
> timestamp is a really important use case even though it is not used as
> often as searching by offset. It has significant direct impact on RTO when
> there is a cross cluster failover as Todd mentioned.
>
> The trick using max(lastAppendedTimestamp, currentTimeMillis) is to
> guarantee monotonic increase of the timestamp. Many commercial system
> actually do something similar to this to solve the time skew. About
> changing the time, I am not sure if people use NTP like using a watch to
> just set it forward/backward by an hour or so. The time adjustment I used
> to do is typically to adjust something like a minute  / week. So for each
> second, there might be a few mircoseconds slower/faster but should not
> break the clock completely to make sure all the time-based transactions are
> not affected. The one minute change will be done within a week but not
> instantly.
>
> Personally, I think having client side timestamp will be useful if we don't
> need to put the broker and data integrity under risk. If we have to choose
> from one of them but not both. I would prefer server 

Re: [DISCUSS] KIP-31 - Message format change proposal

2015-09-08 Thread Neha Narkhede
Becket,

Nice write-up. Few thoughts -

I'd split up the discussion for simplicity. Note that you can always group
several of these in one patch to reduce the protocol changes people have to
deal with. This is just a suggestion, but I think the following split might
make it easier to tackle the changes being proposed -

   - Relative offsets
   - Introducing the concept of time
   - Time-based indexing (separate the usage of the timestamp field from
   how/whether we want to include a timestamp in the message)

I'm a +1 on relative offsets; we should've done it back when we introduced
it. Other than reducing the CPU overhead, this will also reduce the garbage
collection overhead on the brokers.

On the timestamp field, I generally agree that we should add a timestamp to
a Kafka message, but I'm not quite sold on how this KIP suggests the
timestamp be set. I will avoid repeating the downsides of a broker-side
timestamp mentioned previously in this thread. I think the topic of
including a timestamp in a Kafka message requires a lot more thought and
detail than what's in this KIP. I'd suggest we make it a separate KIP that
includes a list of all the different use cases for the timestamp (beyond
log retention), including stream processing, and discusses the tradeoffs of
including client-side and broker-side timestamps.

Agree with the benefit of time-based indexing, but haven't had a chance to
dive into the design details yet.

Thanks,
Neha

On Tue, Sep 8, 2015 at 10:57 AM, Jay Kreps  wrote:

> Hey Beckett,
>
> I was proposing splitting up the KIP just for simplicity of discussion. You
> can still implement them in one patch. I think otherwise it will be hard to
> discuss/vote on them since if you like the offset proposal but not the time
> proposal what do you do?
>
> Introducing a second notion of time into Kafka is a pretty massive
> philosophical change so it kind of warrants it's own KIP I think it isn't
> just "Change message format".
>
> WRT time I think one thing to clarify in the proposal is how MM will have
> access to set the timestamp? Presumably this will be a new field in
> ProducerRecord, right? If so then any user can set the timestamp, right?
> I'm not sure you answered the questions around how this will work for MM
> since when MM retains timestamps from multiple partitions they will then be
> out of order and in the past (so the max(lastAppendedTimestamp,
> currentTimeMillis) override you proposed will not work, right?). If we
> don't do this then when you set up mirroring the data will all be new and
> you have the same retention problem you described. Maybe I missed
> something...?
>
> My main motivation is that given that both Samza and Kafka streams are
> doing work that implies a mandatory client-defined notion of time, I really
> think introducing a different mandatory notion of time in Kafka is going to
> be quite odd. We should think hard about how client-defined time could
> work. I'm not sure if it can, but I'm also not sure that it can't. Having
> both will be odd. Did you chat about this with Yi/Kartik on the Samza side?
>
> When you are saying it won't work you are assuming some particular
> implementation? Maybe that the index is a monotonically increasing set of
> pointers to the least record with a timestamp larger than the index time?
> In other words a search for time X gives the largest offset at which all
> records are <= X?
>
> For retention, I agree with the problem you point out, but I think what you
> are saying in that case is that you want a size limit too. If you use
> system time you actually hit the same problem: say you do a full dump of a
> DB table with a setting of 7 days retention, your retention will actually
> not get enforced for the first 7 days because the data is "new to Kafka".
>
> -Jay
>
>
> On Mon, Sep 7, 2015 at 10:44 AM, Jiangjie Qin 
> wrote:
>
> > Jay,
> >
> > Thanks for the comments. Yes, there are actually three proposals as you
> > pointed out.
> >
> > We will have a separate proposal for (1) - version control mechanism. We
> > actually thought about whether we want to separate 2 and 3 internally
> > before creating the KIP. The reason we put 2 and 3 together is it will
> > saves us another cross board wire protocol change. Like you said, we have
> > to migrate all the clients in all languages. To some extent, the effort
> to
> > spend on upgrading the clients can be even bigger than implementing the
> new
> > feature itself. So there are some attractions if we can do 2 and 3
> together
> > instead of separately. Maybe after (1) is done it will be easier to do
> > protocol migration. But if we are able to come to an agreement on the
> > timestamp solution, I would prefer to have it together with relative
> offset
> > in the interest of avoiding another wire protocol change (the process to
> > migrate to relative offset is exactly the same as migrate to message with
> > timestamp).
> >
> > In terms of timestamp. I completely agree that having client timestamp

Re: [DISCUSS] KIP-31 - Message format change proposal

2015-09-10 Thread Jiangjie Qin
Neha and Jay,

Thanks a lot for the feedback. Good point about splitting the discussion. I
have split the proposal into three KIPs and it does make each discussion
clearer:
KIP-31 - Message format change (Use relative offset)
KIP-32 - Add CreateTime and LogAppendTime to Kafka message
KIP-33 - Build a time-based log index

KIP-33 can be a follow-up KIP to KIP-32, so we can discuss KIP-31
and KIP-32 first for now. I will create a separate discussion thread for
KIP-32 and reply to the concerns you raised regarding the timestamp.

So far it looks like there is no objection to KIP-31. Since I removed a few
parts from the previous KIP and only left the relative offset proposal, it
would be great if people could take another look to see if there are any
concerns.

Thanks,

Jiangjie (Becket) Qin


On Tue, Sep 8, 2015 at 1:28 PM, Neha Narkhede  wrote:

> Becket,
>
> Nice write-up. Few thoughts -
>
> I'd split up the discussion for simplicity. Note that you can always group
> several of these in one patch to reduce the protocol changes people have to
> deal with.This is just a suggestion, but I think the following split might
> make it easier to tackle the changes being proposed -
>
>- Relative offsets
>- Introducing the concept of time
>- Time-based indexing (separate the usage of the timestamp field from
>how/whether we want to include a timestamp in the message)
>
> I'm a +1 on relative offsets, we should've done it back when we introduced
> it. Other than reducing the CPU overhead, this will also reduce the garbage
> collection overhead on the brokers.
>
> On the timestamp field, I generally agree that we should add a timestamp to
> a Kafka message but I'm not quite sold on how this KIP suggests the
> timestamp be set. Will avoid repeating the downsides of a broker side
> timestamp mentioned previously in this thread. I think the topic of
> including a timestamp in a Kafka message requires a lot more thought and
> details than what's in this KIP. I'd suggest we make it a separate KIP that
> includes a list of all the different use cases for the timestamp (beyond
> log retention) including stream processing and discuss tradeoffs of
> including client and broker side timestamps.
>
> Agree with the benefit of time-based indexing, but haven't had a chance to
> dive into the design details yet.
>
> Thanks,
> Neha
>
> On Tue, Sep 8, 2015 at 10:57 AM, Jay Kreps  wrote:
>
> > Hey Beckett,
> >
> > I was proposing splitting up the KIP just for simplicity of discussion.
> You
> > can still implement them in one patch. I think otherwise it will be hard
> to
> > discuss/vote on them since if you like the offset proposal but not the
> time
> > proposal what do you do?
> >
> > Introducing a second notion of time into Kafka is a pretty massive
> > philosophical change so it kind of warrants it's own KIP I think it isn't
> > just "Change message format".
> >
> > WRT time I think one thing to clarify in the proposal is how MM will have
> > access to set the timestamp? Presumably this will be a new field in
> > ProducerRecord, right? If so then any user can set the timestamp, right?
> > I'm not sure you answered the questions around how this will work for MM
> > since when MM retains timestamps from multiple partitions they will then
> be
> > out of order and in the past (so the max(lastAppendedTimestamp,
> > currentTimeMillis) override you proposed will not work, right?). If we
> > don't do this then when you set up mirroring the data will all be new and
> > you have the same retention problem you described. Maybe I missed
> > something...?
> >
> > My main motivation is that given that both Samza and Kafka streams are
> > doing work that implies a mandatory client-defined notion of time, I
> really
> > think introducing a different mandatory notion of time in Kafka is going
> to
> > be quite odd. We should think hard about how client-defined time could
> > work. I'm not sure if it can, but I'm also not sure that it can't. Having
> > both will be odd. Did you chat about this with Yi/Kartik on the Samza
> side?
> >
> > When you are saying it won't work you are assuming some particular
> > implementation? Maybe that the index is a monotonically increasing set of
> > pointers to the least record with a timestamp larger than the index time?
> > In other words a search for time X gives the largest offset at which all
> > records are <= X?
> >
> > For retention, I agree with the problem you point out, but I think what
> you
> > are saying in that case is that you want a size limit too. If you use
> > system time you actually hit the same problem: say you do a full dump of
> a
> > DB table with a setting of 7 days retention, your retention will actually
> > not get enforced for the first 7 days because the data is "new to Kafka".
> >
> > -Jay
> >
> >
> > On Mon, Sep 7, 2015 at 10:44 AM, Jiangjie Qin  >
> > wrote:
> >
> > > Jay,
> > >
> > > Thanks for the comments. Yes, there are actually three proposals as you
> > > pointed out.

Re: [DISCUSS] KIP-31 - Message format change proposal

2015-09-10 Thread Jay Kreps
Great, can we change the name to something related to the change--"KIP-31:
Move to relative offsets in compressed message sets".

Also, you had mentioned before that you were going to expand on the mechanics
of handling these log format changes, right?

-Jay

On Thu, Sep 10, 2015 at 12:42 PM, Jiangjie Qin 
wrote:

> Neha and Jay,
>
> Thanks a lot for the feedback. Good point about splitting the discussion. I
> have split the proposal to three KIPs and it does make each discussion more
> clear:
> KIP-31 - Message format change (Use relative offset)
> KIP-32 - Add CreateTime and LogAppendTime to Kafka message
> KIP-33 - Build a time-based log index
>
> KIP-33 can be a follow up KIP for KIP-32, so we can discuss about KIP-31
> and KIP-32 first for now. I will create a separate discussion thread for
> KIP-32 and reply the concerns you raised regarding the timestamp.
>
> So far it looks there is no objection to KIP-31. Since I removed a few part
> from previous KIP and only left the relative offset proposal, it would be
> great if people can take another look to see if there is any concerns.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
> On Tue, Sep 8, 2015 at 1:28 PM, Neha Narkhede  wrote:
>
> > Becket,
> >
> > Nice write-up. Few thoughts -
> >
> > I'd split up the discussion for simplicity. Note that you can always
> group
> > several of these in one patch to reduce the protocol changes people have
> to
> > deal with.This is just a suggestion, but I think the following split
> might
> > make it easier to tackle the changes being proposed -
> >
> >- Relative offsets
> >- Introducing the concept of time
> >- Time-based indexing (separate the usage of the timestamp field from
> >how/whether we want to include a timestamp in the message)
> >
> > I'm a +1 on relative offsets, we should've done it back when we
> introduced
> > it. Other than reducing the CPU overhead, this will also reduce the
> garbage
> > collection overhead on the brokers.
> >
> > On the timestamp field, I generally agree that we should add a timestamp
> to
> > a Kafka message but I'm not quite sold on how this KIP suggests the
> > timestamp be set. Will avoid repeating the downsides of a broker side
> > timestamp mentioned previously in this thread. I think the topic of
> > including a timestamp in a Kafka message requires a lot more thought and
> > details than what's in this KIP. I'd suggest we make it a separate KIP
> that
> > includes a list of all the different use cases for the timestamp (beyond
> > log retention) including stream processing and discuss tradeoffs of
> > including client and broker side timestamps.
> >
> > Agree with the benefit of time-based indexing, but haven't had a chance
> to
> > dive into the design details yet.
> >
> > Thanks,
> > Neha
> >
> > On Tue, Sep 8, 2015 at 10:57 AM, Jay Kreps  wrote:
> >
> > > Hey Beckett,
> > >
> > > I was proposing splitting up the KIP just for simplicity of discussion.
> > You
> > > can still implement them in one patch. I think otherwise it will be
> hard
> > to
> > > discuss/vote on them since if you like the offset proposal but not the
> > time
> > > proposal what do you do?
> > >
> > > Introducing a second notion of time into Kafka is a pretty massive
> > > philosophical change so it kind of warrants it's own KIP I think it
> isn't
> > > just "Change message format".
> > >
> > > WRT time I think one thing to clarify in the proposal is how MM will
> have
> > > access to set the timestamp? Presumably this will be a new field in
> > > ProducerRecord, right? If so then any user can set the timestamp,
> right?
> > > I'm not sure you answered the questions around how this will work for
> MM
> > > since when MM retains timestamps from multiple partitions they will
> then
> > be
> > > out of order and in the past (so the max(lastAppendedTimestamp,
> > > currentTimeMillis) override you proposed will not work, right?). If we
> > > don't do this then when you set up mirroring the data will all be new
> and
> > > you have the same retention problem you described. Maybe I missed
> > > something...?
> > >
> > > My main motivation is that given that both Samza and Kafka streams are
> > > doing work that implies a mandatory client-defined notion of time, I
> > really
> > > think introducing a different mandatory notion of time in Kafka is
> going
> > to
> > > be quite odd. We should think hard about how client-defined time could
> > > work. I'm not sure if it can, but I'm also not sure that it can't.
> Having
> > > both will be odd. Did you chat about this with Yi/Kartik on the Samza
> > side?
> > >
> > > When you are saying it won't work you are assuming some particular
> > > implementation? Maybe that the index is a monotonically increasing set
> of
> > > pointers to the least record with a timestamp larger than the index
> time?
> > > In other words a search for time X gives the largest offset at which
> all
> > > records are <= X?
> > >
> > > For retention, I agree with the problem you 

Re: [DISCUSS] KIP-31 - Message format change proposal

2015-09-10 Thread Jiangjie Qin
Hi Jay,

I just changed the KIP title and updated the KIP page.

And yes, we are working on a general version control proposal to make
protocol migrations like this smoother. I will also create a KIP for that
soon.

Thanks,

Jiangjie (Becket) Qin


On Thu, Sep 10, 2015 at 2:21 PM, Jay Kreps  wrote:

> Great, can we change the name to something related to the change--"KIP-31:
> Move to relative offsets in compressed message sets".
>
> Also you had mentioned before you were going to expand on the mechanics of
> handling these log format changes, right?
>
> -Jay
>
> On Thu, Sep 10, 2015 at 12:42 PM, Jiangjie Qin 
> wrote:
>
> > Neha and Jay,
> >
> > Thanks a lot for the feedback. Good point about splitting the
> discussion. I
> > have split the proposal to three KIPs and it does make each discussion
> more
> > clear:
> > KIP-31 - Message format change (Use relative offset)
> > KIP-32 - Add CreateTime and LogAppendTime to Kafka message
> > KIP-33 - Build a time-based log index
> >
> > KIP-33 can be a follow up KIP for KIP-32, so we can discuss about KIP-31
> > and KIP-32 first for now. I will create a separate discussion thread for
> > KIP-32 and reply the concerns you raised regarding the timestamp.
> >
> > So far it looks there is no objection to KIP-31. Since I removed a few
> part
> > from previous KIP and only left the relative offset proposal, it would be
> > great if people can take another look to see if there is any concerns.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> > On Tue, Sep 8, 2015 at 1:28 PM, Neha Narkhede  wrote:
> >
> > > Becket,
> > >
> > > Nice write-up. Few thoughts -
> > >
> > > I'd split up the discussion for simplicity. Note that you can always
> > group
> > > several of these in one patch to reduce the protocol changes people
> have
> > to
> > > deal with.This is just a suggestion, but I think the following split
> > might
> > > make it easier to tackle the changes being proposed -
> > >
> > >- Relative offsets
> > >- Introducing the concept of time
> > >- Time-based indexing (separate the usage of the timestamp field
> from
> > >how/whether we want to include a timestamp in the message)
> > >
> > > I'm a +1 on relative offsets, we should've done it back when we
> > introduced
> > > it. Other than reducing the CPU overhead, this will also reduce the
> > garbage
> > > collection overhead on the brokers.
> > >
> > > On the timestamp field, I generally agree that we should add a
> timestamp
> > to
> > > a Kafka message but I'm not quite sold on how this KIP suggests the
> > > timestamp be set. Will avoid repeating the downsides of a broker side
> > > timestamp mentioned previously in this thread. I think the topic of
> > > including a timestamp in a Kafka message requires a lot more thought
> and
> > > details than what's in this KIP. I'd suggest we make it a separate KIP
> > that
> > > includes a list of all the different use cases for the timestamp
> (beyond
> > > log retention) including stream processing and discuss tradeoffs of
> > > including client and broker side timestamps.
> > >
> > > Agree with the benefit of time-based indexing, but haven't had a chance
> > to
> > > dive into the design details yet.
> > >
> > > Thanks,
> > > Neha
> > >
> > > On Tue, Sep 8, 2015 at 10:57 AM, Jay Kreps  wrote:
> > >
> > > > Hey Beckett,
> > > >
> > > > I was proposing splitting up the KIP just for simplicity of
> discussion.
> > > You
> > > > can still implement them in one patch. I think otherwise it will be
> > hard
> > > to
> > > > discuss/vote on them since if you like the offset proposal but not
> the
> > > time
> > > > proposal what do you do?
> > > >
> > > > Introducing a second notion of time into Kafka is a pretty massive
> > > > philosophical change so it kind of warrants it's own KIP I think it
> > isn't
> > > > just "Change message format".
> > > >
> > > > WRT time I think one thing to clarify in the proposal is how MM will
> > have
> > > > access to set the timestamp? Presumably this will be a new field in
> > > > ProducerRecord, right? If so then any user can set the timestamp,
> > right?
> > > > I'm not sure you answered the questions around how this will work for
> > MM
> > > > since when MM retains timestamps from multiple partitions they will
> > then
> > > be
> > > > out of order and in the past (so the max(lastAppendedTimestamp,
> > > > currentTimeMillis) override you proposed will not work, right?). If
> we
> > > > don't do this then when you set up mirroring the data will all be new
> > and
> > > > you have the same retention problem you described. Maybe I missed
> > > > something...?
> > > >
> > > > My main motivation is that given that both Samza and Kafka streams
> are
> > > > doing work that implies a mandatory client-defined notion of time, I
> > > really
> > > > think introducing a different mandatory notion of time in Kafka is
> > going
> > > to
> > > > be quite odd. We should think hard about how client-defined time
> could
> > >

Re: [DISCUSS] KIP-31 - Message format change proposal

2015-09-10 Thread Joel Koshy
I just wanted to comment on a few points made earlier in this thread:

Concerns on clock skew: at least for the original proposal's scope
(which was more about honoring retention broker-side), this would only be
an issue when spanning leader movements, right? i.e., leader migration
latency has to be much less than the clock skew for this to be a real
issue, wouldn't it?

Client timestamp vs broker timestamp: I'm not sure Kafka (brokers) are
the right place to reason about client-side timestamps, precisely due
to the nuances that have been discussed at length in this thread. My
preference would have been for the timestamp (now called
LogAppendTimestamp) to have nothing to do with the applications. Ewen
raised a valid concern about leaking such “private/server-side”
timestamps into the protocol spec. i.e., it is fine to have the
CreateTime, which is expressly client-provided and immutable
thereafter, but the LogAppendTime is also going to be part of the protocol
and it would be good to avoid exposing it (to client developers) if
possible. Ok, so here is a slightly different approach that I was just
thinking about (and have not thought through very far, so it may not work):
do not add the LogAppendTime to messages. Instead, build the time-based index
on the server side on message arrival time alone. Introduce a new
ReplicaFetchRequest/Response pair. ReplicaFetchResponses would also
include the slice of the time-based index for the follower broker.
This way we can at least keep timestamps aligned across brokers for
retention purposes. We do lose the append timestamp for mirroring
pipelines (which appears to be the case in KIP-32 as well).
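
To make that concrete, here is a rough sketch of the extra payload on the
follower fetch path; all names below are hypothetical and this is not an
actual protocol definition:

import java.util.List;

// Hypothetical shapes only; Kafka has no such ReplicaFetchResponse today. The point is
// that the follower receives the leader's (timestamp -> offset) index entries covering
// the fetched slice, so both replicas index and retain data on the same clock.
class TimeIndexEntry {
    final long timestampMs; // leader-side arrival time
    final long offset;
    TimeIndexEntry(long timestampMs, long offset) {
        this.timestampMs = timestampMs;
        this.offset = offset;
    }
}

class ReplicaFetchResponsePartition {
    final String topic;
    final int partition;
    final byte[] messageSetBytes;              // the usual fetched log slice
    final List<TimeIndexEntry> timeIndexSlice; // leader index entries for that slice
    ReplicaFetchResponsePartition(String topic, int partition,
                                  byte[] messageSetBytes,
                                  List<TimeIndexEntry> timeIndexSlice) {
        this.topic = topic;
        this.partition = partition;
        this.messageSetBytes = messageSetBytes;
        this.timeIndexSlice = timeIndexSlice;
    }
}

The follower would apply the received entries to its own time-based index
instead of stamping arrival times locally.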

Configurable index granularity: We can do this, but I'm not sure it is
very useful, and as Jay noted, a major change from the old proposal
linked from the KIP is the sparse time-based index, which we felt was
essential to bound memory usage (having a timestamp on each log
index entry was probably a big waste since in the common case several
messages share the same timestamp). BTW, another benefit of the second
index is that it makes it easier to roll back or throw away if
necessary (vs. modifying the existing index format). That obviously does
not help with rolling back the timestamp change in the message format,
but it is one less thing to worry about.
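
To illustrate how the sparseness bounds memory, here is a rough sketch; the
byte-interval knob is an assumption, analogous in spirit to the existing
log.index.interval.bytes setting for the offset index:

import java.util.TreeMap;

// Sketch: at most one time-index entry per intervalBytes of appended data, instead of
// one timestamp per log index entry. Assumes timestamps handed to onAppend never decrease.
public class SparseTimeIndexWriter {
    private final TreeMap<Long, Long> index = new TreeMap<>(); // timestamp -> offset
    private final long intervalBytes;
    private long bytesSinceLastEntry = 0;

    public SparseTimeIndexWriter(long intervalBytes) {
        this.intervalBytes = intervalBytes;
    }

    public void onAppend(long timestampMs, long offset, int messageSizeBytes) {
        bytesSinceLastEntry += messageSizeBytes;
        if (index.isEmpty() || bytesSinceLastEntry >= intervalBytes) {
            index.put(timestampMs, offset); // memory grows with log size / intervalBytes
            bytesSinceLastEntry = 0;
        }
    }
}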

Versioning: I’m not sure everyone is saying the same thing wrt the
scope of this. There is the record format change, but I also think
this ties into all of the API versioning that we already have in
Kafka. The current API versioning approach works fine for
upgrades/downgrades across official Kafka releases, but not so well
between releases. (We almost got bitten by this at LinkedIn with the
recent changes to various requests but were able to work around
these.) We can clarify this in the follow-up KIP.

Thanks,

Joel


On Thu, Sep 10, 2015 at 3:00 PM, Jiangjie Qin  wrote:
> Hi Jay,
>
> I just changed the KIP title and updated the KIP page.
>
> And yes, we are working on a general version control proposal to make the
> protocol migration like this more smooth. I will also create a KIP for that
> soon.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
> On Thu, Sep 10, 2015 at 2:21 PM, Jay Kreps  wrote:
>
>> Great, can we change the name to something related to the change--"KIP-31:
>> Move to relative offsets in compressed message sets".
>>
>> Also you had mentioned before you were going to expand on the mechanics of
>> handling these log format changes, right?
>>
>> -Jay
>>
>> On Thu, Sep 10, 2015 at 12:42 PM, Jiangjie Qin 
>> wrote:
>>
>> > Neha and Jay,
>> >
>> > Thanks a lot for the feedback. Good point about splitting the
>> discussion. I
>> > have split the proposal to three KIPs and it does make each discussion
>> more
>> > clear:
>> > KIP-31 - Message format change (Use relative offset)
>> > KIP-32 - Add CreateTime and LogAppendTime to Kafka message
>> > KIP-33 - Build a time-based log index
>> >
>> > KIP-33 can be a follow up KIP for KIP-32, so we can discuss about KIP-31
>> > and KIP-32 first for now. I will create a separate discussion thread for
>> > KIP-32 and reply the concerns you raised regarding the timestamp.
>> >
>> > So far it looks there is no objection to KIP-31. Since I removed a few
>> part
>> > from previous KIP and only left the relative offset proposal, it would be
>> > great if people can take another look to see if there is any concerns.
>> >
>> > Thanks,
>> >
>> > Jiangjie (Becket) Qin
>> >
>> >
>> > On Tue, Sep 8, 2015 at 1:28 PM, Neha Narkhede  wrote:
>> >
>> > > Becket,
>> > >
>> > > Nice write-up. Few thoughts -
>> > >
>> > > I'd split up the discussion for simplicity. Note that you can always
>> > group
>> > > several of these in one patch to reduce the protocol changes people
>> have
>> > to
>> > > deal with.This is just a suggestion, but I think the following split
>> > might
>> > > make it easier to tackle the changes being proposed -
>> > >
>> > >- Relative offsets
>> > >- Introducing the concept of time
>> > >- T

Re: [DISCUSS] KIP-31 - Message format change proposal

2015-09-10 Thread Jay Kreps
Hey Joel,

Yeah, the clock issues would arise when leadership changed or the time on
the machine changed.

Don't you see all the same issues you see with client-defined timestamps
if you let MM control the timestamp as you were proposing? That means time
is no longer monotonic. A mixture of mirrored data from two source clusters
may be arbitrarily skewed, just as from two clients.

Also, Joel, can you just confirm that you guys have talked through the
whole timestamp thing with the Samza folks at LI? The reason I ask about
this is that Samza and Kafka Streams (KIP-28) are both trying to rely on
*client-defined* notions of time and there is a really well thought out
rationale for why it needs to be client-defined. In the absence of these I
think I would also have agreed with you guys about preferring server time.
But because of those, I think it may be super confusing if one part of
Kafka has a mandatory notion of time that means X and another part has a
mandatory notion that means Y.

WRT your idea of a FollowerFetchRequest, I had thought of a similar idea
where we use the leader's timestamps to approximately set the follower's
timestamps. I had thought of just adding a partition metadata request that
would subsume the current offset/time lookup and could be used by the
follower to try to approximately keep its timestamps kosher. It's a
little hacky and doesn't help with MM, but it is also maybe less invasive,
so that approach could be viable.

-Jay





On Thu, Sep 10, 2015 at 3:36 PM, Joel Koshy  wrote:

> I just wanted to comment on a few points made earlier in this thread:
>
> Concerns on clock skew: at least for the original proposal's scope
> (which was more for honoring retention broker-side) this would only be
> an issue when spanning leader movements right? i.e., leader migration
> latency has to be much less than clock skew for this to be a real
> issue wouldn’t it?
>
> Client timestamp vs broker timestamp: I’m not sure Kafka (brokers) are
> the right place to reason about client-side timestamps precisely due
> to the nuances that have been discussed at length in this thread. My
> preference would have been to the timestamp (now called
> LogAppendTimestamp) have nothing to do with the applications. Ewen
> raised a valid concern about leaking such “private/server-side”
> timestamps into the protocol spec. i.e., it is fine to have the
> CreateTime which is expressly client-provided and immutable
> thereafter, but the LogAppendTime is also going part of the protocol
> and it would be good to avoid exposure (to client developers) if
> possible. Ok, so here is a slightly different approach that I was just
> thinking about (and did not think too far so it may not work): do not
> add the LogAppendTime to messages. Instead, build the time-based index
> on the server side on message arrival time alone. Introduce a new
> ReplicaFetchRequest/Response pair. ReplicaFetchResponses will also
> include the slice of the time-based index for the follower broker.
> This way we can at least keep timestamps aligned across brokers for
> retention purposes. We do lose the append timestamp for mirroring
> pipelines (which appears to be the case in KIP-32 as well).
>
> Configurable index granularity: We can do this but I’m not sure it is
> very useful and as Jay noted, a major change from the old proposal
> linked from the KIP is the sparse time-based index which we felt was
> essential to bound memory usage (and having timestamps on each log
> index entry was probably a big waste since in the common case several
> messages span the same timestamp). BTW another benefit of the second
> index is that it makes it easier to roll-back or throw away if
> necessary (vs. modifying the existing index format) - although that
> obviously does not help with rolling back the timestamp change in the
> message format, but it is one less thing to worry about.
>
> Versioning: I’m not sure everyone is saying the same thing wrt the
> scope of this. There is the record format change, but I also think
> this ties into all of the API versioning that we already have in
> Kafka. The current API versioning approach works fine for
> upgrades/downgrades across official Kafka releases, but not so well
> between releases. (We almost got bitten by this at LinkedIn with the
> recent changes to various requests but were able to work around
> these.) We can clarify this in the follow-up KIP.
>
> Thanks,
>
> Joel
>
>
> On Thu, Sep 10, 2015 at 3:00 PM, Jiangjie Qin 
> wrote:
> > Hi Jay,
> >
> > I just changed the KIP title and updated the KIP page.
> >
> > And yes, we are working on a general version control proposal to make the
> > protocol migration like this more smooth. I will also create a KIP for
> that
> > soon.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> > On Thu, Sep 10, 2015 at 2:21 PM, Jay Kreps  wrote:
> >
> >> Great, can we change the name to something related to the
> change--"KIP-31:
> >> Move to relative offsets in compressed mes

Re: [DISCUSS] KIP-31 - Message format change proposal

2015-09-10 Thread Joel Koshy
> Don't you see all the same issues you see with client-defined timestamp's
> if you let mm control the timestamp as you were proposing? That means time

Actually I don't think that was in the proposal (or was it?). i.e., I
think it was always supposed to be controlled by the broker (and not
MM).

> Also, Joel, can you just confirm that you guys have talked through the
> whole timestamp thing with the Samza folks at LI? The reason I ask about
> this is that Samza and Kafka Streams (KIP-28) are both trying to rely on

We have not. This is a good point - we will follow-up.

> WRT your idea of a FollowerFetchRequestI had thought of a similar idea
> where we use the leader's timestamps to approximately set the follower's
> timestamps. I had thought of just adding a partition metadata request that
> would subsume the current offset/time lookup and could be used by the
> follower to try to approximately keep their timestamps kosher. It's a
> little hacky and doesn't help with MM but it is also maybe less invasive so
> that approach could be viable.

That would also work, but perhaps responding with the actual leader
offset-timestamp entries (corresponding to the fetched portion) would
be exact, and it should be small as well. Anyway, the main motivation
here was to avoid leaking server-side timestamps into the
message format, if people think that is worth it; the alternatives are
implementation details. My original instinct was that it also avoids a
backwards-incompatible change (but it does not, because we also have
the relative offset change).

Thanks,

Joel

>
>
>
> On Thu, Sep 10, 2015 at 3:36 PM, Joel Koshy  wrote:
>
>> I just wanted to comment on a few points made earlier in this thread:
>>
>> Concerns on clock skew: at least for the original proposal's scope
>> (which was more for honoring retention broker-side) this would only be
>> an issue when spanning leader movements right? i.e., leader migration
>> latency has to be much less than clock skew for this to be a real
>> issue wouldn’t it?
>>
>> Client timestamp vs broker timestamp: I’m not sure Kafka (brokers) are
>> the right place to reason about client-side timestamps precisely due
>> to the nuances that have been discussed at length in this thread. My
>> preference would have been to the timestamp (now called
>> LogAppendTimestamp) have nothing to do with the applications. Ewen
>> raised a valid concern about leaking such “private/server-side”
>> timestamps into the protocol spec. i.e., it is fine to have the
>> CreateTime which is expressly client-provided and immutable
>> thereafter, but the LogAppendTime is also going part of the protocol
>> and it would be good to avoid exposure (to client developers) if
>> possible. Ok, so here is a slightly different approach that I was just
>> thinking about (and did not think too far so it may not work): do not
>> add the LogAppendTime to messages. Instead, build the time-based index
>> on the server side on message arrival time alone. Introduce a new
>> ReplicaFetchRequest/Response pair. ReplicaFetchResponses will also
>> include the slice of the time-based index for the follower broker.
>> This way we can at least keep timestamps aligned across brokers for
>> retention purposes. We do lose the append timestamp for mirroring
>> pipelines (which appears to be the case in KIP-32 as well).
>>
>> Configurable index granularity: We can do this but I’m not sure it is
>> very useful and as Jay noted, a major change from the old proposal
>> linked from the KIP is the sparse time-based index which we felt was
>> essential to bound memory usage (and having timestamps on each log
>> index entry was probably a big waste since in the common case several
>> messages span the same timestamp). BTW another benefit of the second
>> index is that it makes it easier to roll-back or throw away if
>> necessary (vs. modifying the existing index format) - although that
>> obviously does not help with rolling back the timestamp change in the
>> message format, but it is one less thing to worry about.
>>
>> Versioning: I’m not sure everyone is saying the same thing wrt the
>> scope of this. There is the record format change, but I also think
>> this ties into all of the API versioning that we already have in
>> Kafka. The current API versioning approach works fine for
>> upgrades/downgrades across official Kafka releases, but not so well
>> between releases. (We almost got bitten by this at LinkedIn with the
>> recent changes to various requests but were able to work around
>> these.) We can clarify this in the follow-up KIP.
>>
>> Thanks,
>>
>> Joel
>>
>>
>> On Thu, Sep 10, 2015 at 3:00 PM, Jiangjie Qin 
>> wrote:
>> > Hi Jay,
>> >
>> > I just changed the KIP title and updated the KIP page.
>> >
>> > And yes, we are working on a general version control proposal to make the
>> > protocol migration like this more smooth. I will also create a KIP for
>> that
>> > soon.
>> >
>> > Thanks,
>> >
>> > Jiangjie (Becket) Qin
>> >
>> >
>> > On

Re: [DISCUSS] KIP-31 - Message format change proposal

2015-09-10 Thread Jay Kreps
Ah, I see. I think I misunderstood about MM: it was called out in the
proposal and I thought you were saying you'd retain the timestamp, but I
think you're calling out that you're not. In that case you do have the
opposite problem, right? When you add mirroring for a topic, all that data
will have a timestamp of now and retention won't be right. Not a blocker,
but a bit of a gotcha.

-Jay



On Thu, Sep 10, 2015 at 5:40 PM, Joel Koshy  wrote:

> > Don't you see all the same issues you see with client-defined timestamp's
> > if you let mm control the timestamp as you were proposing? That means
> time
>
> Actually I don't think that was in the proposal (or was it?). i.e., I
> think it was always supposed to be controlled by the broker (and not
> MM).
>
> > Also, Joel, can you just confirm that you guys have talked through the
> > whole timestamp thing with the Samza folks at LI? The reason I ask about
> > this is that Samza and Kafka Streams (KIP-28) are both trying to rely on
>
> We have not. This is a good point - we will follow-up.
>
> > WRT your idea of a FollowerFetchRequestI had thought of a similar idea
> > where we use the leader's timestamps to approximately set the follower's
> > timestamps. I had thought of just adding a partition metadata request
> that
> > would subsume the current offset/time lookup and could be used by the
> > follower to try to approximately keep their timestamps kosher. It's a
> > little hacky and doesn't help with MM but it is also maybe less invasive
> so
> > that approach could be viable.
>
> That would also work, but perhaps responding with the actual leader
> offset-timestamp entries (corresponding to the fetched portion) would
> be exact and it should be small as well. Anyway, the main motivation
> in this was to avoid leaking server-side timestamps to the
> message-format if people think it is worth it so the alternatives are
> implementation details. My original instinct was that it also avoids a
> backwards incompatible change (but it does not because we also have
> the relative offset change).
>
> Thanks,
>
> Joel
>
> >
> >
> >
> > On Thu, Sep 10, 2015 at 3:36 PM, Joel Koshy  wrote:
> >
> >> I just wanted to comment on a few points made earlier in this thread:
> >>
> >> Concerns on clock skew: at least for the original proposal's scope
> >> (which was more for honoring retention broker-side) this would only be
> >> an issue when spanning leader movements right? i.e., leader migration
> >> latency has to be much less than clock skew for this to be a real
> >> issue wouldn’t it?
> >>
> >> Client timestamp vs broker timestamp: I’m not sure Kafka (brokers) are
> >> the right place to reason about client-side timestamps precisely due
> >> to the nuances that have been discussed at length in this thread. My
> >> preference would have been to the timestamp (now called
> >> LogAppendTimestamp) have nothing to do with the applications. Ewen
> >> raised a valid concern about leaking such “private/server-side”
> >> timestamps into the protocol spec. i.e., it is fine to have the
> >> CreateTime which is expressly client-provided and immutable
> >> thereafter, but the LogAppendTime is also going part of the protocol
> >> and it would be good to avoid exposure (to client developers) if
> >> possible. Ok, so here is a slightly different approach that I was just
> >> thinking about (and did not think too far so it may not work): do not
> >> add the LogAppendTime to messages. Instead, build the time-based index
> >> on the server side on message arrival time alone. Introduce a new
> >> ReplicaFetchRequest/Response pair. ReplicaFetchResponses will also
> >> include the slice of the time-based index for the follower broker.
> >> This way we can at least keep timestamps aligned across brokers for
> >> retention purposes. We do lose the append timestamp for mirroring
> >> pipelines (which appears to be the case in KIP-32 as well).
> >>
> >> Configurable index granularity: We can do this but I’m not sure it is
> >> very useful and as Jay noted, a major change from the old proposal
> >> linked from the KIP is the sparse time-based index which we felt was
> >> essential to bound memory usage (and having timestamps on each log
> >> index entry was probably a big waste since in the common case several
> >> messages span the same timestamp). BTW another benefit of the second
> >> index is that it makes it easier to roll-back or throw away if
> >> necessary (vs. modifying the existing index format) - although that
> >> obviously does not help with rolling back the timestamp change in the
> >> message format, but it is one less thing to worry about.
> >>
> >> Versioning: I’m not sure everyone is saying the same thing wrt the
> >> scope of this. There is the record format change, but I also think
> >> this ties into all of the API versioning that we already have in
> >> Kafka. The current API versioning approach works fine for
> >> upgrades/downgrades across official Kafka releases, but not so wel

Re: [DISCUSS] KIP-31 - Message format change proposal

2015-09-10 Thread Ewen Cheslack-Postava
Re: MM preserving timestamps: Yes, this was how I interpreted the point in
the KIP, and I only raised the issue because it restricts the usefulness of
timestamps anytime MM is involved. I agree it's not a deal breaker, but I
wanted to understand the exact impact of the change. Some users seem to want
to be able to seek by application-defined timestamps (despite the many
obvious issues involved), and the proposal clearly would not support that
unless the timestamps submitted with the produce requests were respected. If
we ignore client-submitted timestamps, then we probably want to try to hide
the timestamps as much as possible in any public interface (e.g. they never
show up in any public consumer APIs), but expose them just enough to be
useful for operational purposes.

Sorry if my devil's advocate position / attempt to map the design space led
to some confusion!

-Ewen


On Thu, Sep 10, 2015 at 5:48 PM, Jay Kreps  wrote:

> Ah, I see, I think I misunderstood about MM, it was called out in the
> proposal and I thought you were saying you'd retain the timestamp but I
> think you're calling out that you're not. In that case you do have the
> opposite problem, right? When you add mirroring for a topic all that data
> will have a timestamp of now and retention won't be right. Not a blocker
> but a bit of a gotcha.
>
> -Jay
>
>
>
> On Thu, Sep 10, 2015 at 5:40 PM, Joel Koshy  wrote:
>
> > > Don't you see all the same issues you see with client-defined
> timestamp's
> > > if you let mm control the timestamp as you were proposing? That means
> > time
> >
> > Actually I don't think that was in the proposal (or was it?). i.e., I
> > think it was always supposed to be controlled by the broker (and not
> > MM).
> >
> > > Also, Joel, can you just confirm that you guys have talked through the
> > > whole timestamp thing with the Samza folks at LI? The reason I ask
> about
> > > this is that Samza and Kafka Streams (KIP-28) are both trying to rely
> on
> >
> > We have not. This is a good point - we will follow-up.
> >
> > > WRT your idea of a FollowerFetchRequestI had thought of a similar idea
> > > where we use the leader's timestamps to approximately set the
> follower's
> > > timestamps. I had thought of just adding a partition metadata request
> > that
> > > would subsume the current offset/time lookup and could be used by the
> > > follower to try to approximately keep their timestamps kosher. It's a
> > > little hacky and doesn't help with MM but it is also maybe less
> invasive
> > so
> > > that approach could be viable.
> >
> > That would also work, but perhaps responding with the actual leader
> > offset-timestamp entries (corresponding to the fetched portion) would
> > be exact and it should be small as well. Anyway, the main motivation
> > in this was to avoid leaking server-side timestamps to the
> > message-format if people think it is worth it so the alternatives are
> > implementation details. My original instinct was that it also avoids a
> > backwards incompatible change (but it does not because we also have
> > the relative offset change).
> >
> > Thanks,
> >
> > Joel
> >
> > >
> > >
> > >
> > > On Thu, Sep 10, 2015 at 3:36 PM, Joel Koshy 
> wrote:
> > >
> > >> I just wanted to comment on a few points made earlier in this thread:
> > >>
> > >> Concerns on clock skew: at least for the original proposal's scope
> > >> (which was more for honoring retention broker-side) this would only be
> > >> an issue when spanning leader movements right? i.e., leader migration
> > >> latency has to be much less than clock skew for this to be a real
> > >> issue wouldn’t it?
> > >>
> > >> Client timestamp vs broker timestamp: I’m not sure Kafka (brokers) are
> > >> the right place to reason about client-side timestamps precisely due
> > >> to the nuances that have been discussed at length in this thread. My
> > >> preference would have been to the timestamp (now called
> > >> LogAppendTimestamp) have nothing to do with the applications. Ewen
> > >> raised a valid concern about leaking such “private/server-side”
> > >> timestamps into the protocol spec. i.e., it is fine to have the
> > >> CreateTime which is expressly client-provided and immutable
> > >> thereafter, but the LogAppendTime is also going part of the protocol
> > >> and it would be good to avoid exposure (to client developers) if
> > >> possible. Ok, so here is a slightly different approach that I was just
> > >> thinking about (and did not think too far so it may not work): do not
> > >> add the LogAppendTime to messages. Instead, build the time-based index
> > >> on the server side on message arrival time alone. Introduce a new
> > >> ReplicaFetchRequest/Response pair. ReplicaFetchResponses will also
> > >> include the slice of the time-based index for the follower broker.
> > >> This way we can at least keep timestamps aligned across brokers for
> > >> retention purposes. We do lose the append timestamp for mirroring
> > >> pipelines (which appears to be the case 

Re: [DISCUSS] KIP-31 - Message format change proposal

2015-09-11 Thread Jiangjie Qin
Ewen and Jay,

The way I see it, the LogAppendTime is another form of "offset". It serves
the following purposes:
1. Locate messages not only by position, but also by time. The difference
from an offset is that a timestamp is not unique across messages.
2. Allow the broker to manage messages based on time, e.g. retention and rolling.
3. Provide a convenient way for users to search for messages not only by
offset, but also by timestamp.

For purpose (2) we don't need a per-message server timestamp. We only need
a per-log-segment server timestamp and to propagate it among brokers.

For (1) and (3), we need a per-message timestamp. Then the question is
whether we should use CreateTime or LogAppendTime.

I completely agree that an application timestamp is very useful for many
use cases. But it seems to me that having Kafka understand and maintain
application timestamps is a bit too demanding. So I think there is value in
passing on CreateTime for application convenience, but I am not sure it can
replace LogAppendTime. Managing out-of-order CreateTime is equivalent to
allowing producers to send their own offsets and asking the broker to manage
those offsets for them. It is going to be very hard to maintain and could
create huge performance/functional issues because of the complicated logic.

About whether we should expose LogAppendTime outside the broker: I agree that
the server timestamp is internal to the broker, but isn't the offset also an
internal concept? Arguably it's not provided by the producer, so consumer
application logic does not have to know about offsets. But users need to know
offsets because they need to know "where is the message" in the log.
LogAppendTime answers "when was the message appended" to the log. So
personally I think it is reasonable to expose the LogAppendTime to consumers.

I can see some use cases for exposing the LogAppendTime, to name a few:
1. Say the broker has 7 days of log retention and some application wants to
reprocess the data from the past 3 days. The user can simply provide the
timestamp and start consuming (see the sketch after this list).
2. Users can easily know the consumer lag in terms of time.
3. Cross-cluster failover. This is a more complicated use case with two
goals: 1) do not lose messages; and 2) do not reconsume tons of messages.
Only knowing an offset in cluster A won't help with finding the failover
point in cluster B, because an offset in one cluster means nothing to
another cluster. A timestamp, however, is a good cross-cluster reference in
this case.
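
Here is a sketch of use case 1 from the consumer's point of view. The
offsetForTimestamp lookup below is a hypothetical helper standing in for
whatever search-by-timestamp API the broker would expose; it is not an
existing Kafka API:

import java.util.concurrent.TimeUnit;

public class ReprocessLastThreeDays {

    // Hypothetical broker-side lookup: the earliest offset whose LogAppendTime is at or
    // after the given timestamp. Not an existing Kafka API.
    interface OffsetByTimeLookup {
        long offsetForTimestamp(String topic, int partition, long timestampMs);
    }

    // The application never needs to track offsets across restarts or clusters; it just
    // translates "3 days ago" into an offset and starts fetching from there.
    public static long startingOffset(OffsetByTimeLookup lookup, String topic, int partition) {
        long threeDaysAgo = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3);
        return lookup.offsetForTimestamp(topic, partition, threeDaysAgo);
    }
}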

Thanks,

Jiangjie (Becket) Qin

On Thu, Sep 10, 2015 at 9:28 PM, Ewen Cheslack-Postava 
wrote:

> Re: MM preserving timestamps: Yes, this was how I interpreted the point in
> the KIP and I only raised the issue because it restricts the usefulness of
> timestamps anytime MM is involved. I agree it's not a deal breaker, but I
> wanted to understand exact impact of the change. Some users seem to want to
> be able to seek by application-defined timestamps (despite the many obvious
> issues involved), and the proposal clearly would not support that unless
> the timestamps submitted with the produce requests were respected. If we
> ignore client submitted timestamps, then we probably want to try to hide
> the timestamps as much as possible in any public interface (e.g. never
> shows up in any public consumer APIs), but expose it just enough to be
> useful for operational purposes.
>
> Sorry if my devil's advocate position / attempt to map the design space led
> to some confusion!
>
> -Ewen
>
>
> On Thu, Sep 10, 2015 at 5:48 PM, Jay Kreps  wrote:
>
> > Ah, I see, I think I misunderstood about MM, it was called out in the
> > proposal and I thought you were saying you'd retain the timestamp but I
> > think you're calling out that you're not. In that case you do have the
> > opposite problem, right? When you add mirroring for a topic all that data
> > will have a timestamp of now and retention won't be right. Not a blocker
> > but a bit of a gotcha.
> >
> > -Jay
> >
> >
> >
> > On Thu, Sep 10, 2015 at 5:40 PM, Joel Koshy  wrote:
> >
> > > > Don't you see all the same issues you see with client-defined
> > timestamp's
> > > > if you let mm control the timestamp as you were proposing? That means
> > > time
> > >
> > > Actually I don't think that was in the proposal (or was it?). i.e., I
> > > think it was always supposed to be controlled by the broker (and not
> > > MM).
> > >
> > > > Also, Joel, can you just confirm that you guys have talked through
> the
> > > > whole timestamp thing with the Samza folks at LI? The reason I ask
> > about
> > > > this is that Samza and Kafka Streams (KIP-28) are both trying to rely
> > on
> > >
> > > We have not. This is a good point - we will follow-up.
> > >
> > > > WRT your idea of a FollowerFetchRequestI had thought of a similar
> idea
> > > > where we use the leader's timestamps to approximately set the
> > follower's
> > > > timestamps. I had thought of just adding a partition metadata request
> > > that
> > > > would subsume the current offset/time lookup and could be used by the
> > > > follower to try to approximately keep

Re: [DISCUSS] KIP-31 - Message format change proposal

2015-09-11 Thread Joel Koshy
Jay had mentioned the scenario of mirror-maker bootstrap, which would
effectively reset the logAppendTimestamps for the bootstrapped data.
If we don't include logAppendTimestamps in each message, there is a
similar scenario when rebuilding indexes during recovery. So it seems
it may be worth adding that timestamp to messages. The drawback to
that is exposing a server-side concept in the protocol (although we
already do that with offsets). logAppendTimestamp really should be
decided by the broker, so I think the first scenario may have to be
written off as a gotcha, but the second may be worth addressing (by
adding it to the message format).

The other point that Jay raised which needs to be addressed in the
proposal (since we require monotonically increasing timestamps in the
index) is changing the time on the server (I'm a little less concerned
about NTP clock skews than about a user explicitly changing the server's
time, i.e., big clock skews). We would at least want to "set back"
all the existing timestamps to guarantee non-decreasing timestamps
for future messages. I'm not sure at this point how best to handle
that, but we could perhaps have an epoch/base-time (or time correction)
stored in the log directories and base all log index timestamps off
that base time (i.e., corrected). So if at any time you determine that
the time has changed backwards, you can adjust that base time without
having to fix up all the entries. Without knowing the exact diff
between the previous clock and the new clock we cannot adjust the times
exactly, but we can at least ensure increasing timestamps.
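
A minimal sketch of that correction idea, assuming the only goal is to keep
issued timestamps non-decreasing; persisting the correction next to the log
directories (so it survives restarts) is not shown:

// Sketch of the base-time / correction idea: keep issued timestamps non-decreasing even
// if the wall clock jumps backwards.
public class CorrectedClock {
    private long correctionMs = 0;
    private long lastIssuedMs = Long.MIN_VALUE;

    public synchronized long nextTimestamp() {
        long corrected = System.currentTimeMillis() + correctionMs;
        if (corrected < lastIssuedMs) {
            // The wall clock moved backwards (e.g. an operator reset the time).
            // Grow the correction so the corrected time never decreases.
            correctionMs += lastIssuedMs - corrected;
            corrected = lastIssuedMs;
        }
        lastIssuedMs = corrected;
        return corrected;
    }
}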

On Fri, Sep 11, 2015 at 10:52 AM, Jiangjie Qin
 wrote:
> Ewen and Jay,
>
> They way I see the LogAppendTime is another format of "offset". It serves
> the following purpose:
> 1. Locate messages not only by position, but also by time. The difference
> from offset is timestamp is not unique for all messags.
> 2. Allow broker to manage messages based on time, e.g. retention, rolling
> 3. Provide convenience for user to search message not only by offset, but
> also by timestamp.
>
> For purpose (2) we don't need per message server timestamp. We only need
> per log segment server timestamp and propagate it among brokers.
>
> For (1) and (3), we need per message timestamp. Then the question is
> whether we should use CreateTime or LogAppendTime?
>
> I completely agree that an application timestamp is very useful for many
> use cases. But it seems to me that having Kafka to understand and maintain
> application timestamp is a bit over demanding. So I think there is value to
> pass on CreateTime for application convenience, but I am not sure it can
> replace LogAppendTime. Managing out-of-order CreateTime is equivalent to
> allowing producer to send their own offset and ask broker to manage the
> offset for them, It is going to be very hard to maintain and could create
> huge performance/functional issue because of complicated logic.
>
> About whether we should expose LogAppendTime to broker, I agree that server
> timestamp is internal to broker, but isn't offset also an internal concept?
> Arguably it's not provided by producer so consumer application logic does
> not have to know offset. But user needs to know offset because they need to
> know "where is the message" in the log. LogAppendTime provides the answer
> of "When was the message appended" to the log. So personally I think it is
> reasonable to expose the LogAppendTime to consumers.
>
> I can see some use cases of exposing the LogAppendTime, to name some:
> 1. Let's say broker has 7 days of log retention, some application wants to
> reprocess the data in past 3 days. User can simply provide the timestamp
> and start consume.
> 2. User can easily know lag by time.
> 3. Cross cluster fail over. This is a more complicated use case, there are
> two goals: 1) Not lose message; and 2) do not reconsume tons of messages.
> Only knowing offset of cluster A won't help with finding fail over point in
> cluster B  because an offset of a cluster means nothing to another cluster.
> Timestamp however is a good cross cluster reference in this case.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Thu, Sep 10, 2015 at 9:28 PM, Ewen Cheslack-Postava 
> wrote:
>
>> Re: MM preserving timestamps: Yes, this was how I interpreted the point in
>> the KIP and I only raised the issue because it restricts the usefulness of
>> timestamps anytime MM is involved. I agree it's not a deal breaker, but I
>> wanted to understand exact impact of the change. Some users seem to want to
>> be able to seek by application-defined timestamps (despite the many obvious
>> issues involved), and the proposal clearly would not support that unless
>> the timestamps submitted with the produce requests were respected. If we
>> ignore client submitted timestamps, then we probably want to try to hide
>> the timestamps as much as possible in any public interface (e.g. never
>> shows up in any public consumer APIs), but expose it just enough to be
>> usef

Re: [DISCUSS] KIP-31 - Message format change proposal

2015-09-14 Thread Jiangjie Qin
Hi Joel,

Good point about rebuilding the index. I agree that having a per-message
LogAppendTime might be necessary. About time adjustment, the solution
sounds promising, but it might be better to make it a follow-up to the
KIP because it seems like a really rare use case.

I have another thought on how to manage out-of-order timestamps. Maybe
we can do the following:
Create a special log-compacted topic, __timestamp_index, where the key
would be (TopicPartition, TimeStamp_Rounded_To_Minute) and the value is
the offset. In memory, we keep a map for each TopicPartition whose value
is (timestamp_rounded_to_minute -> smallest_offset_in_the_minute). This
way we can search out-of-order messages and make sure no message is
missed.
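
For what it's worth, a tiny sketch of that in-memory structure
(illustrative names only, millisecond timestamps assumed):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentSkipListMap;

    // Illustrative only: per-partition map of minute-rounded timestamp ->
    // smallest offset seen in that minute.
    public class PartitionTimeIndex {
        private static final long MINUTE_MS = 60_000L;
        private final Map<String, ConcurrentSkipListMap<Long, Long>> index =
            new ConcurrentHashMap<>();

        public void record(String topicPartition, long timestampMs, long offset) {
            long minute = (timestampMs / MINUTE_MS) * MINUTE_MS;
            index.computeIfAbsent(topicPartition, tp -> new ConcurrentSkipListMap<>())
                 .merge(minute, offset, Long::min); // keep the smallest offset per minute
        }

        // Smallest offset in the first minute bucket at or after targetMs, or null.
        public Long offsetForTime(String topicPartition, long targetMs) {
            ConcurrentSkipListMap<Long, Long> m = index.get(topicPartition);
            if (m == null) return null;
            Map.Entry<Long, Long> e = m.ceilingEntry((targetMs / MINUTE_MS) * MINUTE_MS);
            return e == null ? null : e.getValue();
        }
    }

Because each minute bucket keeps the smallest offset observed in that
minute, a search by time starts early enough that out-of-order messages
are not skipped.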

Thoughts?

Thanks,

Jiangjie (Becket) Qin

On Fri, Sep 11, 2015 at 12:46 PM, Joel Koshy  wrote:

> Jay had mentioned the scenario of mirror-maker bootstrap which would
> effectively reset the logAppendTimestamps for the bootstrapped data.
> If we don't include logAppendTimestamps in each message there is a
> similar scenario when rebuilding indexes during recovery. So it seems
> it may be worth adding that timestamp to messages. The drawback to
> that is exposing a server-side concept in the protocol (although we
> already do that with offsets). logAppendTimestamp really should be
> decided by the broker so I think the first scenario may have to be
> written off as a gotcha, but the second may be worth addressing (by
> adding it to the message format).
>
> The other point that Jay raised which needs to be addressed (since we
> require monotically increasing timestamps in the index) in the
> proposal is changing time on the server (I'm a little less concerned
> about NTP clock skews than a user explicitly changing the server's
> time - i.e., big clock skews). We would at least want to "set back"
> all the existing timestamps to guarantee non-decreasing timestamps
> with future messages. I'm not sure at this point how best to handle
> that, but we could perhaps have a epoch/base-time (or time-correction)
> stored in the log directories and base all log index timestamps off
> that base-time (or corrected). So if at any time you determine that
> time has changed backwards you can adjust that base-time without
> having to fix up all the entries. Without knowing the exact diff
> between the previous clock and new clock we cannot adjust the times
> exactly, but we can at least ensure increasing timestamps.
>
> On Fri, Sep 11, 2015 at 10:52 AM, Jiangjie Qin
>  wrote:
> > Ewen and Jay,
> >
> > They way I see the LogAppendTime is another format of "offset". It serves
> > the following purpose:
> > 1. Locate messages not only by position, but also by time. The difference
> > from offset is timestamp is not unique for all messags.
> > 2. Allow broker to manage messages based on time, e.g. retention, rolling
> > 3. Provide convenience for user to search message not only by offset, but
> > also by timestamp.
> >
> > For purpose (2) we don't need per message server timestamp. We only need
> > per log segment server timestamp and propagate it among brokers.
> >
> > For (1) and (3), we need per message timestamp. Then the question is
> > whether we should use CreateTime or LogAppendTime?
> >
> > I completely agree that an application timestamp is very useful for many
> > use cases. But it seems to me that having Kafka to understand and
> maintain
> > application timestamp is a bit over demanding. So I think there is value
> to
> > pass on CreateTime for application convenience, but I am not sure it can
> > replace LogAppendTime. Managing out-of-order CreateTime is equivalent to
> > allowing producer to send their own offset and ask broker to manage the
> > offset for them, It is going to be very hard to maintain and could create
> > huge performance/functional issue because of complicated logic.
> >
> > About whether we should expose LogAppendTime to broker, I agree that
> server
> > timestamp is internal to broker, but isn't offset also an internal
> concept?
> > Arguably it's not provided by producer so consumer application logic does
> > not have to know offset. But user needs to know offset because they need
> to
> > know "where is the message" in the log. LogAppendTime provides the answer
> > of "When was the message appended" to the log. So personally I think it
> is
> > reasonable to expose the LogAppendTime to consumers.
> >
> > I can see some use cases of exposing the LogAppendTime, to name some:
> > 1. Let's say broker has 7 days of log retention, some application wants
> to
> > reprocess the data in past 3 days. User can simply provide the timestamp
> > and start consume.
> > 2. User can easily know lag by time.
> > 3. Cross cluster fail over. This is a more complicated use case, there
> are
> > two goals: 1) Not lose message; and 2) do not reconsume tons of messages.
> > Only knowing offset of cluster A won't help with finding fail over point
> in
> > cluster B  because an offset of a clus

Re: [DISCUSS] KIP-31 - Message format change proposal

2015-09-14 Thread Jiangjie Qin
I was trying to send the last email before the KIP hangout, so I may not
have thought it through completely. By the way, the discussion is actually
more related to KIP-33, i.e. whether we should index on CreateTime or
LogAppendTime. (Although it seems all the discussion is still in this
mailing thread...) The solution in the last email is for indexing on
CreateTime. It is essentially what Jay suggested, except that we use a
timestamp map instead of a memory-mapped index file. Please ignore the
proposal of using a log-compacted topic. The solution can be simplified to
the following (sketched in code after the list):

Each broker keeps:
1. a timestamp index map - Map[TopicPartitionSegment, Map[Timestamp,
Offset]]. The timestamps are rounded down to minute boundaries.
2. a timestamp index file for each segment.
When a broker receives a message (whether as leader or follower), it
checks whether the timestamp index map already contains that timestamp for
the current segment. If the timestamp does not exist, the broker adds the
offset to the map and appends an entry to the timestamp index file, i.e.
we only use the index file as a persistent copy of the in-memory timestamp
map.

When a log segment is deleted, we need to:
1. delete the corresponding TopicPartitionSegment key from the timestamp
index map.
2. delete the segment's timestamp index file.
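
Roughly, the bookkeeping could look like the following (a sketch only;
the segment key type, file handling, and synchronization are simplified
assumptions, not the KIP's actual data structures):

    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentSkipListMap;

    // Illustrative only: per (topic, partition, segment) map of
    // minute-rounded timestamp -> first offset, backed by an append-only
    // index file so the map can be rebuilt on recovery.
    public class SegmentTimestampIndex {
        private final Map<String, ConcurrentSkipListMap<Long, Long>> bySegment =
            new ConcurrentHashMap<>();

        // Called on every append, on the leader and on followers. Writes to
        // the index file only the first time a minute bucket is seen for the
        // segment.
        public synchronized void onAppend(String segmentKey, long timestampMs,
                                          long offset, RandomAccessFile indexFile)
                throws IOException {
            long minute = (timestampMs / 60_000L) * 60_000L;
            ConcurrentSkipListMap<Long, Long> m =
                bySegment.computeIfAbsent(segmentKey, k -> new ConcurrentSkipListMap<>());
            if (m.putIfAbsent(minute, offset) == null) {
                indexFile.writeLong(minute);   // persist the (minute, offset) entry
                indexFile.writeLong(offset);
            }
        }

        // On segment deletion: drop the in-memory entry; the caller also
        // deletes the segment's timestamp index file.
        public void onSegmentDelete(String segmentKey) {
            bySegment.remove(segmentKey);
        }
    }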

This solution assumes we only keep CreateTime in the message. There are a
few trade-offs with this solution:
1. The granularity of search will be per minute.
2. The entire timestamp index map has to stay in memory all the time.
3. We need to think of another way to honor time-based log retention and
time-based log rolling.
4. We lose the benefits of including LogAppendTime in the message, as
mentioned earlier.

I am not sure whether this solution is necessarily better than indexing on
LogAppendTime.

I will update KIP-33 to explain the solutions for indexing on CreateTime
and LogAppendTime respectively, and add some more concrete use cases as
well.

Thanks,

Jiangjie (Becket) Qin


On Mon, Sep 14, 2015 at 9:40 AM, Jiangjie Qin  wrote:

> Hi Joel,
>
> Good point about rebuilding index. I agree that having a per message
> LogAppendTime might be necessary. About time adjustment, the solution
> sounds promising, but it might be better to make it as a follow up of the
> KIP because it seems a really rare use case.
>
> I have another thought on how to manage the out of order timestamps. Maybe
> we can do the following:
> Create a special log compacted topic __timestamp_index similar to topic,
> the key would be (TopicPartition, TimeStamp_Rounded_To_Minute), the value
> is offset. In memory, we keep a map for each TopicPartition, the value is
> (timestamp_rounded_to_minute -> smallest_offset_in_the_minute). This way we
> can search out of order message and make sure no message is missing.
>
> Thoughts?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Fri, Sep 11, 2015 at 12:46 PM, Joel Koshy  wrote:
>
>> Jay had mentioned the scenario of mirror-maker bootstrap which would
>> effectively reset the logAppendTimestamps for the bootstrapped data.
>> If we don't include logAppendTimestamps in each message there is a
>> similar scenario when rebuilding indexes during recovery. So it seems
>> it may be worth adding that timestamp to messages. The drawback to
>> that is exposing a server-side concept in the protocol (although we
>> already do that with offsets). logAppendTimestamp really should be
>> decided by the broker so I think the first scenario may have to be
>> written off as a gotcha, but the second may be worth addressing (by
>> adding it to the message format).
>>
>> The other point that Jay raised which needs to be addressed (since we
>> require monotically increasing timestamps in the index) in the
>> proposal is changing time on the server (I'm a little less concerned
>> about NTP clock skews than a user explicitly changing the server's
>> time - i.e., big clock skews). We would at least want to "set back"
>> all the existing timestamps to guarantee non-decreasing timestamps
>> with future messages. I'm not sure at this point how best to handle
>> that, but we could perhaps have a epoch/base-time (or time-correction)
>> stored in the log directories and base all log index timestamps off
>> that base-time (or corrected). So if at any time you determine that
>> time has changed backwards you can adjust that base-time without
>> having to fix up all the entries. Without knowing the exact diff
>> between the previous clock and new clock we cannot adjust the times
>> exactly, but we can at least ensure increasing timestamps.
>>
>> On Fri, Sep 11, 2015 at 10:52 AM, Jiangjie Qin
>>  wrote:
>> > Ewen and Jay,
>> >
>> > They way I see the LogAppendTime is another format of "offset". It
>> serves
>> > the following purpose:
>> > 1. Locate messages not only by position, but also by time. The
>> difference
>> > from offset is timestamp is not unique for all messags.
>> > 2. Allow broker to manage messages based on time, e.g. retention,
>> rolling
>> > 3. Provide convenience for user to search message not only by offset,
>> but
>> > als

Re: [DISCUSS] KIP-31 - Message format change proposal

2015-09-14 Thread Jiangjie Qin
I just updated KIP-33 to explain indexing on CreateTime and LogAppendTime
respectively. I also used some use cases to compare the two solutions.
Although this is for KIP-33, it does give some insight into whether it
makes sense to have a per-message LogAppendTime.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index

As a short summary of the conclusions we have already reached on
timestamps:
1. It is good to add a timestamp to the message.
2. LogAppendTime should be used for broker policy enforcement (log
retention / rolling).
3. It is useful to have a CreateTime in the message format, which is
immutable after the producer sends the message.

The following questions are still under discussion:
1. Should we also add LogAppendTime to the message format?
2. Which timestamp should we use to build the index?

Let's talk about question 1 first, because question 2 is actually a
follow-up to question 1.
Here is what I think:
1a. To enforce broker log policies, theoretically we don't need a
per-message LogAppendTime. However, if we don't include LogAppendTime in
the message, we still need a separate mechanism to pass log segment
timestamps among brokers, which means further complication in replication.
1b. LogAppendTime has some advantages over CreateTime (KIP-33 has a
detailed comparison).
1c. We have already exposed the offset, which is essentially an internal,
position-based concept of a message. Exposing LogAppendTime means we
expose another internal concept of a message, this time in terms of time.

Considering the above reasons, I personally think it is worth adding
LogAppendTime to each message.

Any thoughts?

Thanks,

Jiangjie (Becket) Qin

On Mon, Sep 14, 2015 at 11:44 AM, Jiangjie Qin  wrote:

> I was trying to send last email before KIP hangout so maybe did not think
> it through completely. By the way, the discussion is actually more related
> to KIP-33, i.e. whether we should index on CreateTime or LogAppendTime.
> (Although it seems all the discussion are still in this mailing thread...)
> This solution in last email is for indexing on CreateTime. It is
> essentially what Jay suggested except we use a timestamp map instead of a
> memory mapped index file. Please ignore the proposal of using a log
> compacted topic. The solution can be simplified to:
>
> Each broker keeps
> 1. a timestamp index map - Map[TopicPartitionSegment, Map[Timestamp,
> Offset]]. The timestamp is on minute boundary.
> 2. A timestamp index file for each segment.
> When a broker receives a message (both leader or follower), it checks if
> the timestamp index map contains the timestamp for current segment. The
> broker add the offset to the map and append an entry to the timestamp index
> if the timestamp does not exist. i.e. we only use the index file as a
> persistent copy of the index timestamp map.
>
> When a log segment is deleted, we need to:
> 1. delete the TopicPartitionKeySegment key in the timestamp index map.
> 2. delete the timestamp index file
>
> This solution assumes we only keep CreateTime in the message. There are a
> few trade-offs in this solution:
> 1. The granularity of search will be per minute.
> 2. All the timestamp index map has to be in the memory all the time.
> 3. We need to think about another way to honor log retention time and
> time-based log rolling.
> 4. We lose the benefit brought by including LogAppendTime in the message
> mentioned earlier.
>
> I am not sure whether this solution is necessarily better than indexing on
> LogAppendTime.
>
> I will update KIP-33 to explain the solution to index on CreateTime and
> LogAppendTime respectively and put some more concrete use cases as well.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
> On Mon, Sep 14, 2015 at 9:40 AM, Jiangjie Qin  wrote:
>
>> Hi Joel,
>>
>> Good point about rebuilding index. I agree that having a per message
>> LogAppendTime might be necessary. About time adjustment, the solution
>> sounds promising, but it might be better to make it as a follow up of the
>> KIP because it seems a really rare use case.
>>
>> I have another thought on how to manage the out of order timestamps.
>> Maybe we can do the following:
>> Create a special log compacted topic __timestamp_index similar to topic,
>> the key would be (TopicPartition, TimeStamp_Rounded_To_Minute), the value
>> is offset. In memory, we keep a map for each TopicPartition, the value is
>> (timestamp_rounded_to_minute -> smallest_offset_in_the_minute). This way we
>> can search out of order message and make sure no message is missing.
>>
>> Thoughts?
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Fri, Sep 11, 2015 at 12:46 PM, Joel Koshy  wrote:
>>
>>> Jay had mentioned the scenario of mirror-maker bootstrap which would
>>> effectively reset the logAppendTimestamps for the bootstrapped data.
>>> If we don't include logAppendTimestamps in each message there is a
>>> similar scenario when rebuilding indexes 

Re: [DISCUSS] KIP-31 - Message format change proposal

2015-09-14 Thread Mayuresh Gharat
I suppose the JIRA link is wrong. It points to this JIRA:
https://issues.apache.org/jira/browse/KAFKA-1


Thanks,

Mayuresh

On Mon, Sep 14, 2015 at 5:13 PM, Jiangjie Qin 
wrote:

> I just updated the KIP-33 to explain the indexing on CreateTime and
> LogAppendTime respectively. I also used some use case to compare the two
> solutions.
> Although this is for KIP-33, but it does give a some insights on whether it
> makes sense to have a per message LogAppendTime.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index
>
> As a short summary of the conclusions we have already reached on timestamp:
> 1. It is good to add a timestamp to the message.
> 2. LogAppendTime should be used for broker policy enforcement (Log
> retention / rolling)
> 3. It is useful to have a CreateTime in message format, which is immutable
> after producer sends the message.
>
> There are following questions still in discussion:
> 1. Should we also add LogAppendTime to message format?
> 2. which timestamp should we use to build the index.
>
> Let's talk about question 1 first because question 2 is actually a follow
> up question for question 1.
> Here are what I think:
> 1a. To enforce broker log policy, theoretically we don't need per-message
> LogAppendTime. If we don't include LogAppendTime in message, we still need
> to implement a separate solution to pass log segment timestamps among
> brokers. That means if we don't include the LogAppendTime in message, there
> will be further complication in replication.
> 1b. LogAppendTime has some advantage over CreateTime (KIP-33 has detail
> comparison)
> 1c. We have already exposed offset, which is essentially an internal
> concept of message in terms of position. Exposing LogAppendTime means we
> expose another internal concept of message in terms of time.
>
> Considering the above reasons, personally I think it worth adding the
> LogAppendTime to each message.
>
> Any thoughts?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Mon, Sep 14, 2015 at 11:44 AM, Jiangjie Qin  wrote:
>
> > I was trying to send last email before KIP hangout so maybe did not think
> > it through completely. By the way, the discussion is actually more
> related
> > to KIP-33, i.e. whether we should index on CreateTime or LogAppendTime.
> > (Although it seems all the discussion are still in this mailing
> thread...)
> > This solution in last email is for indexing on CreateTime. It is
> > essentially what Jay suggested except we use a timestamp map instead of a
> > memory mapped index file. Please ignore the proposal of using a log
> > compacted topic. The solution can be simplified to:
> >
> > Each broker keeps
> > 1. a timestamp index map - Map[TopicPartitionSegment, Map[Timestamp,
> > Offset]]. The timestamp is on minute boundary.
> > 2. A timestamp index file for each segment.
> > When a broker receives a message (both leader or follower), it checks if
> > the timestamp index map contains the timestamp for current segment. The
> > broker add the offset to the map and append an entry to the timestamp
> index
> > if the timestamp does not exist. i.e. we only use the index file as a
> > persistent copy of the index timestamp map.
> >
> > When a log segment is deleted, we need to:
> > 1. delete the TopicPartitionKeySegment key in the timestamp index map.
> > 2. delete the timestamp index file
> >
> > This solution assumes we only keep CreateTime in the message. There are a
> > few trade-offs in this solution:
> > 1. The granularity of search will be per minute.
> > 2. All the timestamp index map has to be in the memory all the time.
> > 3. We need to think about another way to honor log retention time and
> > time-based log rolling.
> > 4. We lose the benefit brought by including LogAppendTime in the message
> > mentioned earlier.
> >
> > I am not sure whether this solution is necessarily better than indexing
> on
> > LogAppendTime.
> >
> > I will update KIP-33 to explain the solution to index on CreateTime and
> > LogAppendTime respectively and put some more concrete use cases as well.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> > On Mon, Sep 14, 2015 at 9:40 AM, Jiangjie Qin  wrote:
> >
> >> Hi Joel,
> >>
> >> Good point about rebuilding index. I agree that having a per message
> >> LogAppendTime might be necessary. About time adjustment, the solution
> >> sounds promising, but it might be better to make it as a follow up of
> the
> >> KIP because it seems a really rare use case.
> >>
> >> I have another thought on how to manage the out of order timestamps.
> >> Maybe we can do the following:
> >> Create a special log compacted topic __timestamp_index similar to topic,
> >> the key would be (TopicPartition, TimeStamp_Rounded_To_Minute), the
> value
> >> is offset. In memory, we keep a map for each TopicPartition, the value
> is
> >> (timestamp_rounded_to_minute -> smallest_offset_in_the_minute). This
> way we
> >> can search out of order message and make sure no mess

Re: [DISCUSS] KIP-31 - Message format change proposal

2015-09-14 Thread Jiangjie Qin
Mayuresh,

I haven't created a JIRA for KIP-33 yet because it is still under
discussion. I will remove the JIRA link.

Thanks,

Jiangjie (Becket) Qin

On Mon, Sep 14, 2015 at 8:15 PM, Mayuresh Gharat  wrote:

> I suppose the jira link is different. It points to this jira :
> https://issues.apache.org/jira/browse/KAFKA-1
>
>
> Thanks,
>
> Mayuresh
>
> On Mon, Sep 14, 2015 at 5:13 PM, Jiangjie Qin 
> wrote:
>
> > I just updated the KIP-33 to explain the indexing on CreateTime and
> > LogAppendTime respectively. I also used some use case to compare the two
> > solutions.
> > Although this is for KIP-33, but it does give a some insights on whether
> it
> > makes sense to have a per message LogAppendTime.
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index
> >
> > As a short summary of the conclusions we have already reached on
> timestamp:
> > 1. It is good to add a timestamp to the message.
> > 2. LogAppendTime should be used for broker policy enforcement (Log
> > retention / rolling)
> > 3. It is useful to have a CreateTime in message format, which is
> immutable
> > after producer sends the message.
> >
> > There are following questions still in discussion:
> > 1. Should we also add LogAppendTime to message format?
> > 2. which timestamp should we use to build the index.
> >
> > Let's talk about question 1 first because question 2 is actually a follow
> > up question for question 1.
> > Here are what I think:
> > 1a. To enforce broker log policy, theoretically we don't need per-message
> > LogAppendTime. If we don't include LogAppendTime in message, we still
> need
> > to implement a separate solution to pass log segment timestamps among
> > brokers. That means if we don't include the LogAppendTime in message,
> there
> > will be further complication in replication.
> > 1b. LogAppendTime has some advantage over CreateTime (KIP-33 has detail
> > comparison)
> > 1c. We have already exposed offset, which is essentially an internal
> > concept of message in terms of position. Exposing LogAppendTime means we
> > expose another internal concept of message in terms of time.
> >
> > Considering the above reasons, personally I think it worth adding the
> > LogAppendTime to each message.
> >
> > Any thoughts?
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Mon, Sep 14, 2015 at 11:44 AM, Jiangjie Qin 
> wrote:
> >
> > > I was trying to send last email before KIP hangout so maybe did not
> think
> > > it through completely. By the way, the discussion is actually more
> > related
> > > to KIP-33, i.e. whether we should index on CreateTime or LogAppendTime.
> > > (Although it seems all the discussion are still in this mailing
> > thread...)
> > > This solution in last email is for indexing on CreateTime. It is
> > > essentially what Jay suggested except we use a timestamp map instead
> of a
> > > memory mapped index file. Please ignore the proposal of using a log
> > > compacted topic. The solution can be simplified to:
> > >
> > > Each broker keeps
> > > 1. a timestamp index map - Map[TopicPartitionSegment, Map[Timestamp,
> > > Offset]]. The timestamp is on minute boundary.
> > > 2. A timestamp index file for each segment.
> > > When a broker receives a message (both leader or follower), it checks
> if
> > > the timestamp index map contains the timestamp for current segment. The
> > > broker add the offset to the map and append an entry to the timestamp
> > index
> > > if the timestamp does not exist. i.e. we only use the index file as a
> > > persistent copy of the index timestamp map.
> > >
> > > When a log segment is deleted, we need to:
> > > 1. delete the TopicPartitionKeySegment key in the timestamp index map.
> > > 2. delete the timestamp index file
> > >
> > > This solution assumes we only keep CreateTime in the message. There
> are a
> > > few trade-offs in this solution:
> > > 1. The granularity of search will be per minute.
> > > 2. All the timestamp index map has to be in the memory all the time.
> > > 3. We need to think about another way to honor log retention time and
> > > time-based log rolling.
> > > 4. We lose the benefit brought by including LogAppendTime in the
> message
> > > mentioned earlier.
> > >
> > > I am not sure whether this solution is necessarily better than indexing
> > on
> > > LogAppendTime.
> > >
> > > I will update KIP-33 to explain the solution to index on CreateTime and
> > > LogAppendTime respectively and put some more concrete use cases as
> well.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > >
> > > On Mon, Sep 14, 2015 at 9:40 AM, Jiangjie Qin 
> wrote:
> > >
> > >> Hi Joel,
> > >>
> > >> Good point about rebuilding index. I agree that having a per message
> > >> LogAppendTime might be necessary. About time adjustment, the solution
> > >> sounds promising, but it might be better to make it as a follow up of
> > the
> > >> KIP because it seems a really rare use case.
> > >>
> > >> I have another thought on how 

Re: [DISCUSS] KIP-31 - Message format change proposal

2015-09-17 Thread Jiangjie Qin
Hi folks,

Thanks a lot for the feedback on KIP-31 - the move to use relative
offsets (not including the timestamp and index discussion).

I updated the migration plan section as we discussed in the KIP hangout.
I think that was the only concern raised so far. Please let me know if
there are further comments about the KIP.

Thanks,

Jiangjie (Becket) Qin

On Mon, Sep 14, 2015 at 5:13 PM, Jiangjie Qin  wrote:

> I just updated the KIP-33 to explain the indexing on CreateTime and
> LogAppendTime respectively. I also used some use case to compare the two
> solutions.
> Although this is for KIP-33, but it does give a some insights on whether
> it makes sense to have a per message LogAppendTime.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index
>
> As a short summary of the conclusions we have already reached on timestamp:
> 1. It is good to add a timestamp to the message.
> 2. LogAppendTime should be used for broker policy enforcement (Log
> retention / rolling)
> 3. It is useful to have a CreateTime in message format, which is immutable
> after producer sends the message.
>
> There are following questions still in discussion:
> 1. Should we also add LogAppendTime to message format?
> 2. which timestamp should we use to build the index.
>
> Let's talk about question 1 first because question 2 is actually a follow
> up question for question 1.
> Here are what I think:
> 1a. To enforce broker log policy, theoretically we don't need per-message
> LogAppendTime. If we don't include LogAppendTime in message, we still need
> to implement a separate solution to pass log segment timestamps among
> brokers. That means if we don't include the LogAppendTime in message, there
> will be further complication in replication.
> 1b. LogAppendTime has some advantage over CreateTime (KIP-33 has detail
> comparison)
> 1c. We have already exposed offset, which is essentially an internal
> concept of message in terms of position. Exposing LogAppendTime means we
> expose another internal concept of message in terms of time.
>
> Considering the above reasons, personally I think it worth adding the
> LogAppendTime to each message.
>
> Any thoughts?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Mon, Sep 14, 2015 at 11:44 AM, Jiangjie Qin  wrote:
>
>> I was trying to send last email before KIP hangout so maybe did not think
>> it through completely. By the way, the discussion is actually more related
>> to KIP-33, i.e. whether we should index on CreateTime or LogAppendTime.
>> (Although it seems all the discussion are still in this mailing thread...)
>> This solution in last email is for indexing on CreateTime. It is
>> essentially what Jay suggested except we use a timestamp map instead of a
>> memory mapped index file. Please ignore the proposal of using a log
>> compacted topic. The solution can be simplified to:
>>
>> Each broker keeps
>> 1. a timestamp index map - Map[TopicPartitionSegment, Map[Timestamp,
>> Offset]]. The timestamp is on minute boundary.
>> 2. A timestamp index file for each segment.
>> When a broker receives a message (both leader or follower), it checks if
>> the timestamp index map contains the timestamp for current segment. The
>> broker add the offset to the map and append an entry to the timestamp index
>> if the timestamp does not exist. i.e. we only use the index file as a
>> persistent copy of the index timestamp map.
>>
>> When a log segment is deleted, we need to:
>> 1. delete the TopicPartitionKeySegment key in the timestamp index map.
>> 2. delete the timestamp index file
>>
>> This solution assumes we only keep CreateTime in the message. There are a
>> few trade-offs in this solution:
>> 1. The granularity of search will be per minute.
>> 2. All the timestamp index map has to be in the memory all the time.
>> 3. We need to think about another way to honor log retention time and
>> time-based log rolling.
>> 4. We lose the benefit brought by including LogAppendTime in the message
>> mentioned earlier.
>>
>> I am not sure whether this solution is necessarily better than indexing
>> on LogAppendTime.
>>
>> I will update KIP-33 to explain the solution to index on CreateTime and
>> LogAppendTime respectively and put some more concrete use cases as well.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>>
>> On Mon, Sep 14, 2015 at 9:40 AM, Jiangjie Qin  wrote:
>>
>>> Hi Joel,
>>>
>>> Good point about rebuilding index. I agree that having a per message
>>> LogAppendTime might be necessary. About time adjustment, the solution
>>> sounds promising, but it might be better to make it as a follow up of the
>>> KIP because it seems a really rare use case.
>>>
>>> I have another thought on how to manage the out of order timestamps.
>>> Maybe we can do the following:
>>> Create a special log compacted topic __timestamp_index similar to topic,
>>> the key would be (TopicPartition, TimeStamp_Rounded_To_Minute), the value
>>> is offset. In memory, we keep a map for each TopicPartition, the value is

Re: [DISCUSS] KIP-31 - Message format change proposal

2015-09-21 Thread Jun Rao
Jiangjie,

Thanks for the writeup. A few comments below.

1. We will need to be a bit careful with fetch requests from the followers.
Basically, as we are doing a rolling upgrade of the brokers, the follower
can't start issuing V2 of the fetch request until the rest of the brokers
are ready to process it. So, we probably need to make use of
inter.broker.protocol.version to do the rolling upgrade. In step 1, we set
inter.broker.protocol.version to 0.9 and do a round of rolling upgrade of
the brokers. At this point, all brokers are capable of processing V2 of
fetch requests, but no broker is using it yet. In step 2, we
set inter.broker.protocol.version to 0.10 and do another round of rolling
restart of the brokers. In this step, the upgraded brokers will start
issuing V2 of the fetch request.

2. If we do #1, I am not sure if there is still a need for
message.format.version since the broker can start writing messages in the
new format after inter.broker.protocol.version is set to 0.10.

3. It wasn't clear from the wiki whether the base offset in the shallow
message is the offset of the first or the last inner message. It's better
to use the offset of the last inner message. This way, the followers don't
have to decompress messages to figure out the next fetch offset.

4. I am not sure that I understand the following sentence in the wiki. It
seems that the relative offsets in a compressed message don't have to be
consecutive. If so, why do we need to update the relative offsets in the
inner messages?
"When the log cleaner compacts log segments, it needs to update the inner
message's relative offset values."

Thanks,

Jun

On Thu, Sep 17, 2015 at 12:54 PM, Jiangjie Qin 
wrote:

> Hi folks,
>
> Thanks a lot for the feedback on KIP-31 - move to use relative offset. (Not
> including timestamp and index discussion).
>
> I updated the migration plan section as we discussed on KIP hangout. I
> think it is the only concern raised so far. Please let me know if there are
> further comments about the KIP.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Mon, Sep 14, 2015 at 5:13 PM, Jiangjie Qin  wrote:
>
> > I just updated the KIP-33 to explain the indexing on CreateTime and
> > LogAppendTime respectively. I also used some use case to compare the two
> > solutions.
> > Although this is for KIP-33, but it does give a some insights on whether
> > it makes sense to have a per message LogAppendTime.
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index
> >
> > As a short summary of the conclusions we have already reached on
> timestamp:
> > 1. It is good to add a timestamp to the message.
> > 2. LogAppendTime should be used for broker policy enforcement (Log
> > retention / rolling)
> > 3. It is useful to have a CreateTime in message format, which is
> immutable
> > after producer sends the message.
> >
> > There are following questions still in discussion:
> > 1. Should we also add LogAppendTime to message format?
> > 2. which timestamp should we use to build the index.
> >
> > Let's talk about question 1 first because question 2 is actually a follow
> > up question for question 1.
> > Here are what I think:
> > 1a. To enforce broker log policy, theoretically we don't need per-message
> > LogAppendTime. If we don't include LogAppendTime in message, we still
> need
> > to implement a separate solution to pass log segment timestamps among
> > brokers. That means if we don't include the LogAppendTime in message,
> there
> > will be further complication in replication.
> > 1b. LogAppendTime has some advantage over CreateTime (KIP-33 has detail
> > comparison)
> > 1c. We have already exposed offset, which is essentially an internal
> > concept of message in terms of position. Exposing LogAppendTime means we
> > expose another internal concept of message in terms of time.
> >
> > Considering the above reasons, personally I think it worth adding the
> > LogAppendTime to each message.
> >
> > Any thoughts?
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Mon, Sep 14, 2015 at 11:44 AM, Jiangjie Qin 
> wrote:
> >
> >> I was trying to send last email before KIP hangout so maybe did not
> think
> >> it through completely. By the way, the discussion is actually more
> related
> >> to KIP-33, i.e. whether we should index on CreateTime or LogAppendTime.
> >> (Although it seems all the discussion are still in this mailing
> thread...)
> >> This solution in last email is for indexing on CreateTime. It is
> >> essentially what Jay suggested except we use a timestamp map instead of
> a
> >> memory mapped index file. Please ignore the proposal of using a log
> >> compacted topic. The solution can be simplified to:
> >>
> >> Each broker keeps
> >> 1. a timestamp index map - Map[TopicPartitionSegment, Map[Timestamp,
> >> Offset]]. The timestamp is on minute boundary.
> >> 2. A timestamp index file for each segment.
> >> When a broker receives a message (both leader or follower), it checks if
> >> t

Re: [DISCUSS] KIP-31 - Message format change proposal

2015-09-21 Thread Jay Kreps
For (3) I don't think we can change the offset in the outer message from
what it is today as it is relied upon in the search done in the log layer.
The reason it is the offset of the last message rather than the first is to
make the offset a least upper bound (i.e. the smallest offset >=
fetch_offset). This needs to work the same for both gaps due to compacted
topics and gaps due to compressed messages.

So imagine you had a compressed set with offsets {45, 46, 47, 48} if you
assigned this compressed set the offset 45 a fetch for 46 would actually
skip ahead to 49 (the least upper bound).
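
To illustrate with code (the names are made up; this is just the
least-upper-bound lookup behavior, not the actual log layer):

    import java.util.Map;
    import java.util.TreeMap;

    // Each message set is indexed under its assigned offset, i.e. the offset
    // of its LAST inner message. A fetch then finds the first set whose
    // assigned offset is >= the fetch offset (the least upper bound).
    public class LeastUpperBoundLookup {
        private final TreeMap<Long, Long> assignedOffsetToPosition = new TreeMap<>();

        public void addSet(long lastInnerOffset, long filePosition) {
            assignedOffsetToPosition.put(lastInnerOffset, filePosition);
        }

        public Long positionFor(long fetchOffset) {
            Map.Entry<Long, Long> e = assignedOffsetToPosition.ceilingEntry(fetchOffset);
            return e == null ? null : e.getValue();
        }
    }

With the compressed set {45, 46, 47, 48} indexed under 48, positionFor(46)
returns that set's position, so 46 is not skipped; if the set were indexed
under 45, positionFor(46) would land on the following set, which is
exactly the skip-ahead described above.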

-Jay

On Mon, Sep 21, 2015 at 5:17 PM, Jun Rao  wrote:

> Jiangjie,
>
> Thanks for the writeup. A few comments below.
>
> 1. We will need to be a bit careful with fetch requests from the followers.
> Basically, as we are doing a rolling upgrade of the brokers, the follower
> can't start issuing V2 of the fetch request until the rest of the brokers
> are ready to process it. So, we probably need to make use of
> inter.broker.protocol.version to do the rolling upgrade. In step 1, we set
> inter.broker.protocol.version to 0.9 and do a round of rolling upgrade of
> the brokers. At this point, all brokers are capable of processing V2 of
> fetch requests, but no broker is using it yet. In step 2, we
> set inter.broker.protocol.version to 0.10 and do another round of rolling
> restart of the brokers. In this step, the upgraded brokers will start
> issuing V2 of the fetch request.
>
> 2. If we do #1, I am not sure if there is still a need for
> message.format.version since the broker can start writing messages in the
> new format after inter.broker.protocol.version is set to 0.10.
>
> 3. It wasn't clear from the wiki whether the base offset in the shallow
> message is the offset of the first or the last inner message. It's better
> to use the offset of the last inner message. This way, the followers don't
> have to decompress messages to figure out the next fetch offset.
>
> 4. I am not sure that I understand the following sentence in the wiki. It
> seems that the relative offsets in a compressed message don't have to be
> consecutive. If so, why do we need to update the relative offsets in the
> inner messages?
> "When the log cleaner compacts log segments, it needs to update the inner
> message's relative offset values."
>
> Thanks,
>
> Jun
>
> On Thu, Sep 17, 2015 at 12:54 PM, Jiangjie Qin 
> wrote:
>
> > Hi folks,
> >
> > Thanks a lot for the feedback on KIP-31 - move to use relative offset.
> (Not
> > including timestamp and index discussion).
> >
> > I updated the migration plan section as we discussed on KIP hangout. I
> > think it is the only concern raised so far. Please let me know if there
> are
> > further comments about the KIP.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Mon, Sep 14, 2015 at 5:13 PM, Jiangjie Qin  wrote:
> >
> > > I just updated the KIP-33 to explain the indexing on CreateTime and
> > > LogAppendTime respectively. I also used some use case to compare the
> two
> > > solutions.
> > > Although this is for KIP-33, but it does give a some insights on
> whether
> > > it makes sense to have a per message LogAppendTime.
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index
> > >
> > > As a short summary of the conclusions we have already reached on
> > timestamp:
> > > 1. It is good to add a timestamp to the message.
> > > 2. LogAppendTime should be used for broker policy enforcement (Log
> > > retention / rolling)
> > > 3. It is useful to have a CreateTime in message format, which is
> > immutable
> > > after producer sends the message.
> > >
> > > There are following questions still in discussion:
> > > 1. Should we also add LogAppendTime to message format?
> > > 2. which timestamp should we use to build the index.
> > >
> > > Let's talk about question 1 first because question 2 is actually a
> follow
> > > up question for question 1.
> > > Here are what I think:
> > > 1a. To enforce broker log policy, theoretically we don't need
> per-message
> > > LogAppendTime. If we don't include LogAppendTime in message, we still
> > need
> > > to implement a separate solution to pass log segment timestamps among
> > > brokers. That means if we don't include the LogAppendTime in message,
> > there
> > > will be further complication in replication.
> > > 1b. LogAppendTime has some advantage over CreateTime (KIP-33 has detail
> > > comparison)
> > > 1c. We have already exposed offset, which is essentially an internal
> > > concept of message in terms of position. Exposing LogAppendTime means
> we
> > > expose another internal concept of message in terms of time.
> > >
> > > Considering the above reasons, personally I think it worth adding the
> > > LogAppendTime to each message.
> > >
> > > Any thoughts?
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Mon, Sep 14, 2015 at 11:44 AM, Jiangjie Qin 
> > wrote:
> > >
> > >> I was trying to

Re: [DISCUSS] KIP-31 - Message format change proposal

2015-09-22 Thread Jiangjie Qin
Hi Jun,

Thanks a lot for the comments. Please see the inline reply. I will update
the KIP page accordingly.

On Mon, Sep 21, 2015 at 5:17 PM, Jun Rao  wrote:

> Jiangjie,
>
> Thanks for the writeup. A few comments below.
>
> 1. We will need to be a bit careful with fetch requests from the followers.
> Basically, as we are doing a rolling upgrade of the brokers, the follower
> can't start issuing V2 of the fetch request until the rest of the brokers
> are ready to process it. So, we probably need to make use of
> inter.broker.protocol.version to do the rolling upgrade. In step 1, we set
> inter.broker.protocol.version to 0.9 and do a round of rolling upgrade of
> the brokers. At this point, all brokers are capable of processing V2 of
> fetch requests, but no broker is using it yet. In step 2, we
> set inter.broker.protocol.version to 0.10 and do another round of rolling
> restart of the brokers. In this step, the upgraded brokers will start
> issuing V2 of the fetch request.
>
Thanks for the reminder. Yes, we should use the inter-broker protocol
version in this case. A related thought on the mapping between the
inter-broker protocol version and the release version:

Today the inter-broker protocol version and the release version have a
one-to-one mapping. If we use 0.10 for the new protocol now (with
FetchRequest V2) and later need to bump the inter-broker protocol version
again (e.g. for FetchRequest V3) before 0.10 is officially released,
should we still use version 0.10 but point it to FetchRequest V3? If we
do that, inter-broker protocol 0.10 with FetchRequest V2 will be as if it
never existed. Users who were running inter-broker protocol 0.10 with
FetchRequest V2 (not an official release, but trunk at that time) would
have to downgrade to inter-broker protocol 0.9 before upgrading to
inter-broker protocol 0.10 with FetchRequest V3.

If we instead associate multiple inter-broker protocol changes between two
official releases with the last official release, this might solve the
problem. For example, this time we could use "0.9-V1"; if there are
further inter-broker protocol changes we could use "0.9-V2", and so on.
This avoids losing track of inter-broker protocol changes made between
official releases.


>
> 2. If we do #1, I am not sure if there is still a need for
> message.format.version since the broker can start writing messages in the
> new format after inter.broker.protocol.version is set to 0.10.
>
Right, I don't think we need this anymore.

>

> 3. It wasn't clear from the wiki whether the base offset in the shallow
> message is the offset of the first or the last inner message. It's better
> to use the offset of the last inner message. This way, the followers don't
> have to decompress messages to figure out the next fetch offset.
>
Good point. Last offset is better, which is also what we are doing now.
I'll update the KIP page.

>
> 4. I am not sure that I understand the following sentence in the wiki. It
> seems that the relative offsets in a compressed message don't have to be
> consecutive. If so, why do we need to update the relative offsets in the
> inner messages?
> "When the log cleaner compacts log segments, it needs to update the inner
> message's relative offset values."
>
This assumes that when we compact log segments, we might merge multiple
message sets into one message set. If so, the relative offsets in the
original message sets will likely differ from the relative offsets in the
compacted message set. That's why we need to update the inner messages'
relative offset values.
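
As a small worked example of that re-basing (illustrative only; the exact
relative-offset convention is whatever the KIP finally specifies - here
relatives are taken against the first message of the merged set, and the
wrapper keeps the absolute offset of the last one):

    import java.util.ArrayList;
    import java.util.List;

    // Survivors of compaction from two original sets, e.g. absolute offsets
    // {10, 12, 21, 22}. Their old relative offsets were computed against two
    // different bases, so they must be recomputed against the new set's base.
    public class RelativeOffsetRebase {
        public static List<Long> rebase(List<Long> retainedAbsoluteOffsets) {
            long base = retainedAbsoluteOffsets.get(0);
            List<Long> newRelatives = new ArrayList<>();
            for (long abs : retainedAbsoluteOffsets) {
                newRelatives.add(abs - base);   // {0, 2, 11, 12} for the example above
            }
            return newRelatives;                // the wrapper keeps 22, the last absolute offset
        }
    }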

>
> Thanks,
>
> Jun
>
> On Thu, Sep 17, 2015 at 12:54 PM, Jiangjie Qin 
> wrote:
>
> > Hi folks,
> >
> > Thanks a lot for the feedback on KIP-31 - move to use relative offset.
> (Not
> > including timestamp and index discussion).
> >
> > I updated the migration plan section as we discussed on KIP hangout. I
> > think it is the only concern raised so far. Please let me know if there
> are
> > further comments about the KIP.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Mon, Sep 14, 2015 at 5:13 PM, Jiangjie Qin  wrote:
> >
> > > I just updated the KIP-33 to explain the indexing on CreateTime and
> > > LogAppendTime respectively. I also used some use case to compare the
> two
> > > solutions.
> > > Although this is for KIP-33, but it does give a some insights on
> whether
> > > it makes sense to have a per message LogAppendTime.
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index
> > >
> > > As a short summary of the conclusions we have already reached on
> > timestamp:
> > > 1. It is good to add a timestamp to the message.
> > > 2. LogAppendTime should be used for broker policy enforcement (Log
> > > retention / rolling)
> > > 3. It is useful to have a CreateTime in message format, which is
> > immutable
> > > after producer sends the message.
> > >
> > > There are following questions still in discussion:
> > > 1. Should we also add LogAppendTime to message format?
> > > 2. which timestamp should we use to 

Re: [DISCUSS] KIP-31 - Message format change proposal

2015-09-22 Thread Jiangjie Qin
Thanks for the explanation, Jay.
Agreed. We have to keep the outer offset as the offset of the last inner
message.

Jiangjie (Becket) Qin

On Mon, Sep 21, 2015 at 6:21 PM, Jay Kreps  wrote:

> For (3) I don't think we can change the offset in the outer message from
> what it is today as it is relied upon in the search done in the log layer.
> The reason it is the offset of the last message rather than the first is to
> make the offset a least upper bound (i.e. the smallest offset >=
> fetch_offset). This needs to work the same for both gaps due to compacted
> topics and gaps due to compressed messages.
>
> So imagine you had a compressed set with offsets {45, 46, 47, 48} if you
> assigned this compressed set the offset 45 a fetch for 46 would actually
> skip ahead to 49 (the least upper bound).
>
> -Jay
>
> On Mon, Sep 21, 2015 at 5:17 PM, Jun Rao  wrote:
>
> > Jiangjie,
> >
> > Thanks for the writeup. A few comments below.
> >
> > 1. We will need to be a bit careful with fetch requests from the
> followers.
> > Basically, as we are doing a rolling upgrade of the brokers, the follower
> > can't start issuing V2 of the fetch request until the rest of the brokers
> > are ready to process it. So, we probably need to make use of
> > inter.broker.protocol.version to do the rolling upgrade. In step 1, we
> set
> > inter.broker.protocol.version to 0.9 and do a round of rolling upgrade of
> > the brokers. At this point, all brokers are capable of processing V2 of
> > fetch requests, but no broker is using it yet. In step 2, we
> > set inter.broker.protocol.version to 0.10 and do another round of rolling
> > restart of the brokers. In this step, the upgraded brokers will start
> > issuing V2 of the fetch request.
> >
> > 2. If we do #1, I am not sure if there is still a need for
> > message.format.version since the broker can start writing messages in the
> > new format after inter.broker.protocol.version is set to 0.10.
> >
> > 3. It wasn't clear from the wiki whether the base offset in the shallow
> > message is the offset of the first or the last inner message. It's better
> > to use the offset of the last inner message. This way, the followers
> don't
> > have to decompress messages to figure out the next fetch offset.
> >
> > 4. I am not sure that I understand the following sentence in the wiki. It
> > seems that the relative offsets in a compressed message don't have to be
> > consecutive. If so, why do we need to update the relative offsets in the
> > inner messages?
> > "When the log cleaner compacts log segments, it needs to update the inner
> > message's relative offset values."
> >
> > Thanks,
> >
> > Jun
> >
> > On Thu, Sep 17, 2015 at 12:54 PM, Jiangjie Qin  >
> > wrote:
> >
> > > Hi folks,
> > >
> > > Thanks a lot for the feedback on KIP-31 - move to use relative offset.
> > (Not
> > > including timestamp and index discussion).
> > >
> > > I updated the migration plan section as we discussed on KIP hangout. I
> > > think it is the only concern raised so far. Please let me know if there
> > are
> > > further comments about the KIP.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Mon, Sep 14, 2015 at 5:13 PM, Jiangjie Qin 
> wrote:
> > >
> > > > I just updated the KIP-33 to explain the indexing on CreateTime and
> > > > LogAppendTime respectively. I also used some use case to compare the
> > two
> > > > solutions.
> > > > Although this is for KIP-33, but it does give a some insights on
> > whether
> > > > it makes sense to have a per message LogAppendTime.
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index
> > > >
> > > > As a short summary of the conclusions we have already reached on
> > > timestamp:
> > > > 1. It is good to add a timestamp to the message.
> > > > 2. LogAppendTime should be used for broker policy enforcement (Log
> > > > retention / rolling)
> > > > 3. It is useful to have a CreateTime in message format, which is
> > > immutable
> > > > after producer sends the message.
> > > >
> > > > There are following questions still in discussion:
> > > > 1. Should we also add LogAppendTime to message format?
> > > > 2. which timestamp should we use to build the index.
> > > >
> > > > Let's talk about question 1 first because question 2 is actually a
> > follow
> > > > up question for question 1.
> > > > Here are what I think:
> > > > 1a. To enforce broker log policy, theoretically we don't need
> > per-message
> > > > LogAppendTime. If we don't include LogAppendTime in message, we still
> > > need
> > > > to implement a separate solution to pass log segment timestamps among
> > > > brokers. That means if we don't include the LogAppendTime in message,
> > > there
> > > > will be further complication in replication.
> > > > 1b. LogAppendTime has some advantage over CreateTime (KIP-33 has
> detail
> > > > comparison)
> > > > 1c. We have already exposed offset, which is essentially an internal
> > > > concept of message in 

Re: [DISCUSS] KIP-31 - Message format change proposal

2015-09-24 Thread Joel Koshy
The upgrade plan works, but the potentially long interim phase of
skipping zero-copy for down-conversion could be problematic especially
for large deployments with large consumer fan-out. It is not only
going to be memory overhead but CPU as well - since you need to
decompress, write absolute offsets, then recompress for every v1
fetch. So it may be safer (but obviously more tedious) to have a
multi-step upgrade process, e.g.:

1 - Upgrade brokers, but disable the feature. i.e., either reject
producer requests v2 or down-convert to old message format (with
absolute offsets)
2 - Upgrade clients, but they should only use v1 requests
3 - Switch (all or most) consumers to use v2 fetch format (which will
use zero-copy).
4 - Turn on the feature on the brokers to allow producer requests v2
5 - Switch producers to use v2 produce format

(You may want a v1 fetch rate metric and decide to proceed to step 4
only when that comes down to a trickle)

I'm not sure if the prolonged upgrade process is viable in every
scenario. I think it should work at LinkedIn for e.g., but may not for
other environments.

Joel


On Tue, Sep 22, 2015 at 12:55 AM, Jiangjie Qin
 wrote:
> Thanks for the explanation, Jay.
> Agreed. We have to keep the offset to be the offset of last inner message.
>
> Jiangjie (Becket) Qin
>
> On Mon, Sep 21, 2015 at 6:21 PM, Jay Kreps  wrote:
>
>> For (3) I don't think we can change the offset in the outer message from
>> what it is today as it is relied upon in the search done in the log layer.
>> The reason it is the offset of the last message rather than the first is to
>> make the offset a least upper bound (i.e. the smallest offset >=
>> fetch_offset). This needs to work the same for both gaps due to compacted
>> topics and gaps due to compressed messages.
>>
>> So imagine you had a compressed set with offsets {45, 46, 47, 48} if you
>> assigned this compressed set the offset 45 a fetch for 46 would actually
>> skip ahead to 49 (the least upper bound).
>>
>> -Jay
>>
>> On Mon, Sep 21, 2015 at 5:17 PM, Jun Rao  wrote:
>>
>> > Jiangjie,
>> >
>> > Thanks for the writeup. A few comments below.
>> >
>> > 1. We will need to be a bit careful with fetch requests from the
>> followers.
>> > Basically, as we are doing a rolling upgrade of the brokers, the follower
>> > can't start issuing V2 of the fetch request until the rest of the brokers
>> > are ready to process it. So, we probably need to make use of
>> > inter.broker.protocol.version to do the rolling upgrade. In step 1, we
>> set
>> > inter.broker.protocol.version to 0.9 and do a round of rolling upgrade of
>> > the brokers. At this point, all brokers are capable of processing V2 of
>> > fetch requests, but no broker is using it yet. In step 2, we
>> > set inter.broker.protocol.version to 0.10 and do another round of rolling
>> > restart of the brokers. In this step, the upgraded brokers will start
>> > issuing V2 of the fetch request.
>> >
>> > 2. If we do #1, I am not sure if there is still a need for
>> > message.format.version since the broker can start writing messages in the
>> > new format after inter.broker.protocol.version is set to 0.10.
>> >
>> > 3. It wasn't clear from the wiki whether the base offset in the shallow
>> > message is the offset of the first or the last inner message. It's better
>> > to use the offset of the last inner message. This way, the followers
>> don't
>> > have to decompress messages to figure out the next fetch offset.
>> >
>> > 4. I am not sure that I understand the following sentence in the wiki. It
>> > seems that the relative offsets in a compressed message don't have to be
>> > consecutive. If so, why do we need to update the relative offsets in the
>> > inner messages?
>> > "When the log cleaner compacts log segments, it needs to update the inner
>> > message's relative offset values."
>> >
>> > Thanks,
>> >
>> > Jun
>> >
>> > On Thu, Sep 17, 2015 at 12:54 PM, Jiangjie Qin > >
>> > wrote:
>> >
>> > > Hi folks,
>> > >
>> > > Thanks a lot for the feedback on KIP-31 - move to use relative offset.
>> > (Not
>> > > including timestamp and index discussion).
>> > >
>> > > I updated the migration plan section as we discussed on KIP hangout. I
>> > > think it is the only concern raised so far. Please let me know if there
>> > are
>> > > further comments about the KIP.
>> > >
>> > > Thanks,
>> > >
>> > > Jiangjie (Becket) Qin
>> > >
>> > > On Mon, Sep 14, 2015 at 5:13 PM, Jiangjie Qin 
>> wrote:
>> > >
>> > > > I just updated the KIP-33 to explain the indexing on CreateTime and
>> > > > LogAppendTime respectively. I also used some use case to compare the
>> > two
>> > > > solutions.
>> > > > Although this is for KIP-33, but it does give a some insights on
>> > whether
>> > > > it makes sense to have a per message LogAppendTime.
>> > > >
>> > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index
>> > > >
>> > > > As a short summary of the conclusions we have alread

Re: [DISCUSS] KIP-31 - Message format change proposal

2015-09-24 Thread Jiangjie Qin
Hi Joel,

That is a valid concern. And that is actually why we had the
message.format.version before.

My original thinking was:
1. Upgrade the broker to support both V1 and V2 of the consumer/producer
requests.
2. Configure the broker to store V1 on disk (message.format.version = 1).
3. Upgrade the consumers to support both V1 and V2 of the consumer request.
4. Meanwhile some producers might also be upgraded to use producer request
V2.
5. At this point, for producer request V2, the broker will do down-conversion.
Regardless of whether consumers are upgraded or not, the broker will always
use zero-copy transfer, because supposedly both old and upgraded consumers
should be able to understand that format.
6. After most of the consumers are upgraded, we set message.format.version
= 2 and only do down-conversion for old consumers.
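
As a rough sketch of the broker-side decision in steps 5 and 6 (the class
and method names below are made up for illustration; this is not the actual
broker code):

    import java.nio.ByteBuffer;

    // Hypothetical sketch of the broker append path during the migration.
    class AppendPathSketch {
        // messageFormatVersion comes from the proposed message.format.version
        // config; produceRequestVersion is the version of the produce request.
        // (Up-conversion of old-format produce requests is omitted here.)
        ByteBuffer maybeConvertForAppend(int produceRequestVersion,
                                         int messageFormatVersion,
                                         ByteBuffer messages) {
            if (produceRequestVersion >= 2 && messageFormatVersion == 1) {
                // Step 5: decompress, rewrite relative offsets as absolute
                // offsets and recompress, so that V1 stays on disk and every
                // fetch can still be served with zero-copy.
                return downConvertToV1(messages);
            }
            // Step 6 (message.format.version = 2): append as-is; down-convert
            // only on the fetch path for old consumers.
            return messages;
        }

        private ByteBuffer downConvertToV1(ByteBuffer messages) {
            return messages; // placeholder for the actual conversion logic
        }
    }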

This way we don't need to reject producer request V2, and we always do
version conversion only for the minority of the consumers. However, I have
a few concerns about this approach; I'm not sure if they actually matter.

A. (5) is not true for now. Today the clients only use the highest
version, i.e. a producer/consumer wouldn't parse a lower version of a
response even if the code exists there. I think the consumer is supposed
to stick to one version and the broker should do the conversion.
B. Let's say (A) is not a concern and we make all the clients support all
the versions they know about. At step (6), there will be a transitional
period in which users will see messages in both the new and the old
version. For KIP-31 alone this might be OK because we are not adding
anything to the message. But if the message has different fields (e.g.
KIP-32), that means people will get those fields from some messages but
not from others. Would that be a problem?

If (A) and (B) are not a problem, does the above procedure address your
concern?

Thanks,

Jiangjie (Becket) Qin

On Thu, Sep 24, 2015 at 6:32 PM, Joel Koshy  wrote:

> The upgrade plan works, but the potentially long interim phase of
> skipping zero-copy for down-conversion could be problematic especially
> for large deployments with large consumer fan-out. It is not only
> going to be memory overhead but CPU as well - since you need to
> decompress, write absolute offsets, then recompress for every v1
> fetch. i.e., it may be safer (but obviously more tedious) to have a
> multi-step upgrade process. For e.g.,:
>
> 1 - Upgrade brokers, but disable the feature. i.e., either reject
> producer requests v2 or down-convert to old message format (with
> absolute offsets)
> 2 - Upgrade clients, but they should only use v1 requests
> 3 - Switch (all or most) consumers to use v2 fetch format (which will
> use zero-copy).
> 4 - Turn on the feature on the brokers to allow producer requests v2
> 5 - Switch producers to use v2 produce format
>
> (You may want a v1 fetch rate metric and decide to proceed to step 4
> only when that comes down to a trickle)
>
> I'm not sure if the prolonged upgrade process is viable in every
> scenario. I think it should work at LinkedIn for e.g., but may not for
> other environments.
>
> Joel
>
>
> On Tue, Sep 22, 2015 at 12:55 AM, Jiangjie Qin
>  wrote:
> > Thanks for the explanation, Jay.
> > Agreed. We have to keep the offset to be the offset of last inner
> message.
> >
> > Jiangjie (Becket) Qin
> >
> > On Mon, Sep 21, 2015 at 6:21 PM, Jay Kreps  wrote:
> >
> >> For (3) I don't think we can change the offset in the outer message from
> >> what it is today as it is relied upon in the search done in the log
> layer.
> >> The reason it is the offset of the last message rather than the first
> is to
> >> make the offset a least upper bound (i.e. the smallest offset >=
> >> fetch_offset). This needs to work the same for both gaps due to
> compacted
> >> topics and gaps due to compressed messages.
> >>
> >> So imagine you had a compressed set with offsets {45, 46, 47, 48} if you
> >> assigned this compressed set the offset 45 a fetch for 46 would actually
> >> skip ahead to 49 (the least upper bound).
> >>
> >> -Jay
> >>
> >> On Mon, Sep 21, 2015 at 5:17 PM, Jun Rao  wrote:
> >>
> >> > Jiangjie,
> >> >
> >> > Thanks for the writeup. A few comments below.
> >> >
> >> > 1. We will need to be a bit careful with fetch requests from the
> >> followers.
> >> > Basically, as we are doing a rolling upgrade of the brokers, the
> follower
> >> > can't start issuing V2 of the fetch request until the rest of the
> brokers
> >> > are ready to process it. So, we probably need to make use of
> >> > inter.broker.protocol.version to do the rolling upgrade. In step 1, we
> >> set
> >> > inter.broker.protocol.version to 0.9 and do a round of rolling
> upgrade of
> >> > the brokers. At this point, all brokers are capable of processing V2
> of
> >> > fetch requests, but no broker is using it yet. In step 2, we
> >> > set inter.broker.protocol.version to 0.10 and do another round of
> rolling
> >> > restart of the brokers. In this step, the upgraded brokers will start
> >> > issuing V2 of the fetch

Re: [DISCUSS] KIP-31 - Message format change proposal

2015-09-25 Thread Joel Koshy
Hey Becket,

I do think we need the interim deployment phase, i.e., set
message.format.version and down-convert producer request v2.
Down-conversion for v2 is no worse than what the broker is doing now.
I don't think we want a prolonged phase where we down-convert for
every v1 fetch - in fact I'm less concerned about losing zero-copy for
those fetch requests than the overhead of decompressing, rewriting
absolute offsets and recompressing for those fetches: each produced
batch would be reprocessed once per fetching consumer, which would
increase your CPU usage by 4x, 5x or whatever the average consumer
fan-out is. The decompression/recompression will put further memory
pressure as well.

It is true that clients send the latest request version that they are
compiled with, and that does not need to change. The broker can
continue to send back with zero-copy for fetch request version 2 as
well (even during the interim phase in which it down-converts
producer request v2). The consumer iterator (for the old consumer) or
the Fetcher (for the new consumer) needs to be able to handle messages
in the original as well as the new (relative offset) format.
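
Handling both formats should be fairly mechanical given that the wrapper
message carries the absolute offset of the last inner message. A minimal
sketch, with illustrative names only (not the actual consumer code):

    // Hypothetical helper: compute the absolute offset of an inner message
    // of a compressed message set for both the old and the new format.
    class InnerOffsetResolver {
        long absoluteOffset(boolean relativeOffsets,  // true for the new (V2) format
                            long wrapperOffset,       // absolute offset of the last inner message
                            long innerOffsetField,    // relative in V2, absolute in V1
                            long lastInnerOffsetField) {
            if (!relativeOffsets) {
                return innerOffsetField; // V1: the field is already absolute
            }
            // V2: anchor the relative offsets to the wrapper offset, e.g.
            // wrapperOffset = 48 with inner relative offsets 0..3 yields
            // absolute offsets 45, 46, 47, 48.
            return wrapperOffset - (lastInnerOffsetField - innerOffsetField);
        }
    }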

Thanks,

Joel


On Thu, Sep 24, 2015 at 7:56 PM, Jiangjie Qin  wrote:
> Hi Joel,
>
> That is a valid concern. And that is actually why we had the
> message.format.version before.
>
> My original thinking was:
> 1. upgrade the broker to support both V1 and V2 for consumer/producer
> request.
> 2. configure broker to store V1 on the disk. (message.format.version = 1)
> 3. upgrade the consumer to support both V1 and V2 for consumer request.
> 4. Meanwhile some producer might also be upgraded to use producer request
> V2.
> 5. At this point, for producer request V2, broker will do down conversion.
> Regardless consumers are upgraded or not, broker will always use zero-copy
> transfer. Because supposedly both old and upgraded consumer should be able
> to understand that.
> 6. After most of the consumers are upgraded, We set message.format.version
> = 1 and only do down conversion for old consumers.
>
> This way we don't need to reject producer request V2. And we always to
> version conversion for the minority of the consumers. However I have a few
> concerns over this approach, not sure if they actually matters.
>
> A. (5) is not true for now. Today the clients only uses the highest
> version, i.e. a producer/consumer wouldn't parse a lower version of
> response even the code exist there. I think supposedly, consumer should
> stick to one version and broker should do the conversion.
> B. Let's say (A) is not a concern, we make all the clients support all the
> versions it knows. At step(6), there will be a transitional period that
> user will see both messages with new and old version. For KIP-31 only it
> might be OK because we are not adding anything into the message. But if the
> message has different fields (e.g. KIP-32), that means people will get
> those fields from some messages but not from some other messages. Would
> that be a problem?
>
> If (A) and (B) are not a problem. Is the above procedure able to address
> your concern?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Thu, Sep 24, 2015 at 6:32 PM, Joel Koshy  wrote:
>
>> The upgrade plan works, but the potentially long interim phase of
>> skipping zero-copy for down-conversion could be problematic especially
>> for large deployments with large consumer fan-out. It is not only
>> going to be memory overhead but CPU as well - since you need to
>> decompress, write absolute offsets, then recompress for every v1
>> fetch. i.e., it may be safer (but obviously more tedious) to have a
>> multi-step upgrade process. For e.g.,:
>>
>> 1 - Upgrade brokers, but disable the feature. i.e., either reject
>> producer requests v2 or down-convert to old message format (with
>> absolute offsets)
>> 2 - Upgrade clients, but they should only use v1 requests
>> 3 - Switch (all or most) consumers to use v2 fetch format (which will
>> use zero-copy).
>> 4 - Turn on the feature on the brokers to allow producer requests v2
>> 5 - Switch producers to use v2 produce format
>>
>> (You may want a v1 fetch rate metric and decide to proceed to step 4
>> only when that comes down to a trickle)
>>
>> I'm not sure if the prolonged upgrade process is viable in every
>> scenario. I think it should work at LinkedIn for e.g., but may not for
>> other environments.
>>
>> Joel
>>
>>
>> On Tue, Sep 22, 2015 at 12:55 AM, Jiangjie Qin
>>  wrote:
>> > Thanks for the explanation, Jay.
>> > Agreed. We have to keep the offset to be the offset of last inner
>> message.
>> >
>> > Jiangjie (Becket) Qin
>> >
>> > On Mon, Sep 21, 2015 at 6:21 PM, Jay Kreps  wrote:
>> >
>> >> For (3) I don't think we can change the offset in the outer message from
>> >> what it is today as it is relied upon in the search done in the log
>> layer.
>> >> The reason it is the offset of the last message rather than the first
>> is to
>> >> make the offset a least upper bound (i.e. the smallest offset >=
>> >> fetch_offset). This needs to work the same for both gaps due to
>> compacted
>> >> 

Re: [DISCUSS] KIP-31 - Message format change proposal

2015-09-29 Thread Jiangjie Qin
Hi Joel and other folks.

I updated the KIP page with the two-phase rollout, which avoids the
conversion for the majority of users.

To do that we need to add a message.format.version configuration to the
broker. Other than that there is no interface change from the previous
proposal.
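
For concreteness, the two phases would roughly look like the following on
the broker side (property names and values here are illustrative; the KIP
page has the exact proposal):

    import java.util.Properties;

    class TwoPhaseRollout {
        // Phase 1: brokers accept V2 requests but keep writing the old
        // on-disk format, so fetches from not-yet-upgraded consumers remain
        // zero-copy.
        static Properties phaseOne() {
            Properties props = new Properties();
            props.setProperty("inter.broker.protocol.version", "0.10");
            props.setProperty("message.format.version", "1");
            return props;
        }

        // Phase 2: once most clients are upgraded, store the new format on
        // disk and down-convert only for the remaining old consumers.
        static Properties phaseTwo() {
            Properties props = new Properties();
            props.setProperty("inter.broker.protocol.version", "0.10");
            props.setProperty("message.format.version", "2");
            return props;
        }
    }
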
Please let me know if you have any concerns about the updated proposal.

Thanks,

Jiangjie (Becket) Qin

On Fri, Sep 25, 2015 at 11:26 AM, Joel Koshy  wrote:

> Hey Becket,
>
> I do think we need the interim deployment phase, set
> message.format.version and down-convert for producer request v2.
> Down-conversion for v2 is no worse than what the broker is doing now.
> I don't think we want a prolonged phase where we down-convert for
> every v1 fetch - in fact I'm less concerned about losing zero-copy for
> those fetch requests than the overhead of decompress/recompress for
> those fetches as that would increase your CPU usage by 4x, 5x or
> whatever the average consumer fan-out is. The
> decompression/recompression will put further memory pressure as well.
>
> It is true that clients send the latest request version that it is
> compiled with and that does not need to change. The broker can
> continue to send back with zero-copy for fetch request version 2 as
> well (even if during the interim phase during which it down-converts
> producer request v2). The consumer iterator (for old consumer) or the
> Fetcher (for new consumer) needs to be able to handle messages that
> are in original as well as new (relative offset) format.
>
> Thanks,
>
> Joel
>
>
> On Thu, Sep 24, 2015 at 7:56 PM, Jiangjie Qin 
> wrote:
> > Hi Joel,
> >
> > That is a valid concern. And that is actually why we had the
> > message.format.version before.
> >
> > My original thinking was:
> > 1. upgrade the broker to support both V1 and V2 for consumer/producer
> > request.
> > 2. configure broker to store V1 on the disk. (message.format.version = 1)
> > 3. upgrade the consumer to support both V1 and V2 for consumer request.
> > 4. Meanwhile some producer might also be upgraded to use producer request
> > V2.
> > 5. At this point, for producer request V2, broker will do down
> conversion.
> > Regardless consumers are upgraded or not, broker will always use
> zero-copy
> > transfer. Because supposedly both old and upgraded consumer should be
> able
> > to understand that.
> > 6. After most of the consumers are upgraded, We set
> message.format.version
> > = 1 and only do down conversion for old consumers.
> >
> > This way we don't need to reject producer request V2. And we always to
> > version conversion for the minority of the consumers. However I have a
> few
> > concerns over this approach, not sure if they actually matters.
> >
> > A. (5) is not true for now. Today the clients only uses the highest
> > version, i.e. a producer/consumer wouldn't parse a lower version of
> > response even the code exist there. I think supposedly, consumer should
> > stick to one version and broker should do the conversion.
> > B. Let's say (A) is not a concern, we make all the clients support all
> the
> > versions it knows. At step(6), there will be a transitional period that
> > user will see both messages with new and old version. For KIP-31 only it
> > might be OK because we are not adding anything into the message. But if
> the
> > message has different fields (e.g. KIP-32), that means people will get
> > those fields from some messages but not from some other messages. Would
> > that be a problem?
> >
> > If (A) and (B) are not a problem. Is the above procedure able to address
> > your concern?
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Thu, Sep 24, 2015 at 6:32 PM, Joel Koshy  wrote:
> >
> >> The upgrade plan works, but the potentially long interim phase of
> >> skipping zero-copy for down-conversion could be problematic especially
> >> for large deployments with large consumer fan-out. It is not only
> >> going to be memory overhead but CPU as well - since you need to
> >> decompress, write absolute offsets, then recompress for every v1
> >> fetch. i.e., it may be safer (but obviously more tedious) to have a
> >> multi-step upgrade process. For e.g.,:
> >>
> >> 1 - Upgrade brokers, but disable the feature. i.e., either reject
> >> producer requests v2 or down-convert to old message format (with
> >> absolute offsets)
> >> 2 - Upgrade clients, but they should only use v1 requests
> >> 3 - Switch (all or most) consumers to use v2 fetch format (which will
> >> use zero-copy).
> >> 4 - Turn on the feature on the brokers to allow producer requests v2
> >> 5 - Switch producers to use v2 produce format
> >>
> >> (You may want a v1 fetch rate metric and decide to proceed to step 4
> >> only when that comes down to a trickle)
> >>
> >> I'm not sure if the prolonged upgrade process is viable in every
> >> scenario. I think it should work at LinkedIn for e.g., but may not for
> >> other environments.
> >>
> >> Joel
> >>
> >>
> >> On Tue, Sep 22, 2015 at 12:55 AM, Jiangjie Qin

Re: [DISCUSS] KIP-31 - Message format change proposal

2015-09-30 Thread Magnus Edenhill
The BrokerMetadataRequest API could be used to propagate the supported
message format versions from broker to client.
This is currently being discussed in the KIP-35 thread.
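
On the client side that could enable something like the sketch below,
purely as an illustration (the metadata field is hypothetical, since
KIP-35 is still under discussion):

    import java.util.Set;

    class ProduceVersionChooser {
        // supportedFormats would come from the (hypothetical) broker metadata
        // field discussed in KIP-35.
        static int chooseProduceRequestVersion(Set<Integer> supportedFormats) {
            // Only send the new wire format once the broker advertises it.
            return supportedFormats.contains(2) ? 2 : 1;
        }
    }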

/Magnus



2015-09-30 1:50 GMT+02:00 Jiangjie Qin :

> Hi Joel and other folks.
>
> I updated the KIP page with the two phase roll out, which avoids the
> conversion for majority of users.
>
> To do that we need to add a message.format.version configuration to broker.
> Other than that there is no interface change from the previous proposal.
> Please let me know if you have concern about the updated proposal.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Fri, Sep 25, 2015 at 11:26 AM, Joel Koshy  wrote:
>
> > Hey Becket,
> >
> > I do think we need the interim deployment phase, set
> > message.format.version and down-convert for producer request v2.
> > Down-conversion for v2 is no worse than what the broker is doing now.
> > I don't think we want a prolonged phase where we down-convert for
> > every v1 fetch - in fact I'm less concerned about losing zero-copy for
> > those fetch requests than the overhead of decompress/recompress for
> > those fetches as that would increase your CPU usage by 4x, 5x or
> > whatever the average consumer fan-out is. The
> > decompression/recompression will put further memory pressure as well.
> >
> > It is true that clients send the latest request version that it is
> > compiled with and that does not need to change. The broker can
> > continue to send back with zero-copy for fetch request version 2 as
> > well (even if during the interim phase during which it down-converts
> > producer request v2). The consumer iterator (for old consumer) or the
> > Fetcher (for new consumer) needs to be able to handle messages that
> > are in original as well as new (relative offset) format.
> >
> > Thanks,
> >
> > Joel
> >
> >
> > On Thu, Sep 24, 2015 at 7:56 PM, Jiangjie Qin  >
> > wrote:
> > > Hi Joel,
> > >
> > > That is a valid concern. And that is actually why we had the
> > > message.format.version before.
> > >
> > > My original thinking was:
> > > 1. upgrade the broker to support both V1 and V2 for consumer/producer
> > > request.
> > > 2. configure broker to store V1 on the disk. (message.format.version =
> 1)
> > > 3. upgrade the consumer to support both V1 and V2 for consumer request.
> > > 4. Meanwhile some producer might also be upgraded to use producer
> request
> > > V2.
> > > 5. At this point, for producer request V2, broker will do down
> > conversion.
> > > Regardless consumers are upgraded or not, broker will always use
> > zero-copy
> > > transfer. Because supposedly both old and upgraded consumer should be
> > able
> > > to understand that.
> > > 6. After most of the consumers are upgraded, We set
> > message.format.version
> > > = 1 and only do down conversion for old consumers.
> > >
> > > This way we don't need to reject producer request V2. And we always to
> > > version conversion for the minority of the consumers. However I have a
> > few
> > > concerns over this approach, not sure if they actually matters.
> > >
> > > A. (5) is not true for now. Today the clients only uses the highest
> > > version, i.e. a producer/consumer wouldn't parse a lower version of
> > > response even the code exist there. I think supposedly, consumer should
> > > stick to one version and broker should do the conversion.
> > > B. Let's say (A) is not a concern, we make all the clients support all
> > the
> > > versions it knows. At step(6), there will be a transitional period that
> > > user will see both messages with new and old version. For KIP-31 only
> it
> > > might be OK because we are not adding anything into the message. But if
> > the
> > > message has different fields (e.g. KIP-32), that means people will get
> > > those fields from some messages but not from some other messages. Would
> > > that be a problem?
> > >
> > > If (A) and (B) are not a problem. Is the above procedure able to
> address
> > > your concern?
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Thu, Sep 24, 2015 at 6:32 PM, Joel Koshy 
> wrote:
> > >
> > >> The upgrade plan works, but the potentially long interim phase of
> > >> skipping zero-copy for down-conversion could be problematic especially
> > >> for large deployments with large consumer fan-out. It is not only
> > >> going to be memory overhead but CPU as well - since you need to
> > >> decompress, write absolute offsets, then recompress for every v1
> > >> fetch. i.e., it may be safer (but obviously more tedious) to have a
> > >> multi-step upgrade process. For e.g.,:
> > >>
> > >> 1 - Upgrade brokers, but disable the feature. i.e., either reject
> > >> producer requests v2 or down-convert to old message format (with
> > >> absolute offsets)
> > >> 2 - Upgrade clients, but they should only use v1 requests
> > >> 3 - Switch (all or most) consumers to use v2 fetch format (which will
> > >> use zero-copy).
> > >> 4 - Turn on the feature on the brokers to allow producer request

Re: [DISCUSS] KIP-31 - Message format change proposal

2015-09-30 Thread Joel Koshy
The Phase 2 2.* sub-steps don't seem to be right - can you look over
them carefully? Also, by "definitive" you mean "absolute", i.e., not
relative offsets, right?

One more thing that may be worth mentioning is that it is technically
possible to canary the new message format on at most one broker (or a
few brokers if they host mutually disjoint sets of partitions).
Basically, turn on the new message format on one broker and leave it on
for an extended period - if we hit some unanticipated bug and something
goes terribly wrong with the feature, then just kill that broker,
switch it back to the old on-disk format and reseed it from the
leaders. Most people may not want to have such a long deployment plan,
but at least it is an option for those who want to tread very carefully
given that the change is backwards incompatible.
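
To make the "mutually disjoint" condition concrete, a small hypothetical
helper (illustrative only, not part of any existing tooling) that checks
whether a candidate set of canary brokers shares a partition:

    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    class CanarySelection {
        // partitionReplicas maps "topic-partition" -> ids of the brokers
        // hosting a replica of that partition.
        static boolean mutuallyDisjoint(Map<String, Set<Integer>> partitionReplicas,
                                        Set<Integer> canaryBrokers) {
            for (Set<Integer> replicas : partitionReplicas.values()) {
                Set<Integer> overlap = new HashSet<>(replicas);
                overlap.retainAll(canaryBrokers);
                if (overlap.size() > 1) {
                    return false; // two canaries would share this partition
                }
            }
            return true;
        }
    }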

Joel

On Tue, Sep 29, 2015 at 4:50 PM, Jiangjie Qin  wrote:
> Hi Joel and other folks.
>
> I updated the KIP page with the two phase roll out, which avoids the
> conversion for majority of users.
>
> To do that we need to add a message.format.version configuration to broker.
> Other than that there is no interface change from the previous proposal.
> Please let me know if you have concern about the updated proposal.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Fri, Sep 25, 2015 at 11:26 AM, Joel Koshy  wrote:
>
>> Hey Becket,
>>
>> I do think we need the interim deployment phase, set
>> message.format.version and down-convert for producer request v2.
>> Down-conversion for v2 is no worse than what the broker is doing now.
>> I don't think we want a prolonged phase where we down-convert for
>> every v1 fetch - in fact I'm less concerned about losing zero-copy for
>> those fetch requests than the overhead of decompress/recompress for
>> those fetches as that would increase your CPU usage by 4x, 5x or
>> whatever the average consumer fan-out is. The
>> decompression/recompression will put further memory pressure as well.
>>
>> It is true that clients send the latest request version that it is
>> compiled with and that does not need to change. The broker can
>> continue to send back with zero-copy for fetch request version 2 as
>> well (even if during the interim phase during which it down-converts
>> producer request v2). The consumer iterator (for old consumer) or the
>> Fetcher (for new consumer) needs to be able to handle messages that
>> are in original as well as new (relative offset) format.
>>
>> Thanks,
>>
>> Joel
>>
>>
>> On Thu, Sep 24, 2015 at 7:56 PM, Jiangjie Qin 
>> wrote:
>> > Hi Joel,
>> >
>> > That is a valid concern. And that is actually why we had the
>> > message.format.version before.
>> >
>> > My original thinking was:
>> > 1. upgrade the broker to support both V1 and V2 for consumer/producer
>> > request.
>> > 2. configure broker to store V1 on the disk. (message.format.version = 1)
>> > 3. upgrade the consumer to support both V1 and V2 for consumer request.
>> > 4. Meanwhile some producer might also be upgraded to use producer request
>> > V2.
>> > 5. At this point, for producer request V2, broker will do down
>> conversion.
>> > Regardless consumers are upgraded or not, broker will always use
>> zero-copy
>> > transfer. Because supposedly both old and upgraded consumer should be
>> able
>> > to understand that.
>> > 6. After most of the consumers are upgraded, We set
>> message.format.version
>> > = 1 and only do down conversion for old consumers.
>> >
>> > This way we don't need to reject producer request V2. And we always to
>> > version conversion for the minority of the consumers. However I have a
>> few
>> > concerns over this approach, not sure if they actually matters.
>> >
>> > A. (5) is not true for now. Today the clients only uses the highest
>> > version, i.e. a producer/consumer wouldn't parse a lower version of
>> > response even the code exist there. I think supposedly, consumer should
>> > stick to one version and broker should do the conversion.
>> > B. Let's say (A) is not a concern, we make all the clients support all
>> the
>> > versions it knows. At step(6), there will be a transitional period that
>> > user will see both messages with new and old version. For KIP-31 only it
>> > might be OK because we are not adding anything into the message. But if
>> the
>> > message has different fields (e.g. KIP-32), that means people will get
>> > those fields from some messages but not from some other messages. Would
>> > that be a problem?
>> >
>> > If (A) and (B) are not a problem. Is the above procedure able to address
>> > your concern?
>> >
>> > Thanks,
>> >
>> > Jiangjie (Becket) Qin
>> >
>> > On Thu, Sep 24, 2015 at 6:32 PM, Joel Koshy  wrote:
>> >
>> >> The upgrade plan works, but the potentially long interim phase of
>> >> skipping zero-copy for down-conversion could be problematic especially
>> >> for large deployments with large consumer fan-out. It is not only
>> >> going to be memory overhead but CPU as well - since you need to
>> >> decompress, write absolute offsets, then recomp

Re: [DISCUSS] KIP-31 - Message format change proposal

2015-10-01 Thread Jiangjie Qin
Joel,

Thanks for the comments. I updated the KIP page and added the canary
procedure.

Thanks,

Jiangjie (Becket) Qin

On Wed, Sep 30, 2015 at 6:26 PM, Joel Koshy  wrote:

> The Phase 2 2.* sub-steps don't seem to be right. Can you look over
> that carefully? Also, "definitive" - you mean "absolute" i.e., not
> relative offsets right?
>
> One more thing that may be worth mentioning is that it is technically
> possible to canary the new version format on at most one broker (or
> multiple if it hosts mutually disjoint partitions). Basically turn on
> the new message format on one broker, leave it on for an extended
> period - if we hit some unanticipated bug and something goes terribly
> wrong with the feature then just kill that broker, switch it to the v0
> on-disk format and reseed it from the leaders. Most people may not
> want to have such a long deployment plan but at least it is an option
> for those who want to tread very carefully given that it is backwards
> incompatible.
>
> Joel
>
> On Tue, Sep 29, 2015 at 4:50 PM, Jiangjie Qin 
> wrote:
> > Hi Joel and other folks.
> >
> > I updated the KIP page with the two phase roll out, which avoids the
> > conversion for majority of users.
> >
> > To do that we need to add a message.format.version configuration to
> broker.
> > Other than that there is no interface change from the previous proposal.
> > Please let me know if you have concern about the updated proposal.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Fri, Sep 25, 2015 at 11:26 AM, Joel Koshy 
> wrote:
> >
> >> Hey Becket,
> >>
> >> I do think we need the interim deployment phase, set
> >> message.format.version and down-convert for producer request v2.
> >> Down-conversion for v2 is no worse than what the broker is doing now.
> >> I don't think we want a prolonged phase where we down-convert for
> >> every v1 fetch - in fact I'm less concerned about losing zero-copy for
> >> those fetch requests than the overhead of decompress/recompress for
> >> those fetches as that would increase your CPU usage by 4x, 5x or
> >> whatever the average consumer fan-out is. The
> >> decompression/recompression will put further memory pressure as well.
> >>
> >> It is true that clients send the latest request version that it is
> >> compiled with and that does not need to change. The broker can
> >> continue to send back with zero-copy for fetch request version 2 as
> >> well (even if during the interim phase during which it down-converts
> >> producer request v2). The consumer iterator (for old consumer) or the
> >> Fetcher (for new consumer) needs to be able to handle messages that
> >> are in original as well as new (relative offset) format.
> >>
> >> Thanks,
> >>
> >> Joel
> >>
> >>
> >> On Thu, Sep 24, 2015 at 7:56 PM, Jiangjie Qin  >
> >> wrote:
> >> > Hi Joel,
> >> >
> >> > That is a valid concern. And that is actually why we had the
> >> > message.format.version before.
> >> >
> >> > My original thinking was:
> >> > 1. upgrade the broker to support both V1 and V2 for consumer/producer
> >> > request.
> >> > 2. configure broker to store V1 on the disk. (message.format.version
> = 1)
> >> > 3. upgrade the consumer to support both V1 and V2 for consumer
> request.
> >> > 4. Meanwhile some producer might also be upgraded to use producer
> request
> >> > V2.
> >> > 5. At this point, for producer request V2, broker will do down
> >> conversion.
> >> > Regardless consumers are upgraded or not, broker will always use
> >> zero-copy
> >> > transfer. Because supposedly both old and upgraded consumer should be
> >> able
> >> > to understand that.
> >> > 6. After most of the consumers are upgraded, We set
> >> message.format.version
> >> > = 1 and only do down conversion for old consumers.
> >> >
> >> > This way we don't need to reject producer request V2. And we always to
> >> > version conversion for the minority of the consumers. However I have a
> >> few
> >> > concerns over this approach, not sure if they actually matters.
> >> >
> >> > A. (5) is not true for now. Today the clients only uses the highest
> >> > version, i.e. a producer/consumer wouldn't parse a lower version of
> >> > response even the code exist there. I think supposedly, consumer
> should
> >> > stick to one version and broker should do the conversion.
> >> > B. Let's say (A) is not a concern, we make all the clients support all
> >> the
> >> > versions it knows. At step(6), there will be a transitional period
> that
> >> > user will see both messages with new and old version. For KIP-31 only
> it
> >> > might be OK because we are not adding anything into the message. But
> if
> >> the
> >> > message has different fields (e.g. KIP-32), that means people will get
> >> > those fields from some messages but not from some other messages.
> Would
> >> > that be a problem?
> >> >
> >> > If (A) and (B) are not a problem. Is the above procedure able to
> address
> >> > your concern?
> >> >
> >> > Thanks,
> >> >
> >> > Jiangjie (Becket) Qin