Re: [DISCUSS] KIP-32 - Add CreateTime and LogAppendTime to Kafka message

2016-01-27 Thread Becket Qin
Hi Guozhang,

That makes sense. I will update the KIP wiki and bump up the voting thread
to let people know about this change.

Thanks,

Jiangjie (Becket) Qin


Re: [DISCUSS] KIP-32 - Add CreateTime and LogAppendTime to Kafka message

2016-01-26 Thread Guozhang Wang
One motivation of my proposal is actually to avoid any clients trying to
read the timestamp type from the topic metadata response and behave
differently since:

1) The topic metadata response is not always in sync with the source of truth
(ZK), so by the time the clients realize that the config has changed it may
already be too late (i.e. for a consumer, records with the wrong timestamp
could already have been returned to the user).

2) The client logic could be a bit simpler, which will benefit non-Java
client development a lot. Also, we can avoid adding this to the topic metadata
response.

Guozhang


Re: [DISCUSS] KIP-32 - Add CreateTime and LogAppendTime to Kafka message

2016-01-26 Thread Becket Qin
My hesitation about the changed protocol is that if we are going to have
topic configuration returned in the topic metadata, the current protocol
makes more sense: because the timestamp type is a topic-level setting, we
don't need to put it into each message. That assumes that the timestamp type
of a topic rarely changes and that, if it is ever changed, the existing
data should be wiped out.

Thanks,

Jiangjie (Becket) Qin


Re: [DISCUSS] KIP-32 - Add CreateTime and LogAppendTime to Kafka message

2016-01-26 Thread Jay Kreps
I'm in favor of Guozhang's proposal. I think that logic is a bit hacky, but
I agree that it is better than the alternative, and the hackiness only
affects people using log append time, which I think will be pretty uncommon.
I think setting that bit has the additional value that
consumers can know the meaning of the timestamp.

-Jay


Re: [DISCUSS] KIP-32 - Add CreateTime and LogAppendTime to Kafka message

2016-01-26 Thread Becket Qin
Bump up this thread per discussion on the KIP hangout.

During the implementation of the KIP, Guozhang raised another proposal on
how to indicate the message timestamp type used by messages. So we want to
see people's opinion on this proposal.

The current and the new proposal differ only for
messages that are a) compressed, and b) using LogAppendTime.

For compressed messages using LogAppendTime, the timestamps in the current
proposal are handled as below:
1. When a producer produces the messages, it tries to set the timestamp to -1
for the inner messages if it knows LogAppendTime is used.
2. When a broker receives the messages, it overwrites the inner message
timestamps to -1 if needed and writes the server time to the wrapper message.
The broker will re-compress if an inner message timestamp is overwritten.
3. When a consumer receives the messages, it sees that the inner message
timestamp is -1, so the wrapper message timestamp is used.
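
The fallback convention in steps 1-3 can be sketched as follows (a minimal
illustration under assumed names, not the actual Kafka implementation):

```python
NO_TIMESTAMP = -1  # sentinel meaning "use the wrapper (outer) message timestamp"

def broker_prepare(inner_timestamps, server_time):
    """Step 2: force all inner timestamps to -1; re-compression is only
    needed when an inner timestamp actually has to change."""
    needs_recompression = any(ts != NO_TIMESTAMP for ts in inner_timestamps)
    return [NO_TIMESTAMP] * len(inner_timestamps), server_time, needs_recompression

def consumer_timestamp(inner_ts, wrapper_ts):
    """Step 3: the consumer falls back to the wrapper timestamp on -1."""
    return wrapper_ts if inner_ts == NO_TIMESTAMP else inner_ts
```

If the producer already set -1 (step 1), no inner timestamp changes and the
broker can skip re-compression, which is the whole point of the producer-side
convention.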

Implementation-wise, this proposal requires the producer to set the timestamps
of the inner messages correctly to avoid broker-side re-compression. To do
that, the short-term solution is to let the producer infer the timestamp type
from the broker's ProduceResponse and set the correct timestamp afterwards.
This means the first few batches will still need re-compression on the
broker. The long-term solution is to have the producer get the topic
configuration during metadata updates.


The proposed modification is:
1. When a producer produces the messages, it always uses create time.
2. When a broker receives the messages, it ignores the inner messages'
timestamps, simply sets a wrapper message timestamp type attribute bit to
1, and sets the timestamp of the wrapper message to the server time. (The
broker will also set the timestamp type attribute bit accordingly for
non-compressed messages using LogAppendTime.)
3. When a consumer receives the messages, it checks the timestamp type
attribute bit of the wrapper message. If it is set to 1, the inner message's
timestamp will be ignored and the wrapper message's timestamp will be used.

This approach uses an extra attribute bit. The good thing about the modified
protocol is that consumers will be able to know the timestamp type, and
re-compression on the broker side is completely avoided no matter what value
is sent by the producer. The downside is that the inner messages will have
wrong timestamps.
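
The modified protocol can be sketched as follows (the bit position chosen here
is an assumption for illustration; only the mechanism matters):

```python
TIMESTAMP_TYPE_MASK = 0x08  # assumed position of the timestamp-type attribute bit

def broker_stamp(wrapper_attributes, server_time):
    """Step 2: set the attribute bit and overwrite the wrapper timestamp;
    the inner messages are left untouched, so no re-compression is needed."""
    return wrapper_attributes | TIMESTAMP_TYPE_MASK, server_time

def consumer_timestamp(wrapper_attributes, wrapper_ts, inner_ts):
    """Step 3: bit set -> LogAppendTime, use the wrapper timestamp;
    bit clear -> CreateTime, use the inner timestamp."""
    return wrapper_ts if wrapper_attributes & TIMESTAMP_TYPE_MASK else inner_ts
```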

We want to see if people have concerns over the modified approach.

Thanks,

Jiangjie (Becket) Qin






Re: [DISCUSS] KIP-32 - Add CreateTime and LogAppendTime to Kafka message

2015-12-15 Thread Becket Qin
Jun,

1. I agree it would be nice to have the timestamps used in a unified way.
My concern is that if we let the server change the timestamps of the inner
messages for LogAppendTime, that will force users of LogAppendTime
to always pay the recompression penalty, so using LogAppendTime would
defeat the purpose of KIP-31.

4. If there are no entries in the log segment, we can read from the time
index of the previous log segment. If there is no previous entry
available after we search back to the earliest log segment, that means all
the previous log segments with a valid time index entry have been deleted. In
that case there should presumably be only one log segment left (the active
log segment), and we can simply set the latest timestamp to 0.
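
The backward search in point 4 might look like the following sketch, with each
segment simplified to its list of (timestamp, offset) time-index entries:

```python
def restore_latest_timestamp(segment_time_indexes):
    """Scan segments from newest to oldest and return the timestamp of the
    last valid time-index entry; if every index is empty (all older segments
    with a valid entry were deleted), fall back to 0."""
    for time_index in reversed(segment_time_indexes):
        if time_index:  # non-empty list of (timestamp, offset) entries
            return time_index[-1][0]
    return 0
```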

Guozhang,

Sorry for the confusion. By "the timestamp of the latest message" I
actually meant "the timestamp of the message with the largest timestamp". So in
your example the "latest message" is 5.

Thanks,

Jiangjie (Becket) Qin



> On Mon, Dec 14, 2015 at 10:05 PM, Jun Rao  wrote:
>
> > 1. Hmm, it's more intuitive if the consumer sees the same timestamp
> > whether the messages are compressed or not. When
> > message.timestamp.type=LogAppendTime, we will need to set timestamp in
> > each message if messages are not compressed, so that the follower can get
> > the same timestamp. So, it seems that we should do the same thing for
> > inner messages when messages are compressed.
> >
> > 4. I thought on startup, we restore the timestamp of the latest message
> > by reading from the time index of the last log segment. So, what happens
> > if there are no index entries?
> >
> > Thanks,
> >
> > Jun
> >
> > On Mon, Dec 14, 2015 at 6:28 PM, Becket Qin  wrote:
> >
> > > Thanks for the explanation, Jun.
> > >
> > > 1. That makes sense. So maybe we can do the following:
> > > (a) Set the timestamp in the compressed message to the latest timestamp
> > > of all its inner messages. This works for both LogAppendTime and
> > > CreateTime.
> > > (b) If message.timestamp.type=LogAppendTime, the broker will overwrite
> > > all the inner message timestamps to -1 if they are not set to -1. This
> > > is mainly for topics that are using LogAppendTime. Hopefully the
> > > producer will set the timestamp to -1 in the ProducerRecord to avoid
> > > server-side recompression.
> > >
> > > 3. I see. That works. So the semantic of log rolling becomes "roll out
> > > the log segment if it has been inactive since the latest message
> > > arrived."
> > >
> > > 4. Yes. If the largest timestamp is in a previous log segment, the time
> > > index for the current log segment does not have a valid offset in the
> > > current log segment to point to. Maybe in that case we should build an
> > > empty log index.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > >
> > > On Mon, Dec 14, 2015 at 5:51 PM, Jun Rao  wrote:
> > >
> > > > 1. I was thinking more about saving the decompression overhead in the
> > > > follower. Currently, the follower doesn't decompress the messages. To
> > > > keep it that way, the outer message needs to include the timestamp of
> > > > the latest inner message to build the time index in the follower. The
> > > > simplest thing to do is to change the timestamp in the inner messages
> > > > if necessary, in which case there will be the recompression overhead.
> > > > However, in the case when the timestamps of the inner messages don't
> > > > have to be changed (hopefully more common), there won't be the
> > > > recompression overhead. In either case, we always set the timestamp in
> > > > the outer message to be the timestamp of the latest inner message, in
> > > > the leader.
> > > >
> > > > 3. Basically, in each log segment, we keep track of the timestamp of
> > > > the latest message. If current time - timestamp of latest message >
> > > > log rolling interval, we roll a new log segment. So, if messages with
> > > > later timestamps keep getting added, we only roll new log segments
> > > > based on size. On the other hand, if no new messages are added to a
> > > > log, we can force a log roll based on time, which addresses the issue
> > > > in (b).
> > > >
> > > > 4. Hmm, the index is per segment and should only point to positions in
> > > > the corresponding .log file, not previous ones, right?
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > >
> > > > On Mon, Dec 14, 2015 at 3:10 PM, Becket Qin  wrote:
> > > >
> > > > > Hi Jun,
> > > > >
> > > > > Thanks a lot for the comments.

Re: [DISCUSS] KIP-32 - Add CreateTime and LogAppendTime to Kafka message

2015-12-15 Thread Guozhang Wang
Jun, Jiangjie,

I am confused about 3) here: if we use "the timestamp of the latest message",
then doesn't this mean we will also roll the log whenever a message delayed by
the rolling time is received? Just to clarify, my understanding of "the
timestamp of the latest message", for example in the following log, is 1,
not 5:

2, 3, 4, 5, 1
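
The two possible readings of "latest", and their effect on time-based rolling,
can be made concrete (a sketch; `roll_interval` is an illustrative parameter):

```python
def last_appended(log):
    return log[-1]   # reading 1: the most recently appended message

def largest_timestamp(log):
    return max(log)  # reading 2: the message with the largest timestamp

def should_roll(log, now, roll_interval):
    # Rolling on the largest timestamp means a delayed message (the trailing
    # 1 in the example log) cannot by itself trigger a time-based roll.
    return now - largest_timestamp(log) > roll_interval
```

For the example log [2, 3, 4, 5, 1], reading 1 gives 1 and reading 2 gives 5.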

Guozhang


On Mon, Dec 14, 2015 at 10:05 PM, Jun Rao  wrote:

> 1. Hmm, it's more intuitive if the consumer sees the same timestamp whether
> the messages are compressed or not. When
> message.timestamp.type=LogAppendTime,
> we will need to set timestamp in each message if messages are not
> compressed, so that the follower can get the same timestamp. So, it seems
> that we should do the same thing for inner messages when messages are
> compressed.
>
> 4. I thought on startup, we restore the timestamp of the latest message by
> reading from the time index of the last log segment. So, what happens if
> there are no index entries?
>
> Thanks,
>
> Jun
>
> On Mon, Dec 14, 2015 at 6:28 PM, Becket Qin  wrote:
>
> > Thanks for the explanation, Jun.
> >
> > 1. That makes sense. So maybe we can do the following:
> > (a) Set the timestamp in the compressed message to latest timestamp of
> all
> > its inner messages. This works for both LogAppendTime and CreateTime.
> > (b) If message.timestamp.type=LogAppendTime, the broker will overwrite
> all
> > the inner message timestamp to -1 if they are not set to -1. This is
> mainly
> > for topics that are using LogAppendTime. Hopefully the producer will set
> > the timestamp to -1 in the ProducerRecord to avoid server side
> > recompression.
> >
> > 3. I see. That works. So the semantic of log rolling becomes "roll out
> the
> > log segment if it has been inactive since the latest message has
> arrived."
> >
> > 4. Yes. If the largest timestamp is in the previous log segment, the time
> > index for the current log segment does not have a valid offset in the
> > current log segment to point to. Maybe in that case we should build an
> > empty time index.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> >
> > On Mon, Dec 14, 2015 at 5:51 PM, Jun Rao  wrote:
> >
> > > 1. I was thinking more about saving the decompression overhead in the
> > > follower. Currently, the follower doesn't decompress the messages. To
> > keep
> > > it that way, the outer message needs to include the timestamp of the
> > latest
> > > inner message to build the time index in the follower. The simplest
> thing
> > > to do is to change the timestamp in the inner messages if necessary, in
> > > which case there will be the recompression overhead. However, in the
> case
> > > when the timestamp of the inner messages don't have to be changed
> > > (hopefully more common), there won't be the recompression overhead. In
> > > either case, we always set the timestamp in the outer message to be the
> > > timestamp of the latest inner message, in the leader.
> > >
> > > 3. Basically, in each log segment, we keep track of the timestamp of
> the
> > > latest message. If current time - timestamp of latest message > log
> > rolling
> > > interval, we roll a new log segment. So, if messages with later
> > timestamps
> > > keep getting added, we only roll new log segments based on size. On the
> > > other hand, if no new messages are added to a log, we can force a log
> > roll
> > > based on time, which addresses the issue in (b).
> > >
> > > 4. Hmm, the index is per segment and should only point to positions in
> > the
> > > corresponding .log file, not previous ones, right?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > >
> > > On Mon, Dec 14, 2015 at 3:10 PM, Becket Qin 
> > wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > Thanks a lot for the comments. Please see inline replies.
> > > >
> > > > Thanks,
> > > >
> > > > Jiangjie (Becket) Qin
> > > >
> > > > On Mon, Dec 14, 2015 at 10:19 AM, Jun Rao  wrote:
> > > >
> > > > > Hi, Becket,
> > > > >
> > > > > Thanks for the proposal. Looks good overall. A few comments below.
> > > > >
> > > > > 1. KIP-32 didn't say what timestamp should be set in a compressed
> > > > message.
> > > > > We probably should set it to the timestamp of the latest messages
> > > > included
> > > > > in the compressed one. This way, during indexing, we don't have to
> > > > > decompress the message.
> > > > >
> > > > That is a good point.
> > > > In normal cases, broker needs to decompress the message for
> > verification
> > > > purpose anyway. So building time index does not add additional
> > > > decompression.
> > > > During time index recovery, however, having a timestamp in compressed
> > > > message might save the decompression.
> > > >
> > > > Another thing I am thinking is we should make sure KIP-32 works well
> > with
> > > > KIP-31. i.e. we don't want to do recompression in order to add
> > timestamp
> > > to
> > > > messages.
> > > > Taking the approach in my last email, the timestamps in the messages
> > > > will either all be overwritten by the server if
> > > > message.timestamp.type=LogAppendTime, or they will not be overwritten
> > > > if message.timestamp.type=CreateTime.

Re: [DISCUSS] KIP-32 - Add CreateTime and LogAppendTime to Kafka message

2015-12-14 Thread Jun Rao
1. Hmm, it's more intuitive if the consumer sees the same timestamp whether
the messages are compressed or not. When message.timestamp.type=LogAppendTime,
we will need to set the timestamp in each message if messages are not
compressed, so that the follower can get the same timestamp. So, it seems
that we should do the same thing for inner messages when messages are
compressed.

4. I thought on startup, we restore the timestamp of the latest message by
reading from the time index of the last log segment. So, what happens if
there are no index entries?

Thanks,

Jun

On Mon, Dec 14, 2015 at 6:28 PM, Becket Qin  wrote:

> Thanks for the explanation, Jun.
>
> 1. That makes sense. So maybe we can do the following:
> (a) Set the timestamp in the compressed message to the latest timestamp of all
> its inner messages. This works for both LogAppendTime and CreateTime.
> (b) If message.timestamp.type=LogAppendTime, the broker will overwrite all
> the inner message timestamp to -1 if they are not set to -1. This is mainly
> for topics that are using LogAppendTime. Hopefully the producer will set
> the timestamp to -1 in the ProducerRecord to avoid server side
> recompression.
>
> 3. I see. That works. So the semantic of log rolling becomes "roll out the
> log segment if it has been inactive since the latest message has arrived."
>
> 4. Yes. If the largest timestamp is in the previous log segment, the time index
> for the current log segment does not have a valid offset in the current log
> segment to point to. Maybe in that case we should build an empty time index.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
> On Mon, Dec 14, 2015 at 5:51 PM, Jun Rao  wrote:
>
> > 1. I was thinking more about saving the decompression overhead in the
> > follower. Currently, the follower doesn't decompress the messages. To
> keep
> > it that way, the outer message needs to include the timestamp of the
> latest
> > inner message to build the time index in the follower. The simplest thing
> > to do is to change the timestamp in the inner messages if necessary, in
> > which case there will be the recompression overhead. However, in the case
> > when the timestamp of the inner messages don't have to be changed
> > (hopefully more common), there won't be the recompression overhead. In
> > either case, we always set the timestamp in the outer message to be the
> > timestamp of the latest inner message, in the leader.
> >
> > 3. Basically, in each log segment, we keep track of the timestamp of the
> > latest message. If current time - timestamp of latest message > log
> rolling
> > interval, we roll a new log segment. So, if messages with later
> timestamps
> > keep getting added, we only roll new log segments based on size. On the
> > other hand, if no new messages are added to a log, we can force a log
> roll
> > based on time, which addresses the issue in (b).
> >
> > 4. Hmm, the index is per segment and should only point to positions in
> the
> > corresponding .log file, not previous ones, right?
> >
> > Thanks,
> >
> > Jun
> >
> >
> >
> > On Mon, Dec 14, 2015 at 3:10 PM, Becket Qin 
> wrote:
> >
> > > Hi Jun,
> > >
> > > Thanks a lot for the comments. Please see inline replies.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Mon, Dec 14, 2015 at 10:19 AM, Jun Rao  wrote:
> > >
> > > > Hi, Becket,
> > > >
> > > > Thanks for the proposal. Looks good overall. A few comments below.
> > > >
> > > > 1. KIP-32 didn't say what timestamp should be set in a compressed
> > > message.
> > > > We probably should set it to the timestamp of the latest messages
> > > included
> > > > in the compressed one. This way, during indexing, we don't have to
> > > > decompress the message.
> > > >
> > > That is a good point.
> > > In normal cases, broker needs to decompress the message for
> verification
> > > purpose anyway. So building time index does not add additional
> > > decompression.
> > > During time index recovery, however, having a timestamp in compressed
> > > message might save the decompression.
> > >
> > > Another thing I am thinking is we should make sure KIP-32 works well
> with
> > > KIP-31. i.e. we don't want to do recompression in order to add
> timestamp
> > to
> > > messages.
> > > Taking the approach in my last email, the timestamps in the messages will
> > > either all be overwritten by the server if
> > > message.timestamp.type=LogAppendTime, or they will not be overwritten
> if
> > > message.timestamp.type=CreateTime.
> > >
> > > Maybe we can use the timestamp in compressed messages in the following
> > way:
> > > If message.timestamp.type=LogAppendTime, we have to overwrite
> timestamps
> > > for all the messages. We can simply write the timestamp in the
> compressed
> > > message to avoid recompression.
> > > If message.timestamp.type=CreateTime, we do not need to overwrite the
> > > timestamps. We either reject the entire compressed message or we just
> > > leave the compressed message timestamp as -1.
> > >
> > > So the semantics of the timestamp field in the compressed message become:
> > > if it is greater than 0, LogAppendTime is used and the timestamp of the
> > > inner messages is the compressed message's LogAppendTime; if it is -1,
> > > CreateTime is used and the timestamp is in each individual inner message.

Re: [DISCUSS] KIP-32 - Add CreateTime and LogAppendTime to Kafka message

2015-12-14 Thread Becket Qin
Thanks for the explanation, Jun.

1. That makes sense. So maybe we can do the following:
(a) Set the timestamp in the compressed message to the latest timestamp of all
its inner messages. This works for both LogAppendTime and CreateTime.
(b) If message.timestamp.type=LogAppendTime, the broker will overwrite all
the inner message timestamps to -1 if they are not already -1. This is mainly
for topics that are using LogAppendTime. Hopefully the producer will set
the timestamp to -1 in the ProducerRecord to avoid server-side
recompression.
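
A minimal Python sketch of (a) and (b), assuming illustrative function and constant names rather than Kafka's actual internals:

```python
# Sketch of the (a)/(b) proposal for a compressed (wrapper) message.
LOG_APPEND_TIME = "LogAppendTime"
CREATE_TIME = "CreateTime"
NO_TIMESTAMP = -1  # the "timestamp not set" sentinel from the proposal

def prepare_compressed_batch(inner_timestamps, timestamp_type, broker_now_ms):
    """Return (wrapper_timestamp, inner_timestamps, needs_recompression)."""
    if timestamp_type == LOG_APPEND_TIME:
        # (b): inner timestamps must all be -1. If the producer already set
        # them to -1, the broker can append the batch without recompressing.
        if all(t == NO_TIMESTAMP for t in inner_timestamps):
            return broker_now_ms, list(inner_timestamps), False
        return broker_now_ms, [NO_TIMESTAMP] * len(inner_timestamps), True
    # (a): CreateTime -- the wrapper carries the latest inner timestamp so
    # the time index can be built without decompressing the batch.
    return max(inner_timestamps), list(inner_timestamps), False
```

Note how the recompression cost only arises in the LogAppendTime case when the producer did not already send -1 timestamps.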

3. I see. That works. So the semantic of log rolling becomes "roll out the
log segment if it has been inactive since the latest message has arrived."

4. Yes. If the largest timestamp is in the previous log segment, the time index
for the current log segment does not have a valid offset in the current log
segment to point to. Maybe in that case we should build an empty time index.

Thanks,

Jiangjie (Becket) Qin



On Mon, Dec 14, 2015 at 5:51 PM, Jun Rao  wrote:

> 1. I was thinking more about saving the decompression overhead in the
> follower. Currently, the follower doesn't decompress the messages. To keep
> it that way, the outer message needs to include the timestamp of the latest
> inner message to build the time index in the follower. The simplest thing
> to do is to change the timestamp in the inner messages if necessary, in
> which case there will be the recompression overhead. However, in the case
> when the timestamp of the inner messages don't have to be changed
> (hopefully more common), there won't be the recompression overhead. In
> either case, we always set the timestamp in the outer message to be the
> timestamp of the latest inner message, in the leader.
>
> 3. Basically, in each log segment, we keep track of the timestamp of the
> latest message. If current time - timestamp of latest message > log rolling
> interval, we roll a new log segment. So, if messages with later timestamps
> keep getting added, we only roll new log segments based on size. On the
> other hand, if no new messages are added to a log, we can force a log roll
> based on time, which addresses the issue in (b).
>
> 4. Hmm, the index is per segment and should only point to positions in the
> corresponding .log file, not previous ones, right?
>
> Thanks,
>
> Jun
>
>
>
> On Mon, Dec 14, 2015 at 3:10 PM, Becket Qin  wrote:
>
> > Hi Jun,
> >
> > Thanks a lot for the comments. Please see inline replies.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Mon, Dec 14, 2015 at 10:19 AM, Jun Rao  wrote:
> >
> > > Hi, Becket,
> > >
> > > Thanks for the proposal. Looks good overall. A few comments below.
> > >
> > > 1. KIP-32 didn't say what timestamp should be set in a compressed
> > message.
> > > We probably should set it to the timestamp of the latest messages
> > included
> > > in the compressed one. This way, during indexing, we don't have to
> > > decompress the message.
> > >
> > That is a good point.
> > In normal cases, broker needs to decompress the message for verification
> > purpose anyway. So building time index does not add additional
> > decompression.
> > During time index recovery, however, having a timestamp in compressed
> > message might save the decompression.
> >
> > Another thing I am thinking is we should make sure KIP-32 works well with
> > KIP-31. i.e. we don't want to do recompression in order to add timestamp
> to
> > messages.
> > Taking the approach in my last email, the timestamps in the messages will
> > either all be overwritten by the server if
> > message.timestamp.type=LogAppendTime, or they will not be overwritten if
> > message.timestamp.type=CreateTime.
> >
> > Maybe we can use the timestamp in compressed messages in the following
> way:
> > If message.timestamp.type=LogAppendTime, we have to overwrite timestamps
> > for all the messages. We can simply write the timestamp in the compressed
> > message to avoid recompression.
> > If message.timestamp.type=CreateTime, we do not need to overwrite the
> > timestamps. We either reject the entire compressed message or we just
> > leave the compressed message timestamp as -1.
> >
> > So the semantics of the timestamp field in the compressed message become:
> > if it is greater than 0, that means LogAppendTime is used, the timestamp
> of
> > the inner messages is the compressed message LogAppendTime. If it is -1,
> > that means the CreateTime is used, the timestamp is in each individual
> > inner message.
> >
> > This sacrifices the speed of recovery but seems worthwhile because we avoid
> > recompression.
> >
> >
> > > 2. In KIP-33, should we make the time-based index interval
> configurable?
> > > Perhaps we can default it to 60 secs, but allow users to configure it to
> > > smaller values if they want more precision.
> > >
> > Yes, we can do that.
> >
> >
> > > 3. In KIP-33, I am not sure if log rolling should be based on the
> > earliest
> > > message. This would mean that we will need to roll a log segment every
> > time
> > we get a message delayed by the log rolling time interval.

Re: [DISCUSS] KIP-32 - Add CreateTime and LogAppendTime to Kafka message

2015-12-14 Thread Jun Rao
1. I was thinking more about saving the decompression overhead in the
follower. Currently, the follower doesn't decompress the messages. To keep
it that way, the outer message needs to include the timestamp of the latest
inner message to build the time index in the follower. The simplest thing
to do is to change the timestamp in the inner messages if necessary, in
which case there will be the recompression overhead. However, in the case
when the timestamp of the inner messages don't have to be changed
(hopefully more common), there won't be the recompression overhead. In
either case, we always set the timestamp in the outer message to be the
timestamp of the latest inner message, in the leader.

3. Basically, in each log segment, we keep track of the timestamp of the
latest message. If current time - timestamp of latest message > log rolling
interval, we roll a new log segment. So, if messages with later timestamps
keep getting added, we only roll new log segments based on size. On the
other hand, if no new messages are added to a log, we can force a log roll
based on time, which addresses the issue in (b).
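
Jun's rolling rule can be sketched as follows (hypothetical parameter names, not the broker's actual code):

```python
# Roll on size while recent messages keep arriving; roll on time only once
# the segment has been idle for the rolling interval.
def should_roll(now_ms, latest_message_ts_ms, segment_bytes,
                log_roll_ms, segment_max_bytes):
    if segment_bytes >= segment_max_bytes:
        return True  # size-based roll
    # Time-based roll: only fires when no newer message has arrived for
    # longer than the rolling interval, which addresses issue (b).
    return now_ms - latest_message_ts_ms > log_roll_ms
```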

4. Hmm, the index is per segment and should only point to positions in the
corresponding .log file, not previous ones, right?

Thanks,

Jun



On Mon, Dec 14, 2015 at 3:10 PM, Becket Qin  wrote:

> Hi Jun,
>
> Thanks a lot for the comments. Please see inline replies.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Mon, Dec 14, 2015 at 10:19 AM, Jun Rao  wrote:
>
> > Hi, Becket,
> >
> > Thanks for the proposal. Looks good overall. A few comments below.
> >
> > 1. KIP-32 didn't say what timestamp should be set in a compressed
> message.
> > We probably should set it to the timestamp of the latest messages
> included
> > in the compressed one. This way, during indexing, we don't have to
> > decompress the message.
> >
> That is a good point.
> In normal cases, broker needs to decompress the message for verification
> purpose anyway. So building time index does not add additional
> decompression.
> During time index recovery, however, having a timestamp in compressed
> message might save the decompression.
>
> Another thing I am thinking is we should make sure KIP-32 works well with
> KIP-31. i.e. we don't want to do recompression in order to add timestamp to
> messages.
> Taking the approach in my last email, the timestamps in the messages will
> either all be overwritten by the server if
> message.timestamp.type=LogAppendTime, or they will not be overwritten if
> message.timestamp.type=CreateTime.
>
> Maybe we can use the timestamp in compressed messages in the following way:
> If message.timestamp.type=LogAppendTime, we have to overwrite timestamps
> for all the messages. We can simply write the timestamp in the compressed
> message to avoid recompression.
> If message.timestamp.type=CreateTime, we do not need to overwrite the
> timestamps. We either reject the entire compressed message or we just leave
> the compressed message timestamp as -1.
>
> So the semantics of the timestamp field in the compressed message become:
> if it is greater than 0, that means LogAppendTime is used, the timestamp of
> the inner messages is the compressed message LogAppendTime. If it is -1,
> that means the CreateTime is used, the timestamp is in each individual
> inner message.
>
> This sacrifices the speed of recovery but seems worthwhile because we avoid
> recompression.
>
>
> > 2. In KIP-33, should we make the time-based index interval configurable?
> > Perhaps we can default it to 60 secs, but allow users to configure it to
> > smaller values if they want more precision.
> >
> Yes, we can do that.
>
>
> > 3. In KIP-33, I am not sure if log rolling should be based on the
> earliest
> > message. This would mean that we will need to roll a log segment every
> time
> > we get a message delayed by the log rolling time interval. Also, on
> broker
> > startup, we can get the timestamp of the latest message in a log segment
> > pretty efficiently by just looking at the last time index entry. But
> > getting the earliest timestamp requires a full scan of
> all
> > log segments, which can be expensive. Previously, there were two use
> cases
> > of time-based rolling: (a) more accurate time-based indexing and (b)
> > retaining data by time (since the active segment is never deleted). (a)
> is
> > already solved with a time-based index. For (b), if the retention is
> based
> > on the timestamp of the latest message in a log segment, perhaps log
> > rolling should be based on that too.
> >
> I am not sure how to make log rolling work with the latest timestamp in
> current log segment. Do you mean the log rolling can be based on the last log
> segment's latest timestamp? If so, how do we roll out the first segment?
>
>
> > 4. In KIP-33, I presume the timestamp in the time index will be
> > monotonically increasing. So, if all messages in a log segment have a
> > timestamp less than the largest timestamp in the previous log segment,

Re: [DISCUSS] KIP-32 - Add CreateTime and LogAppendTime to Kafka message

2015-12-14 Thread Becket Qin
Hi Jun,

Thanks a lot for the comments. Please see inline replies.

Thanks,

Jiangjie (Becket) Qin

On Mon, Dec 14, 2015 at 10:19 AM, Jun Rao  wrote:

> Hi, Becket,
>
> Thanks for the proposal. Looks good overall. A few comments below.
>
> 1. KIP-32 didn't say what timestamp should be set in a compressed message.
> We probably should set it to the timestamp of the latest messages included
> in the compressed one. This way, during indexing, we don't have to
> decompress the message.
>
That is a good point.
In normal cases, the broker needs to decompress the message for verification
purposes anyway, so building the time index does not add additional
decompression.
During time index recovery, however, having a timestamp in the compressed
message might save the decompression.

Another thing I am thinking is that we should make sure KIP-32 works well with
KIP-31, i.e. we don't want to do recompression in order to add timestamps to
messages.
Taking the approach in my last email, the timestamps in the messages will
either all be overwritten by the server if
message.timestamp.type=LogAppendTime, or they will not be overwritten if
message.timestamp.type=CreateTime.

Maybe we can use the timestamp in compressed messages in the following way:
If message.timestamp.type=LogAppendTime, we have to overwrite timestamps
for all the messages. We can simply write the timestamp in the compressed
message to avoid recompression.
If message.timestamp.type=CreateTime, we do not need to overwrite the
timestamps. We either reject the entire compressed message or we just leave
the compressed message timestamp as -1.

So the semantics of the timestamp field in the compressed message become: if
it is greater than 0, LogAppendTime is used and the timestamp of the inner
messages is the compressed message's LogAppendTime; if it is -1, CreateTime
is used and the timestamp is in each individual inner message.

This sacrifices the speed of recovery but seems worthwhile because we avoid
recompression.
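
The proposed wrapper-timestamp semantics could be decoded roughly like this (an illustrative Python sketch, not Kafka's message-format code):

```python
NO_TIMESTAMP = -1

def effective_inner_timestamps(wrapper_timestamp, inner_create_times):
    """Decode per the proposed semantics: a wrapper timestamp > 0 means
    LogAppendTime and applies to every inner message; a wrapper of -1 means
    CreateTime and each inner message carries its own timestamp."""
    if wrapper_timestamp > 0:
        return [wrapper_timestamp] * len(inner_create_times)
    return list(inner_create_times)
```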


> 2. In KIP-33, should we make the time-based index interval configurable?
> Perhaps we can default it to 60 secs, but allow users to configure it to
> smaller values if they want more precision.
>
Yes, we can do that.


> 3. In KIP-33, I am not sure if log rolling should be based on the earliest
> message. This would mean that we will need to roll a log segment every time
> we get a message delayed by the log rolling time interval. Also, on broker
> startup, we can get the timestamp of the latest message in a log segment
> pretty efficiently by just looking at the last time index entry. But
> getting the earliest timestamp requires a full scan of all
> log segments, which can be expensive. Previously, there were two use cases
> of time-based rolling: (a) more accurate time-based indexing and (b)
> retaining data by time (since the active segment is never deleted). (a) is
> already solved with a time-based index. For (b), if the retention is based
> on the timestamp of the latest message in a log segment, perhaps log
> rolling should be based on that too.
>
I am not sure how to make log rolling work with the latest timestamp in the
current log segment. Do you mean the log rolling can be based on the last log
segment's latest timestamp? If so, how do we roll out the first segment?


> 4. In KIP-33, I presume the timestamp in the time index will be
> monotonically increasing. So, if all messages in a log segment have a
> timestamp less than the largest timestamp in the previous log segment, we
> will use the latter to index this log segment?
>
Yes. The timestamps are monotonically increasing. If the largest timestamp
in the previous segment is very big, it is possible that the time index of the
current segment only has two index entries (inserted during segment
creation and roll-out), both pointing to a message in the previous log
segment. This is the corner case I mentioned before, where we could expire
the next log segment even before expiring the previous log segment just
because the largest timestamp is in the previous log segment. In the current
approach, we will wait until the previous log segment expires, and then
delete both the previous log segment and the next log segment.
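
The monotonic-index behavior and the corner case above can be illustrated with a small sketch (hypothetical helper, not the real index implementation):

```python
def time_index_entries(prev_segment_max_ts, message_timestamps):
    """Timestamps written to the time index never decrease. If every message
    in the new segment is older than the previous segment's largest
    timestamp, the entries all carry that larger timestamp forward -- the
    corner case where the index cannot distinguish this segment's messages
    from the previous segment's by time."""
    entries = []
    max_ts = prev_segment_max_ts
    for offset, ts in enumerate(message_timestamps):
        max_ts = max(max_ts, ts)
        entries.append((max_ts, offset))
    return entries
```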


> 5. In KIP-32, in the wire protocol, we mention both timestamp and time.
> They should be consistent.
>
Will fix the wiki page.


> Jun
>
>
>
> On Thu, Dec 10, 2015 at 10:13 AM, Becket Qin  wrote:
>
> > Hey Jay,
> >
> > Thanks for the comments.
> >
> > Good point about the actions to take when max.message.time.difference is
> > exceeded. Rejection is a useful behavior, although I cannot think of a use
> > case at LinkedIn at the moment. I think it makes sense to add a
> > configuration.
> >
> > How about the following configurations?
> > 1. message.timestamp.type=CreateTime/LogAppendTime
> > 2. max.message.time.difference.ms
> >
> > if message.timestamp.type is set to CreateTime, when the broker receives a
> > message, it will further check max.message.time.difference.ms, and will
> > reject the message if the time difference exceeds the threshold.

Re: [DISCUSS] KIP-32 - Add CreateTime and LogAppendTime to Kafka message

2015-12-14 Thread Jun Rao
Hi, Becket,

Thanks for the proposal. Looks good overall. A few comments below.

1. KIP-32 didn't say what timestamp should be set in a compressed message.
We probably should set it to the timestamp of the latest message included
in the compressed one. This way, during indexing, we don't have to
decompress the message.

2. In KIP-33, should we make the time-based index interval configurable?
Perhaps we can default it to 60 secs, but allow users to configure it to
smaller values if they want more precision.

3. In KIP-33, I am not sure if log rolling should be based on the earliest
message. This would mean that we will need to roll a log segment every time
we get a message delayed by the log rolling time interval. Also, on broker
startup, we can get the timestamp of the latest message in a log segment
pretty efficiently by just looking at the last time index entry. But
getting the earliest timestamp requires a full scan of all
log segments, which can be expensive. Previously, there were two use cases
of time-based rolling: (a) more accurate time-based indexing and (b)
retaining data by time (since the active segment is never deleted). (a) is
already solved with a time-based index. For (b), if the retention is based
on the timestamp of the latest message in a log segment, perhaps log
rolling should be based on that too.

4. In KIP-33, I presume the timestamp in the time index will be
monotonically increasing. So, if all messages in a log segment have a
timestamp less than the largest timestamp in the previous log segment, we
will use the latter to index this log segment?

5. In KIP-32, in the wire protocol, we mention both timestamp and time.
They should be consistent.

Jun



On Thu, Dec 10, 2015 at 10:13 AM, Becket Qin  wrote:

> Hey Jay,
>
> Thanks for the comments.
>
> Good point about the actions to take when max.message.time.difference is
> exceeded. Rejection is a useful behavior, although I cannot think of a use
> case at LinkedIn at the moment. I think it makes sense to add a
> configuration.
>
> How about the following configurations?
> 1. message.timestamp.type=CreateTime/LogAppendTime
> 2. max.message.time.difference.ms
>
> if message.timestamp.type is set to CreateTime, when the broker receives a
> message, it will further check max.message.time.difference.ms, and will
> reject the message if the time difference exceeds the threshold.
> If message.timestamp.type is set to LogAppendTime, the broker will always
> stamp the message with the current server time, regardless of the value of
> max.message.time.difference.ms.
>
> This will make sure the messages on the broker are either CreateTime or
> LogAppendTime, but not a mixture of both.
>
> What do you think?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
> On Wed, Dec 9, 2015 at 2:42 PM, Jay Kreps  wrote:
>
> > Hey Becket,
> >
> > That summary of pros and cons sounds about right to me.
> >
> > There are potentially two actions you could take when
> > max.message.time.difference is exceeded--override it or reject the
> > message entirely. Can we pick one of these or does the action need to
> > be configurable too? (I'm not sure). The downside of more
> > configuration is that it is more fiddly and has more modes.
> >
> > I suppose the reason I was thinking of this as a "difference" rather
> > than a hard type was that if you were going to go the reject mode you
> > would need some tolerance setting (i.e. if your SLA is that if your
> > timestamp is off by more than 10 minutes I give you an error). I agree
> > with you that having one field that is potentially containing a mix of
> > two values is a bit weird.
> >
> > -Jay
> >
> > On Mon, Dec 7, 2015 at 5:17 PM, Becket Qin  wrote:
> > > It looks like the format of the previous email was messed up. Sending it again.
> > >
> > > Just to recap, the last proposal Jay made (with some implementation
> > > details added)
> > > was:
> > >
> > > 1. Allow the user to stamp the message when producing
> > >
> > > 2. When the broker receives a message it takes a look at the difference
> > between
> > > its local time and the timestamp in the message.
> > >   a. If the time difference is within a configurable
> > > max.message.time.difference.ms, the server will accept it and append
> it
> > to
> > > the log.
> > >   b. If the time difference is beyond the configured
> > > max.message.time.difference.ms, the server will override the timestamp
> > with
> > > its current local time and append the message to the log.
> > >   c. The default value of max.message.time.difference would be set to
> > > Long.MaxValue.
> > >
> > > 3. The configurable time difference threshold
> > > max.message.time.difference.ms will
> > > be a per topic configuration.
> > >
> > > 4. The index will be built so it has the following guarantees.
> > >   a. If a user searches by timestamp:
> > >   - all the messages after that timestamp will be consumed.
> > >   - the user might see earlier messages.
> > >   b. The log retention will take a look at the last time index entry in
> > > the time index file.

Re: [DISCUSS] KIP-32 - Add CreateTime and LogAppendTime to Kafka message

2015-12-10 Thread Becket Qin
Hey Jay,

Thanks for the comments.

Good point about the actions to take when max.message.time.difference is
exceeded. Rejection is a useful behavior, although I cannot think of a use
case at LinkedIn at the moment. I think it makes sense to add a
configuration.

How about the following configurations?
1. message.timestamp.type=CreateTime/LogAppendTime
2. max.message.time.difference.ms

If message.timestamp.type is set to CreateTime, when the broker receives a
message, it will further check max.message.time.difference.ms, and will
reject the message if the time difference exceeds the threshold.
If message.timestamp.type is set to LogAppendTime, the broker will always
stamp the message with the current server time, regardless of the value of
max.message.time.difference.ms.

This will make sure the messages on the broker are either CreateTime or
LogAppendTime, but not a mixture of both.
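
A rough sketch of the proposed broker-side check (the configuration keys come from the proposal; the function and parameter names are illustrative):

```python
def broker_timestamp(timestamp_type, message_ts_ms, broker_now_ms, max_diff_ms):
    """Return the timestamp to append, or raise if the message is rejected."""
    if timestamp_type == "LogAppendTime":
        # Broker always stamps with its own clock; the difference
        # threshold is ignored in this mode.
        return broker_now_ms
    # CreateTime: reject if the producer timestamp is too far from broker time.
    if abs(broker_now_ms - message_ts_ms) > max_diff_ms:
        raise ValueError("timestamp outside max.message.time.difference.ms")
    return message_ts_ms
```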

What do you think?

Thanks,

Jiangjie (Becket) Qin


On Wed, Dec 9, 2015 at 2:42 PM, Jay Kreps  wrote:

> Hey Becket,
>
> That summary of pros and cons sounds about right to me.
>
> There are potentially two actions you could take when
> max.message.time.difference is exceeded--override it or reject the
> message entirely. Can we pick one of these or does the action need to
> be configurable too? (I'm not sure). The downside of more
> configuration is that it is more fiddly and has more modes.
>
> I suppose the reason I was thinking of this as a "difference" rather
> than a hard type was that if you were going to go the reject mode you
> would need some tolerance setting (i.e. if your SLA is that if your
> timestamp is off by more than 10 minutes I give you an error). I agree
> with you that having one field that is potentially containing a mix of
> two values is a bit weird.
>
> -Jay
>
> On Mon, Dec 7, 2015 at 5:17 PM, Becket Qin  wrote:
> > It looks like the format of the previous email was messed up. Sending it again.
> >
> > Just to recap, the last proposal Jay made (with some implementation
> > details added)
> > was:
> >
> > 1. Allow the user to stamp the message when producing
> >
> > 2. When the broker receives a message it takes a look at the difference
> between
> > its local time and the timestamp in the message.
> >   a. If the time difference is within a configurable
> > max.message.time.difference.ms, the server will accept it and append it
> to
> > the log.
> >   b. If the time difference is beyond the configured
> > max.message.time.difference.ms, the server will override the timestamp
> with
> > its current local time and append the message to the log.
> >   c. The default value of max.message.time.difference would be set to
> > Long.MaxValue.
> >
> > 3. The configurable time difference threshold
> > max.message.time.difference.ms will
> > be a per topic configuration.
> >
> > 4. The index will be built so it has the following guarantees.
> >   a. If a user searches by timestamp:
> >   - all the messages after that timestamp will be consumed.
> >   - the user might see earlier messages.
> >   b. The log retention will take a look at the last time index entry in
> the
> > time index file. Because the last entry will be the latest timestamp in
> the
> > entire log segment. If that entry expires, the log segment will be
> deleted.
> >   c. The log rolling has to depend on the earliest timestamp. In this
> case
> > we may need to keep an in-memory timestamp only for the current active
> > log. On recovery, we will need to read the active log segment to get the
> > timestamp of the earliest messages.
> >
> > 5. The downsides of this proposal are:
> >   a. The timestamp might not be monotonically increasing.
> >   b. The log retention might become non-deterministic. i.e. When a
> message
> > will be deleted now depends on the timestamp of the other messages in the
> > same log segment. And those timestamps are provided by
> > user within a range depending on what the time difference threshold
> > configuration is.
> >   c. The semantic meaning of the timestamp in the messages could be a
> little
> > bit vague because some of them come from the producer and some of them
> are
> > overwritten by brokers.
> >
> > 6. Although the proposal has some downsides, it gives user the
> flexibility
> > to use the timestamp.
> >   a. If the threshold is set to Long.MaxValue. The timestamp in the
> message is
> > equivalent to CreateTime.
> >   b. If the threshold is set to 0. The timestamp in the message is
> equivalent
> > to LogAppendTime.
> >
> > This proposal actually allows user to use either CreateTime or
> LogAppendTime
> > without introducing two timestamp concepts at the same time. I have
> updated
> > the wiki for KIP-32 and KIP-33 with this proposal.
> >
> > One thing I am thinking is that instead of having a time difference
> threshold,
> > should we simply have a TimestampType configuration? Because in most
> > cases, people will either set the threshold to 0 or Long.MaxValue.
> Setting
> > anything in between will make the timestamp in the m

Re: [DISCUSS] KIP-32 - Add CreateTime and LogAppendTime to Kafka message

2015-12-09 Thread Jay Kreps
Hey Becket,

That summary of pros and cons sounds about right to me.

There are potentially two actions you could take when
max.message.time.difference is exceeded--override it or reject the
message entirely. Can we pick one of these or does the action need to
be configurable too? (I'm not sure). The downside of more
configuration is that it is more fiddly and has more modes.

I suppose the reason I was thinking of this as a "difference" rather
than a hard type was that if you were going to go with the reject mode you
would need some tolerance setting (i.e. your SLA is that if your
timestamp is off by more than 10 minutes I give you an error). I agree
with you that having one field that potentially contains a mix of
two values is a bit weird.

-Jay

On Mon, Dec 7, 2015 at 5:17 PM, Becket Qin  wrote:
> It looks like the format of the previous email was messed up. Sending it again.
>
> Just to recap, the last proposal Jay made (with some implementation
> details added)
> was:
>
> 1. Allow the user to stamp the message when producing.
>
> 2. When the broker receives a message it takes a look at the difference between
> its local time and the timestamp in the message.
>   a. If the time difference is within a configurable
> max.message.time.difference.ms, the server will accept it and append it to
> the log.
>   b. If the time difference is beyond the configured
> max.message.time.difference.ms, the server will override the timestamp with
> its current local time and append the message to the log.
>   c. The default value of max.message.time.difference would be set to
> Long.MaxValue.
>
> 3. The configurable time difference threshold
> max.message.time.difference.ms will
> be a per topic configuration.
>
> 4. The index will be built so it has the following guarantees.
>   a. If a user searches by timestamp:
>   - all the messages after that timestamp will be consumed.
>   - the user might see earlier messages.
>   b. The log retention will take a look at the last time index entry in the
> time index file. Because the last entry will be the latest timestamp in the
> entire log segment. If that entry expires, the log segment will be deleted.
>   c. The log rolling has to depend on the earliest timestamp. In this case
> we may need to keep an in-memory timestamp only for the current active log.
> On recovery, we will need to read the active log segment to get the
> timestamp of the earliest messages.
>
> 5. The downsides of this proposal are:
>   a. The timestamp might not be monotonically increasing.
>   b. The log retention might become non-deterministic. i.e. When a message
> will be deleted now depends on the timestamp of the other messages in the
> same log segment. And those timestamps are provided by
> user within a range depending on what the time difference threshold
> configuration is.
>   c. The semantic meaning of the timestamp in the messages could be a little
> bit vague because some of them come from the producer and some of them are
> overwritten by brokers.
>
> 6. Although the proposal has some downsides, it gives user the flexibility
> to use the timestamp.
>   a. If the threshold is set to Long.MaxValue. The timestamp in the message is
> equivalent to CreateTime.
>   b. If the threshold is set to 0. The timestamp in the message is equivalent
> to LogAppendTime.
>
> This proposal actually allows user to use either CreateTime or LogAppendTime
> without introducing two timestamp concepts at the same time. I have updated
> the wiki for KIP-32 and KIP-33 with this proposal.
>
> One thing I am thinking is that instead of having a time difference threshold,
> should we simply have a TimestampType configuration? Because in most
> cases, people will either set the threshold to 0 or Long.MaxValue. Setting
> anything in between will make the timestamp in the message meaningless to
> users - they don't know if the timestamp has been overwritten by the brokers.
>
> Any thoughts?
>
> Thanks,
> Jiangjie (Becket) Qin
>
> On Mon, Dec 7, 2015 at 10:33 AM, Jiangjie Qin 
> wrote:
>
>> Bump up this thread.
>>
>> Just to recap, the last proposal Jay made (with some implementation details
>> added) was:
>>
>>1. Allow user to stamp the message when produce
>>2. When broker receives a message it take a look at the difference
>>between its local time and the timestamp in the message.
>>   - If the time difference is within a configurable
>>   max.message.time.difference.ms, the server will accept it and append
>>   it to the log.
>>   - If the time difference is beyond the configured
>>   max.message.time.difference.ms, the server will override the
>>   timestamp with its current local time and append the message to the
>> log.
>>   - The default value of max.message.time.difference would be set to
>>   Long.MaxValue.
>>   3. The configurable time difference threshold
>>max.message.time.difference.ms will be a per topic configuration.
>>4. The indexed will be built so it has the 

Re: [DISCUSS] KIP-32 - Add CreateTime and LogAppendTime to Kafka message

2015-12-07 Thread Becket Qin
It looks like the format of the previous email was messed up. Sending it again.

Just to recap, the last proposal Jay made (with some implementation
details added)
was:

1. Allow the user to stamp the message when producing.

2. When the broker receives a message it takes a look at the difference between
its local time and the timestamp in the message.
  a. If the time difference is within a configurable
max.message.time.difference.ms, the server will accept it and append it to
the log.
  b. If the time difference is beyond the configured
max.message.time.difference.ms, the server will override the timestamp with
its current local time and append the message to the log.
  c. The default value of max.message.time.difference would be set to
Long.MaxValue.

3. The configurable time difference threshold
max.message.time.difference.ms will
be a per topic configuration.

4. The index will be built so it has the following guarantees.
  a. If a user searches by timestamp:
  - all the messages after that timestamp will be consumed.
  - the user might see earlier messages.
  b. The log retention will take a look at the last time index entry in the
time index file. Because the last entry will be the latest timestamp in the
entire log segment. If that entry expires, the log segment will be deleted.
  c. The log rolling has to depend on the earliest timestamp. In this case
we may need to keep an in-memory timestamp only for the current active log.
On recovery, we will need to read the active log segment to get the
timestamp of the earliest messages.

5. The downsides of this proposal are:
  a. The timestamp might not be monotonically increasing.
  b. The log retention might become non-deterministic, i.e. when a message
will be deleted now depends on the timestamps of the other messages in the
same log segment, and those timestamps are provided by the user within a
range depending on the time difference threshold configuration.
  c. The semantic meaning of the timestamp in the messages could be a little
bit vague because some of them come from the producer and some of them are
overwritten by brokers.

6. Although the proposal has some downsides, it gives the user the flexibility
to use the timestamp.
  a. If the threshold is set to Long.MaxValue, the timestamp in the message is
equivalent to CreateTime.
  b. If the threshold is set to 0, the timestamp in the message is equivalent
to LogAppendTime.
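The broker-side behavior in steps 2a/2b, together with the special cases in 6a/6b, can be sketched roughly as follows. The class and method names are illustrative, not the actual implementation:

```java
// Sketch (hypothetical names) of the max.message.time.difference.ms proposal:
// keep the producer's timestamp when it is within the configured tolerance of
// broker time, otherwise override it with the broker clock.
class TimestampDifferencePolicy {
    private final long maxDifferenceMs; // per-topic max.message.time.difference.ms

    TimestampDifferencePolicy(long maxDifferenceMs) {
        this.maxDifferenceMs = maxDifferenceMs;
    }

    long resolve(long producerTimestampMs, long brokerNowMs) {
        long diff = Math.abs(brokerNowMs - producerTimestampMs);
        // 2a: within tolerance -> accept the producer's CreateTime.
        // 2b: beyond tolerance -> override with the broker's local time.
        return diff <= maxDifferenceMs ? producerTimestampMs : brokerNowMs;
    }
}
```

With a threshold of Long.MAX_VALUE every message keeps its CreateTime (case 6a); with a threshold of 0 every message is stamped with broker time (case 6b).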

This proposal actually allows the user to use either CreateTime or LogAppendTime
without introducing two timestamp concepts at the same time. I have updated
the wiki for KIP-32 and KIP-33 with this proposal.

One thing I am thinking is that instead of having a time difference threshold,
should we simply have a TimestampType configuration? Because in most
cases, people will either set the threshold to 0 or Long.MaxValue. Setting
anything in between will make the timestamp in the message meaningless to
users - they don't know if the timestamp has been overwritten by the brokers.

Any thoughts?

Thanks,
Jiangjie (Becket) Qin

On Mon, Dec 7, 2015 at 10:33 AM, Jiangjie Qin 
wrote:

> Bump up this thread.
>
> Just to recap, the last proposal Jay made (with some implementation details
> added) was:
>
>    1. Allow the user to stamp the message when producing.
>    2. When the broker receives a message it takes a look at the difference
>    between its local time and the timestamp in the message.
>   - If the time difference is within a configurable
>   max.message.time.difference.ms, the server will accept it and append
>   it to the log.
>   - If the time difference is beyond the configured
>   max.message.time.difference.ms, the server will override the
>   timestamp with its current local time and append the message to the
> log.
>   - The default value of max.message.time.difference would be set to
>   Long.MaxValue.
>   3. The configurable time difference threshold
>max.message.time.difference.ms will be a per topic configuration.
>    4. The index will be built so it has the following guarantees.
>   - If user search by time stamp:
>- all the messages after that timestamp will be consumed.
>   - user might see earlier messages.
>   - The log retention will take a look at the last time index entry in
>   the time index file. Because the last entry will be the latest
> timestamp in
>   the entire log segment. If that entry expires, the log segment will
> be
>   deleted.
>   - The log rolling has to depend on the earliest timestamp. In this
>   case we may need to keep an in-memory timestamp only for the
>   current active log. On recovery, we will need to read the active log
>   segment to get the timestamp of the earliest messages.
>    5. The downsides of this proposal are:
>   - The timestamp might not be monotonically increasing.
>   - The log retention might become non-deterministic. i.e. When a
>   message will be deleted now depends on the timestamp of the
> 

Re: [DISCUSS] KIP-32 - Add CreateTime and LogAppendTime to Kafka message

2015-12-07 Thread Jiangjie Qin
Bump up this thread.

Just to recap, the last proposal Jay made (with some implementation details
added) was:

   1. Allow the user to stamp the message when producing.
   2. When the broker receives a message it takes a look at the difference
   between its local time and the timestamp in the message.
  - If the time difference is within a configurable
  max.message.time.difference.ms, the server will accept it and append
  it to the log.
  - If the time difference is beyond the configured
  max.message.time.difference.ms, the server will override the
  timestamp with its current local time and append the message to the log.
  - The default value of max.message.time.difference would be set to
  Long.MaxValue.
  3. The configurable time difference threshold
   max.message.time.difference.ms will be a per topic configuration.
   4. The index will be built so it has the following guarantees.
  - If a user searches by timestamp:
   - all the messages after that timestamp will be consumed.
  - the user might see earlier messages.
  - The log retention will take a look at the last time index entry in
  the time index file. Because the last entry will be the latest
timestamp in
  the entire log segment. If that entry expires, the log segment will be
  deleted.
  - The log rolling has to depend on the earliest timestamp. In this
  case we may need to keep an in-memory timestamp only for the current
  active log. On recovery, we will need to read the active log segment to
  get the timestamp of the earliest messages.
   5. The downsides of this proposal are:
  - The timestamp might not be monotonically increasing.
  - The log retention might become non-deterministic, i.e. when a
  message will be deleted now depends on the timestamps of the other
  messages in the same log segment, and those timestamps are provided by
  the user within a range depending on the time difference threshold
  configuration.
  - The semantic meaning of the timestamp in the messages could be a
  little bit vague because some of them come from the producer and some of
  them are overwritten by brokers.
  6. Although the proposal has some downsides, it gives the user the
   flexibility to use the timestamp.
   - If the threshold is set to Long.MaxValue, the timestamp in the message
  is equivalent to CreateTime.
  - If the threshold is set to 0, the timestamp in the message is
  equivalent to LogAppendTime.

This proposal actually allows the user to use either CreateTime or
LogAppendTime without introducing two timestamp concepts at the same time. I
have updated the wiki for KIP-32 and KIP-33 with this proposal.

One thing I am thinking is that instead of having a time difference
threshold, should we simply have a TimestampType configuration? Because
in most cases, people will either set the threshold to 0 or Long.MaxValue.
Setting anything in between will make the timestamp in the message
meaningless to users - they don't know if the timestamp has been
overwritten by the brokers.

Any thoughts?

Thanks,
Jiangjie (Becket) Qin

On Mon, Oct 26, 2015 at 1:23 PM, Jiangjie Qin  wrote:

> Hi Jay,
>
> Thanks for such detailed explanation. I think we both are trying to make
> CreateTime work for us if possible. To me by "work" it means clear
> guarantees on:
> 1. Log Retention Time enforcement.
> 2. Log Rolling time enforcement (This might be less a concern as you
> pointed out)
> 3. Application search message by time.
>
> WRT (1), I agree the expectation for log retention might be different
> depending on who we ask. But my concern is about the level of guarantee we
> give to user. My observation is that a clear guarantee to user is critical
> regardless of the mechanism we choose. And this is the subtle but important
> difference between using LogAppendTime and CreateTime.
>
> Let's say user asks this question: How long will my message stay in Kafka?
>
> If we use LogAppendTime for log retention, the answer is message will stay
> in Kafka for retention time after the message is produced (to be more
> precise, upper bounded by log.rolling.ms + log.retention.ms). User has a
> clear guarantee and they may decide whether or not to put the message into
> Kafka. Or how to adjust the retention time according to their requirements.
> If we use create time for log retention, the answer would be it depends.
> The best answer we can give is at least retention.ms because there is no
> guarantee when the messages will be deleted after that. If a message sits
> somewhere behind a larger create time, the message might stay longer than
> expected. But we don't know how much longer it would be because it depends on
> the create time. In this case, it is hard for user to decide what to do.
>
> I am worrying about this because a blurring guarantee has bitten us
> before, e.g. Topic creation. We have received many questions like "why my
> topic is not there after I created

Re: [DISCUSS] KIP-32 - Add CreateTime and LogAppendTime to Kafka message

2015-10-26 Thread Jiangjie Qin
Hi Jay,

Thanks for such detailed explanation. I think we both are trying to make
CreateTime work for us if possible. To me by "work" it means clear
guarantees on:
1. Log Retention Time enforcement.
2. Log Rolling time enforcement (This might be less a concern as you
pointed out)
3. Application search message by time.

WRT (1), I agree the expectation for log retention might be different
depending on who we ask. But my concern is about the level of guarantee we
give to user. My observation is that a clear guarantee to user is critical
regardless of the mechanism we choose. And this is the subtle but important
difference between using LogAppendTime and CreateTime.

Let's say user asks this question: How long will my message stay in Kafka?

If we use LogAppendTime for log retention, the answer is that a message will
stay in Kafka for the retention time after the message is produced (to be more
precise, upper bounded by log.rolling.ms + log.retention.ms). The user has a
clear guarantee and can decide whether or not to put the message into
Kafka, or how to adjust the retention time according to their requirements.
If we use create time for log retention, the answer would be "it depends".
The best answer we can give is "at least retention.ms", because there is no
guarantee when the messages will be deleted after that. If a message sits
somewhere behind a larger create time, the message might stay longer than
expected. But we don't know how much longer it would be because it depends on
the create time. In this case, it is hard for user to decide what to do.

I am worrying about this because a blurry guarantee has bitten us before,
e.g. topic creation. We have received many questions like "why is my topic
not there after I created it". I can imagine we will receive similar questions
asking "why is my message still there after the retention time has passed". So
my understanding is that a clear and solid guarantee is better than having
a mechanism that works in most cases but occasionally does not work.

If we think of the retention guarantee we provide with LogAppendTime, it is
not broken, as you said, because we are telling the user that log retention is
NOT based on create time in the first place.
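The deterministic bound that LogAppendTime gives can be sketched like this (the class and method names are illustrative, not the actual LogManager code): a segment becomes deletable once its newest append timestamp falls outside the retention window, so any message is gone at latest roughly log.roll.ms + log.retention.ms after it is appended.

```java
// Sketch (illustrative names): LogAppendTime-based retention decisions depend
// only on broker-assigned timestamps, so the lifetime of a message is bounded
// by broker configuration alone, not by other producers' timestamps.
class RetentionBound {
    // A segment expires when its newest append timestamp leaves the window.
    static boolean segmentExpired(long segmentMaxAppendTimestampMs,
                                  long nowMs, long retentionMs) {
        return nowMs - segmentMaxAppendTimestampMs > retentionMs;
    }

    // Worst case: a message lands at the start of a segment that stays active
    // for log.roll.ms, then the full retention window applies to the segment.
    static long worstCaseLifetimeMs(long logRollMs, long retentionMs) {
        return logRollMs + retentionMs;
    }
}
```

Under CreateTime-based retention the first check would use a producer-supplied timestamp instead, which is exactly what makes the answer "it depends".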

WRT (3), no matter whether we index on LogAppendTime or CreateTime, the
best guarantee we can provide with user is "not missing message after a
certain timestamp". Therefore I actually really like to index on CreateTime
because that is the timestamp we provide to user, and we can have the solid
guarantee.
On the other hand, indexing on LogAppendTime and giving the user CreateTime
does not provide a solid guarantee when users search based on timestamp. It
only works when LogAppendTime is always no earlier than CreateTime. This is
a reasonable assumption and we can easily enforce it.
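The "not missing messages after a certain timestamp" guarantee can be sketched as below. The names are illustrative, and a linear scan stands in for the real index lookup; the key point is that the search returns the earliest offset at or after the target, while earlier-stamped messages may still be seen because timestamps are not monotonic:

```java
import java.util.List;

// Sketch (illustrative names): given index entries in offset order, return the
// earliest offset from which a consumer must start so that no message with
// timestamp >= target is missed. The consumer may also see some messages with
// earlier timestamps -- the "might see earlier messages" side of the guarantee.
class TimeSearchSketch {
    record Entry(long timestampMs, long offset) {}

    static long earliestOffsetFor(List<Entry> entriesInOffsetOrder, long targetMs) {
        for (Entry e : entriesInOffsetOrder) {
            if (e.timestampMs() >= targetMs) {
                return e.offset();
            }
        }
        return -1L; // no message at or after the target timestamp
    }
}
```

For example, with timestamps 10, 30, 20 at offsets 0, 1, 2, a search for timestamp 20 must return offset 1: starting there delivers both messages stamped at or after 20, even though the one at offset 2 carries the earlier stamp.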

With the above, I am not sure we can avoid a server timestamp to make log
retention work with a clear guarantee. For the search-by-timestamp use case,
I really want to have the index built on CreateTime. But with a reasonable
assumption and timestamp enforcement, a LogAppendTime index would also work.

Thanks,

Jiangjie (Becket) Qin



On Thu, Oct 22, 2015 at 10:48 AM, Jay Kreps  wrote:

> Hey Becket,
>
> Let me see if I can address your concerns:
>
> 1. Let's say we have two source clusters that are mirrored to the same
> > target cluster. For some reason one of the mirror maker from a cluster
> dies
> > and after fix the issue we want to resume mirroring. In this case it is
> > possible that when the mirror maker resumes mirroring, the timestamp of
> the
> > messages have already gone beyond the acceptable timestamp range on
> broker.
> > In order to let those messages go through, we have to bump up the
> > *max.append.delay
> > *for all the topics on the target broker. This could be painful.
>
>
> Actually what I was suggesting was different. Here is my observation:
> clusters/topics directly produced to by applications have a valid assertion
> that log append time and create time are similar (let's call these
> "unbuffered"); other cluster/topic such as those that receive data from a
> database, a log file, or another kafka cluster don't have that assertion,
> for these "buffered" clusters data can be arbitrarily late. This means any
> use of log append time on these buffered clusters is not very meaningful,
> and create time and log append time "should" be similar on unbuffered
> clusters so you can probably use either.
>
> Using log append time on buffered clusters actually results in bad things.
> If you request the offset for a given time you get don't end up getting
> data for that time but rather data that showed up at that time. If you try
> to retain 7 days of data it may mostly work but any kind of bootstrapping
> will result in retaining much more (potentially the whole database
> contents!).
>
> So what I am suggesting in terms of the use of the max.append.delay is that
> unbuffered clusters would have this set and buffered clusters would not. In
> other words, in 

Re: [DISCUSS] KIP-32 - Add CreateTime and LogAppendTime to Kafka message

2015-10-25 Thread Kartik Paramasivam
One more thing: I wanted to clarify that I am not proposing stamping two
time properties per message either.

I think at this point we have a solution for "event time" (as described in
MillWheel) at LinkedIn (it is in the event payload).

So we just need a way to fix the log retention issues and improve
time-to-offset queries (seek by time, mostly to deal with lag and such).

Thanks
Kartik

On Sun, Oct 25, 2015 at 11:02 PM, Kartik Paramasivam <
kparamasi...@linkedin.com> wrote:

> This thread has gone rather long.  So I might not be fully clear on
> everybody's positions.
>
> To make sure I understand the possible proposals.  Here is a concrete and
> 'must have' scenario for LinkedIn.
>
> We want to bootstrap from say a Database dump (espresso) into a kafka
> topic.  (Databus bootstrap)
> What should be the creationTime that we stamp when we send into the Kafka
> topic ?
> Would we use the time at which the database record was touched (event
> time) ? or should it just be the local system time when we are publishing
> into the Kafka bootstrap topic.
>
> If it is the former (original database update time), then, if I understand
> the proposal, we would set the max.append.delay to be large and allow
> the message to be produced into Kafka.  However this would mean that older
> events could possibly (in the worst case) be cleaned up as soon as they are
> published ?  So this option wouldn't work.
>
> If it is the latter (when we publish into kafka.. we just stamp the current
> local time ), then that would work.. it is however as good as stamping it
> on the broker..   In this world the "event time" will be in the avro
> payload.. in case a downstream application wants to know when the event
> actually happened.
>
>
>
> On Thu, Oct 22, 2015 at 11:02 AM, Jay Kreps  wrote:
>
>> Hey Kartik,
>>
>> Yes, I agree exactly with your characterization.
>>
>> The question is "what is the meaning of retention?" It could mean either:
>> 1. "Retain data that is no more than 7 days old"
>> 2. "Retain data for 7 days from when you get it"
>>
>> I don't know if either is actually a clear winner.
>>
>> Each is intuitive and easily expressible:
>> 1. "We have the last 7 days of data in Kafka"
>> 2. "You have 7 days to get your data from Kafka from whenever it arrives"
>>
>> Each has corner cases:
>> 1. May lead to retaining data for too little time in the bootstrap case
>> 2. May lead to over-retention in the bootstrap case
>>
>> Which failure is worse probably depends on who you ask:
>> 1. May lead to not retaining data long enough for a consumer to get it,
>> which the consumer would say is very bad
>> 2. May lead to running out of disk space, which ops would say is very bad
>>
>> -Jay
>>
>>
>>
>>
>> On Tue, Oct 20, 2015 at 12:02 AM, Kartik Paramasivam <
>> kparamasi...@linkedin.com.invalid> wrote:
>>
>> > Joel or Becket will probably respond back in more detail.. but here are
>> my
>> > 2c.
>> >
>> > From the standpoint of LinkedIN, the suggested proposal works.. in
>> essence
>> > max.appenddelay can be used to turn "creationTime" into "logAppendTime".
>> >this does mean that at LinkedIn we won't be able to use
>> "creationTime"..
>> > however that might also be fine because we anyways use the timeStamp
>> that
>> > is set inside the avro payload.
>> >
>> > Keeping LI aside though, it looks like there are two distinct possible
>> > goals.
>> > 1. The broker will retain messages for x days after a message shows up
>> at
>> > the broker.   This behavior would be super deterministic and would never
>> > change depending on the contents of the message or anything else.
>> >
>> > 2. The client is in "partial" control of how long a message stays in the
>> > broker based on the creationTime stamped by the client.
>> >
>> > Although (2) could be a feature in some scenarios..but in many
>> scenarios it
>> > can be pretty unintuitive and be perceived as an anti-feature.  For e.g
>> say
>> > a mobile client buffered up some messages because the device was offline
>> > (maybe in a plane).. and then sent the message after say 23 hours on a
>> > plane.  The message shows up in a Kafka topic with 24 hour retention..
>> and
>> > now the message gets deleted in 1 hour.
>> >
>> > Kartik
>> >
>> >
>> > On Tue, Oct 13, 2015 at 12:32 AM, Jay Kreps  wrote:
>> >
>> > > Here's my basic take:
>> > > - I agree it would be nice to have a notion of time baked in if it
>> were
>> > > done right
>> > > - All the proposals so far seem pretty complex--I think they might
>> make
>> > > things worse rather than better overall
>> > > - I think adding 2x8 byte timestamps to the message is probably a
>> > > non-starter from a size perspective
>> > > - Even if it isn't in the message, having two notions of time that
>> > control
>> > > different things is a bit confusing
>> > > - The mechanics of basing retention etc on log append time when that's
>> > not
>> > > in the log seem complicated
>> > >
>> > > To that end here is a possible 4th option. Let me know 

Re: [DISCUSS] KIP-32 - Add CreateTime and LogAppendTime to Kafka message

2015-10-25 Thread Kartik Paramasivam
This thread has gone rather long.  So I might not be fully clear on
everybody's positions.

To make sure I understand the possible proposals.  Here is a concrete and
'must have' scenario for LinkedIn.

We want to bootstrap from say a Database dump (espresso) into a kafka
topic.  (Databus bootstrap)
What should be the creationTime that we stamp when we send into the Kafka
topic ?
Would we use the time at which the database record was touched (event time)
? or should it just be the local system time when we are publishing into
the Kafka bootstrap topic.

If it is the former (original database update time), then, if I understand
the proposal, we would set the max.append.delay to be large and allow
the message to be produced into Kafka. However this would mean that older
events could possibly (in the worst case) be cleaned up as soon as they are
published. So this option wouldn't work.

If it is the latter (when we publish into Kafka we just stamp the current
local time), then that would work; it is however as good as stamping it
on the broker. In this world the "event time" will be in the Avro
payload, in case a downstream application wants to know when the event
actually happened.



On Thu, Oct 22, 2015 at 11:02 AM, Jay Kreps  wrote:

> Hey Kartik,
>
> Yes, I agree exactly with your characterization.
>
> The question is "what is the meaning of retention?" It could mean either:
> 1. "Retain data that is no more than 7 days old"
> 2. "Retain data for 7 days from when you get it"
>
> I don't know if either is actually a clear winner.
>
> Each is intuitive and easily expressible:
> 1. "We have the last 7 days of data in Kafka"
> 2. "You have 7 days to get your data from Kafka from whenever it arrives"
>
> Each has corner cases:
> 1. May lead to retaining data for too little time in the bootstrap case
> 2. May lead to over-retention in the bootstrap case
>
> Which failure is worse probably depends on who you ask:
> 1. May lead to not retaining data long enough for a consumer to get it,
> which the consumer would say is very bad
> 2. May lead to running out of disk space, which ops would say is very bad
>
> -Jay
>
>
>
>
> On Tue, Oct 20, 2015 at 12:02 AM, Kartik Paramasivam <
> kparamasi...@linkedin.com.invalid> wrote:
>
> > Joel or Becket will probably respond back in more detail.. but here are
> my
> > 2c.
> >
> > From the standpoint of LinkedIN, the suggested proposal works.. in
> essence
> > max.appenddelay can be used to turn "creationTime" into "logAppendTime".
> >this does mean that at LinkedIn we won't be able to use
> "creationTime"..
> > however that might also be fine because we anyways use the timeStamp that
> > is set inside the avro payload.
> >
> > Keeping LI aside though, it looks like there are two distinct possible
> > goals.
> > 1. The broker will retain messages for x days after a message shows up at
> > the broker.   This behavior would be super deterministic and would never
> > change depending on the contents of the message or anything else.
> >
> > 2. The client is in "partial" control of how long a message stays in the
> > broker based on the creationTime stamped by the client.
> >
> > Although (2) could be a feature in some scenarios..but in many scenarios
> it
> > can be pretty unintuitive and be perceived as an anti-feature.  For e.g
> say
> > a mobile client buffered up some messages because the device was offline
> > (maybe in a plane).. and then sent the message after say 23 hours on a
> > plane.  The message shows up in a Kafka topic with 24 hour retention..
> and
> > now the message gets deleted in 1 hour.
> >
> > Kartik
> >
> >
> > On Tue, Oct 13, 2015 at 12:32 AM, Jay Kreps  wrote:
> >
> > > Here's my basic take:
> > > - I agree it would be nice to have a notion of time baked in if it were
> > > done right
> > > - All the proposals so far seem pretty complex--I think they might make
> > > things worse rather than better overall
> > > - I think adding 2x8 byte timestamps to the message is probably a
> > > non-starter from a size perspective
> > > - Even if it isn't in the message, having two notions of time that
> > control
> > > different things is a bit confusing
> > > - The mechanics of basing retention etc on log append time when that's
> > not
> > > in the log seem complicated
> > >
> > > To that end here is a possible 4th option. Let me know what you think.
> > >
> > > The basic idea is that the message creation time is closest to what the
> > > user actually cares about but is dangerous if set wrong. So rather than
> > > substitute another notion of time, let's try to ensure the correctness
> of
> > > message creation time by preventing arbitrarily bad message creation
> > times.
> > >
> > > First, let's see if we can agree that log append time is not something
> > > anyone really cares about but rather an implementation detail. The
> > > timestamp that matters to the user is when the message occurred (the
> > > creation time). The log append time is basically just an approximation
> > > to this on the assumption that the message creation and the message
> > > receipt on the server occur pretty close together.

Re: [DISCUSS] KIP-32 - Add CreateTime and LogAppendTime to Kafka message

2015-10-22 Thread Jay Kreps
Hey Joel,

I think we're mostly in agreement. I don't know about you but I'm not
head-over-heels in love with any of these proposals, including mine--time
is just pretty complicated and icky.

With respect to using log append time here was basically how I felt:
1. I think in a vacuum basing offset lookup on append time is worse than
using create time. If I'm looking for messages that happened at 12:00, I
don't really care when they were mirrored to a particular server. This kind
of time based lookup is definitely an important aspect.
2. I think in a vacuum basing retention off create time is not as good as
basing it off log append time, though there are drawbacks to both as I
outlined in the other email.
3. I think having multiple notions of time isn't ideal. Offset is already
one notion of time which has super nice properties. I get adding another
but adding two more isn't great.
4. I don't particularly care about log rolling as I think the primary
motivation for it is removed by the time-based index (as I described in the
other email).
5. My concern with the proposed log append time was primarily that there was
a lot of mechanism, and adding that stuff to the fetch request just
seemed a bit complicated and somewhat hacky. My concern is more an unease
that this would complicate life going forward and have more issues than we
would anticipate rather than any concrete problem.

As you point out, and as I tried to summarize, for bootstrapping the
tradeoff is between over-retention and under-retention of data.

-Jay


On Tue, Oct 20, 2015 at 12:09 AM, Joel Koshy  wrote:

>  I’m in favor of adding the create-time in the message (although some would
> argue even that should really be an application-level header), but I don’t
> think it should be mutable after it leaves the client and I think we should
> avoid having the server use that for any server-side indexing. The
> max.append.delay config helps, but I wouldn’t be surprised if it ends up
> becoming a very confusing configuration.
>
> I also agree with being thrifty with headers and that having both
> create-time and log-append-time in the header is overkill which is what the
> third option addresses. I’m not fully convinced that the implementation
> details of basing retention on log append time (i.e., the third option) are
> terribly complicated. Can you describe your concerns with the earlier
> approach? While it is true that using log-append-time to drive retention
> won’t handle the bootstrap case, it is a clear approach in that it makes no
> promises about that scenario - i.e., retention/rolling/offset lookup will
> all be based on arrival time at the server and not message creation
> time. There are use-cases which don’t care about log append time, but for
> those use-cases the create time (if we add it) will be available in each
> message. It’s just that retention/rolling will be driven off log append
> time.
>
> Becket has already brought out some scenarios where the usage of
> create-time in combination with max.append.delay may be ambiguous and
> unintuitive. Here are some others: say we use the
> create-time-driven index for retention, a new segment gets created with
> time t1, and a message arrives out of create-time order (t0 < t1). Then the
> second message will be held hostage until t1 + retention, so retention is
> violated. I actually may be completely unclear on how retention should work
> with create-time-driven-indexes and max.append.delay. Say we set
> max.append.delay to something high (or infinity). Wouldn’t the user have to
> set retention appropriately as well? Otherwise really old (by create-time)
> messages that are say from a bootstrapping source would just get purged
> shortly after arrival. So if max.append.delay is infinity it seems the
> right retention setting is also infinity. Can you clarify how retention
> should work if driven off an index that is built from create-time?
>
> Also wrt the max.append.delay - allowing the server to override it would
> render the create-time field pretty much untrustworthy right? The most
> intuitive policy I can think of for create time is to make it immutable
> after it has been set at the sender. Otherwise we would need to add some
> dirty flag or a generation field in order to be able to distinguish between
> messages with the true create-time and the ones that are not - but that
> seems to be a hack that suggests it should be immutable in the first place.
>
> Thanks,
>
> Joel
>
> On Tue, Oct 20, 2015 at 12:02 AM, Kartik Paramasivam <
> kparamasi...@linkedin.com.invalid> wrote:
>
> > Joel or Becket will probably respond back in more detail.. but here are
> my
> > 2c.
> >
> > From the standpoint of LinkedIn, the suggested proposal works.. in
> essence
> > max.appenddelay can be used to turn "creationTime" into "logAppendTime".
> >this does mean that at LinkedIn we won't be able to use
> "creationTime"..
> however that might also be fine because we anyways use the timeStamp that
> is set inside the avro payload.

Re: [DISCUSS] KIP-32 - Add CreateTime and LogAppendTime to Kafka message

2015-10-22 Thread Jay Kreps
Hey Kartik,

Yes, I agree exactly with your characterization.

The question is "what is the meaning of retention?" It could mean either:
1. "Retain data that is no more than 7 days old"
2. "Retain data for 7 days from when you get it"

I don't know if either is actually a clear winner.

Each is intuitive and easily expressible:
1. "We have the last 7 days of data in Kafka"
2. "You have 7 days to get your data from Kafka from whenever it arrives"

Each has corner cases:
1. May lead to retaining data for too little time in the bootstrap case
2. May lead to over-retention in the bootstrap case

Which failure is worse probably depends on who you ask:
1. May lead to not retaining data long enough for a consumer to get it,
which the consumer would say is very bad
2. May lead to running out of disk space, which ops would say is very bad

-Jay




On Tue, Oct 20, 2015 at 12:02 AM, Kartik Paramasivam <
kparamasi...@linkedin.com.invalid> wrote:

> Joel or Becket will probably respond back in more detail.. but here are my
> 2c.
>
> From the standpoint of LinkedIn, the suggested proposal works.. in essence
> max.appenddelay can be used to turn "creationTime" into "logAppendTime".
>this does mean that at LinkedIn we won't be able to use "creationTime"..
> however that might also be fine because we anyways use the timeStamp that
> is set inside the avro payload.
>
> Keeping LI aside though, it looks like there are two distinct possible
> goals.
> 1. The broker will retain messages for x days after a message shows up at
> the broker.   This behavior would be super deterministic and would never
> change depending on the contents of the message or anything else.
>
> 2. The client is in "partial" control of how long a message stays in the
> broker based on the creationTime stamped by the client.
>
> Although (2) could be a feature in some scenarios, in many scenarios it
> can be pretty unintuitive and be perceived as an anti-feature.  For e.g., say
> a mobile client buffered up some messages because the device was offline
> (maybe in a plane).. and then sent the message after say 23 hours on a
> plane.  The message shows up in a Kafka topic with 24 hour retention.. and
> now the message gets deleted in 1 hour.
>
> Kartik
>
>
> On Tue, Oct 13, 2015 at 12:32 AM, Jay Kreps  wrote:
>
> > Here's my basic take:
> > - I agree it would be nice to have a notion of time baked in if it were
> > done right
> > - All the proposals so far seem pretty complex--I think they might make
> > things worse rather than better overall
> > - I think adding 2x8 byte timestamps to the message is probably a
> > non-starter from a size perspective
> > - Even if it isn't in the message, having two notions of time that
> control
> > different things is a bit confusing
> > - The mechanics of basing retention etc on log append time when that's
> not
> > in the log seem complicated
> >
> > To that end here is a possible 4th option. Let me know what you think.
> >
> > The basic idea is that the message creation time is closest to what the
> > user actually cares about but is dangerous if set wrong. So rather than
> > substitute another notion of time, let's try to ensure the correctness of
> > message creation time by preventing arbitrarily bad message creation
> times.
> >
> > First, let's see if we can agree that log append time is not something
> > anyone really cares about but rather an implementation detail. The
> > timestamp that matters to the user is when the message occurred (the
> > creation time). The log append time is basically just an approximation to
> > this on the assumption that the message creation and the message receipt
> > on the server occur pretty close together.
> >
> > But as these values diverge the issue starts to become apparent. Say you
> > set the retention to one week and then mirror data from a topic
> containing
> > two years of retention. Your intention is clearly to keep the last week,
> > but because the mirroring is appending right now you will keep two years.
> >
> > The reason we are liking log append time is because we are (justifiably)
> > concerned that in certain situations the creation time may not be
> > trustworthy. This same problem exists on the servers but there are fewer
> > servers and they just run the kafka code so it is less of an issue.
> >
> > There are two possible ways to handle this:
> >
> >    1. Just tell people to add size based retention. I think this is not
> >    entirely unreasonable, we're basically saying we retain data based on the
> >    timestamp you give us in the data. If you give us bad data we will retain
> >    it for a bad amount of time. If you want to ensure we don't retain "too
> >    much" data, define "too much" by setting a time-based retention setting.
> >    This is not entirely unreasonable but kind of suffers from a "one bad
> >    apple" problem in a very large environment.
> >    2. Prevent bad timestamps. In general we can't say a timestamp is bad.

Re: [DISCUSS] KIP-32 - Add CreateTime and LogAppendTime to Kafka message

2015-10-22 Thread Jay Kreps
Hey Becket,

Let me see if I can address your concerns:

1. Let's say we have two source clusters that are mirrored to the same
> target cluster. For some reason one of the mirror makers for a cluster dies,
> and after fixing the issue we want to resume mirroring. In this case it is
> possible that when the mirror maker resumes mirroring, the timestamps of the
> messages have already gone beyond the acceptable timestamp range on the broker.
> In order to let those messages go through, we have to bump up the
> *max.append.delay* for all the topics on the target broker. This could be
> painful.


Actually what I was suggesting was different. Here is my observation:
clusters/topics directly produced to by applications have a valid assertion
that log append time and create time are similar (let's call these
"unbuffered"); other cluster/topic such as those that receive data from a
database, a log file, or another kafka cluster don't have that assertion,
for these "buffered" clusters data can be arbitrarily late. This means any
use of log append time on these buffered clusters is not very meaningful,
and create time and log append time "should" be similar on unbuffered
clusters so you can probably use either.

Using log append time on buffered clusters actually results in bad things.
If you request the offset for a given time you don't end up getting
data for that time but rather data that showed up at that time. If you try
to retain 7 days of data it may mostly work but any kind of bootstrapping
will result in retaining much more (potentially the whole database
contents!).

So what I am suggesting in terms of the use of the max.append.delay is that
unbuffered clusters would have this set and buffered clusters would not. In
other words, in LI terminology, tracking and metrics clusters would have
this enforced, aggregate and replica clusters wouldn't.

So you DO have the issue of potentially maintaining more data than you need
to on aggregate clusters if your mirroring skews, but you DON'T need to
tweak the setting as you described.

2. Let's say in the above scenario we let the messages in, at that point
> some log segments in the target cluster might have a wide range of
> timestamps, like Guozhang mentioned the log rolling could be tricky because
> the first time index entry does not necessarily have the smallest timestamp
> of all the messages in the log segment. Instead, it is the largest
> timestamp ever seen. We have to scan the entire log to find the message
> with smallest offset to see if we should roll.


I think there are two uses for time-based log rolling:
1. Making the offset lookup by timestamp work
2. Ensuring we don't retain data indefinitely if it is supposed to get
purged after 7 days

But think about these two use cases. (1) is totally obviated by the
time=>offset index we are adding which yields much more granular offset
lookups. (2) Is actually totally broken if you switch to append time,
right? If you want to be sure for security/privacy reasons you only retain
7 days of data then if the log append and create time diverge you actually
violate this requirement.

I think 95% of people care about (1) which is solved in the proposal and
(2) is actually broken today as well as in both proposals.

3. Theoretically it is possible that an older log segment contains
> timestamps that are newer than all the messages in a newer log segment. It
> would be weird that we are supposed to delete the newer log segment before
> we delete the older log segment.


The index timestamps would always be a lower bound (i.e. the maximum at
that time) so I don't think that is possible.

 4. In the bootstrap case, if we reload the data to a Kafka cluster, we have to
> make sure we configure the topic correctly before we load the data.
> Otherwise the message might either be rejected because the timestamp is too
> old, or it might be deleted immediately because the retention time has
> been reached.


See (1).

-Jay

On Tue, Oct 13, 2015 at 7:30 PM, Jiangjie Qin 
wrote:

> Hey Jay and Guozhang,
>
> Thanks a lot for the reply. So if I understand correctly, Jay's proposal
> is:
>
> 1. Let client stamp the message create time.
> 2. Broker builds an index based on the client-stamped message create time.
> 3. Broker only takes messages whose create time is within current time
> plus/minus T (T is a configuration *max.append.delay*, could be a topic level
> configuration); if the timestamp is out of this range, the broker rejects the
> message.
> 4. Because the create time of messages can be out of order, when broker
> builds the time based index it only provides the guarantee that if a
> consumer starts consuming from the offset returned by searching by
> timestamp t, they will not miss any message created after t, but might see
> some messages created before t.
>
> To build the time based index, every time when a broker needs to insert a
> new time index entry, the entry would be {Largest_Timestamp_Ever_Seen ->
> Current_Offset}. This basically means any timestamp larger than the
> Largest_Timestamp_Ever_Seen must come after this offset because it never
> saw them before.

Re: [DISCUSS] KIP-32 - Add CreateTime and LogAppendTime to Kafka message

2015-10-20 Thread Joel Koshy
 I’m in favor of adding the create-time in the message (although some would
argue even that should really be an application-level header), but I don’t
think it should be mutable after it leaves the client and I think we should
avoid having the server use that for any server-side indexing. The
max.append.delay config helps, but I wouldn’t be surprised if it ends up
becoming a very confusing configuration.

I also agree with being thrifty with headers and that having both
create-time and log-append-time in the header is overkill which is what the
third option addresses. I’m not fully convinced that the implementation
details of basing retention on log append time (i.e., the third option) are
terribly complicated. Can you describe your concerns with the earlier
approach? While it is true that using log-append-time to drive retention
won’t handle the bootstrap case, it is a clear approach in that it makes no
promises about that scenario - i.e., retention/rolling/offset lookup will
all be based on arrival time at the server and not message creation
time. There are use-cases which don’t care about log append time, but for
those use-cases the create time (if we add it) will be available in each
message. It’s just that retention/rolling will be driven off log append
time.

Becket has already brought out some scenarios where the usage of
create-time in combination with max.append.delay may be ambiguous and
unintuitive. Here are some others: say we use the
create-time-driven index for retention, a new segment gets created with
time t1, and a message arrives out of create-time order (t0 < t1). Then the
second message will be held hostage until t1 + retention, so retention is
violated.
with create-time-driven-indexes and max.append.delay. Say we set
max.append.delay to something high (or infinity). Wouldn’t the user have to
set retention appropriately as well? Otherwise really old (by create-time)
messages that are say from a bootstrapping source would just get purged
shortly after arrival. So if max.append.delay is infinity it seems the
right retention setting is also infinity. Can you clarify how retention
should work if driven off an index that is built from create-time?

Also wrt the max.append.delay - allowing the server to override it would
render the create-time field pretty much untrustworthy right? The most
intuitive policy I can think of for create time is to make it immutable
after it has been set at the sender. Otherwise we would need to add some
dirty flag or a generation field in order to be able to distinguish between
messages with the true create-time and the ones that are not - but that
seems to be a hack that suggests it should be immutable in the first place.

Thanks,

Joel

On Tue, Oct 20, 2015 at 12:02 AM, Kartik Paramasivam <
kparamasi...@linkedin.com.invalid> wrote:

> Joel or Becket will probably respond back in more detail.. but here are my
> 2c.
>
> From the standpoint of LinkedIn, the suggested proposal works.. in essence
> max.appenddelay can be used to turn "creationTime" into "logAppendTime".
>this does mean that at LinkedIn we won't be able to use "creationTime"..
> however that might also be fine because we anyways use the timeStamp that
> is set inside the avro payload.
>
> Keeping LI aside though, it looks like there are two distinct possible
> goals.
> 1. The broker will retain messages for x days after a message shows up at
> the broker.   This behavior would be super deterministic and would never
> change depending on the contents of the message or anything else.
>
> 2. The client is in "partial" control of how long a message stays in the
> broker based on the creationTime stamped by the client.
>
> Although (2) could be a feature in some scenarios, in many scenarios it
> can be pretty unintuitive and be perceived as an anti-feature.  For e.g., say
> a mobile client buffered up some messages because the device was offline
> (maybe in a plane).. and then sent the message after say 23 hours on a
> plane.  The message shows up in a Kafka topic with 24 hour retention.. and
> now the message gets deleted in 1 hour.
>
> Kartik
>
>
> On Tue, Oct 13, 2015 at 12:32 AM, Jay Kreps  wrote:
>
> > Here's my basic take:
> > - I agree it would be nice to have a notion of time baked in if it were
> > done right
> > - All the proposals so far seem pretty complex--I think they might make
> > things worse rather than better overall
> > - I think adding 2x8 byte timestamps to the message is probably a
> > non-starter from a size perspective
> > - Even if it isn't in the message, having two notions of time that
> control
> > different things is a bit confusing
> > - The mechanics of basing retention etc on log append time when that's
> not
> > in the log seem complicated
> >
> > To that end here is a possible 4th option. Let me know what you think.
> >
> > The basic idea is that the message creation time is closest to what the
> > user actually cares about but is dangerous if set wrong.

Re: [DISCUSS] KIP-32 - Add CreateTime and LogAppendTime to Kafka message

2015-10-20 Thread Kartik Paramasivam
Joel or Becket will probably respond back in more detail.. but here are my
2c.

From the standpoint of LinkedIn, the suggested proposal works.. in essence
max.appenddelay can be used to turn "creationTime" into "logAppendTime".
   this does mean that at LinkedIn we won't be able to use "creationTime"..
however that might also be fine because we anyways use the timeStamp that
is set inside the avro payload.

Keeping LI aside though, it looks like there are two distinct possible
goals.
1. The broker will retain messages for x days after a message shows up at
the broker.   This behavior would be super deterministic and would never
change depending on the contents of the message or anything else.

2. The client is in "partial" control of how long a message stays in the
broker based on the creationTime stamped by the client.

Although (2) could be a feature in some scenarios, in many scenarios it
can be pretty unintuitive and be perceived as an anti-feature.  For e.g., say
a mobile client buffered up some messages because the device was offline
(maybe in a plane).. and then sent the message after say 23 hours on a
plane.  The message shows up in a Kafka topic with 24 hour retention.. and
now the message gets deleted in 1 hour.

Kartik


On Tue, Oct 13, 2015 at 12:32 AM, Jay Kreps  wrote:

> Here's my basic take:
> - I agree it would be nice to have a notion of time baked in if it were
> done right
> - All the proposals so far seem pretty complex--I think they might make
> things worse rather than better overall
> - I think adding 2x8 byte timestamps to the message is probably a
> non-starter from a size perspective
> - Even if it isn't in the message, having two notions of time that control
> different things is a bit confusing
> - The mechanics of basing retention etc on log append time when that's not
> in the log seem complicated
>
> To that end here is a possible 4th option. Let me know what you think.
>
> The basic idea is that the message creation time is closest to what the
> user actually cares about but is dangerous if set wrong. So rather than
> substitute another notion of time, let's try to ensure the correctness of
> message creation time by preventing arbitrarily bad message creation times.
>
> First, let's see if we can agree that log append time is not something
> anyone really cares about but rather an implementation detail. The
> timestamp that matters to the user is when the message occurred (the
> creation time). The log append time is basically just an approximation to
> this on the assumption that the message creation and the message receipt on
> the server occur pretty close together.
>
> But as these values diverge the issue starts to become apparent. Say you
> set the retention to one week and then mirror data from a topic containing
> two years of retention. Your intention is clearly to keep the last week,
> but because the mirroring is appending right now you will keep two years.
>
> The reason we are liking log append time is because we are (justifiably)
> concerned that in certain situations the creation time may not be
> trustworthy. This same problem exists on the servers but there are fewer
> servers and they just run the kafka code so it is less of an issue.
>
> There are two possible ways to handle this:
>
>    1. Just tell people to add size based retention. I think this is not
>    entirely unreasonable, we're basically saying we retain data based on the
>    timestamp you give us in the data. If you give us bad data we will retain
>    it for a bad amount of time. If you want to ensure we don't retain "too
>    much" data, define "too much" by setting a time-based retention setting.
>    This is not entirely unreasonable but kind of suffers from a "one bad
>    apple" problem in a very large environment.
>    2. Prevent bad timestamps. In general we can't say a timestamp is bad.
>    However the definition we're implicitly using is that we think there are a
>    set of topics/clusters where the creation timestamp should always be "very
>    close" to the log append timestamp. This is true for data sources that have
>    no buffering capability (which at LinkedIn is very common, but is more rare
>    elsewhere). The solution in this case would be to allow a setting along the
>    lines of max.append.delay which checks the creation timestamp against the
>    server time to look for too large a divergence. The solution would either
>    be to reject the message or to override it with the server time.
>
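The max.append.delay bound sketched above can be expressed as a small broker-side check. The class, method, and policy names below are hypothetical (not Kafka code), and both outcomes Jay mentions are shown: rejecting the message, or overriding the timestamp with server time.

```java
import java.time.Clock;

// Sketch of a broker-side timestamp check, assuming a per-topic
// "max.append.delay" bound as discussed in this thread.
final class TimestampValidator {
    enum Policy { REJECT, OVERRIDE_WITH_SERVER_TIME }

    private final long maxAppendDelayMs; // allowed |serverTime - createTime|
    private final Policy policy;
    private final Clock clock;

    TimestampValidator(long maxAppendDelayMs, Policy policy, Clock clock) {
        this.maxAppendDelayMs = maxAppendDelayMs;
        this.policy = policy;
        this.clock = clock;
    }

    /** Returns the timestamp to append, or throws if the message is rejected. */
    long validate(long createTimeMs) {
        long now = clock.millis();
        if (Math.abs(now - createTimeMs) <= maxAppendDelayMs)
            return createTimeMs;              // within bounds: trust the client
        if (policy == Policy.OVERRIDE_WITH_SERVER_TIME)
            return now;                       // fall back to log append time
        throw new IllegalArgumentException("create time too far from server time");
    }
}
```

With this shape, unbuffered clusters would configure a tight bound while buffered (mirror/aggregate) clusters would effectively disable the check.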
> So in LI's environment you would configure the clusters used for direct,
> unbuffered, message production (e.g. tracking and metrics local) to enforce
> a reasonably aggressive timestamp bound (say 10 mins), and all other
> clusters would just inherit these.
>
> The downside of this approach is requiring the special configuration.
> However I think in 99% of environments this could be skipped entirely, it's
> only when the ratio of c

Re: [DISCUSS] KIP-32 - Add CreateTime and LogAppendTime to Kafka message

2015-10-13 Thread Jiangjie Qin
Hey Jay and Guozhang,

Thanks a lot for the reply. So if I understand correctly, Jay's proposal is:

1. Let client stamp the message create time.
2. Broker builds an index based on the client-stamped message create time.
3. Broker only takes messages whose create time is within current time
plus/minus T (T is a configuration *max.append.delay*, could be a topic level
configuration); if the timestamp is out of this range, the broker rejects the
message.
4. Because the create time of messages can be out of order, when broker
builds the time based index it only provides the guarantee that if a
consumer starts consuming from the offset returned by searching by
timestamp t, they will not miss any message created after t, but might see
some messages created before t.

To build the time based index, every time when a broker needs to insert a
new time index entry, the entry would be {Largest_Timestamp_Ever_Seen ->
Current_Offset}. This basically means any timestamp larger than the
Largest_Timestamp_Ever_Seen must come after this offset because it never
saw them before. So we don't miss any message with larger timestamp.

(@Guozhang, in this case, for log retention we only need to take a look at
the last time index entry, because it must be the largest timestamp ever,
if that timestamp is overdue, we can safely delete any log segment before
that. So we don't need to scan the log segment file for log retention)
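The index construction and retention shortcut described above can be sketched as follows. Class and method names are mine, not Kafka's: each entry records the largest timestamp ever seen mapped to the current offset, so entry timestamps are monotonic and the last entry alone decides whether a segment is expired.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the time index described in this thread: entries are
// {Largest_Timestamp_Ever_Seen -> Current_Offset}, monotonic by construction.
final class TimeIndex {
    record Entry(long maxTimestampSoFar, long offset) {}

    private final List<Entry> entries = new ArrayList<>();
    private long maxTimestampSeen = Long.MIN_VALUE;

    /** Called on append; keeps entry timestamps monotonically non-decreasing. */
    void maybeAppend(long messageTimestamp, long offset) {
        maxTimestampSeen = Math.max(maxTimestampSeen, messageTimestamp);
        entries.add(new Entry(maxTimestampSeen, offset));
    }

    long largestTimestamp() {
        return maxTimestampSeen;
    }

    /** Retention shortcut: the last entry holds the segment's largest
     *  timestamp, so the whole segment is expired iff that is overdue. */
    boolean segmentExpired(long nowMs, long retentionMs) {
        return !entries.isEmpty()
            && entries.get(entries.size() - 1).maxTimestampSoFar() < nowMs - retentionMs;
    }
}
```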

I assume that we are still going to have the new FetchRequest to allow the
time index replication for replicas.

I think Jay's main point here is that we don't want to have two timestamp
concepts in Kafka, which I agree is a reasonable concern. And I also agree
that create time is more meaningful than LogAppendTime for users. But I am
not sure if making everything based on Create Time would work in all cases.
Here are my questions about this approach:

1. Let's say we have two source clusters that are mirrored to the same
target cluster. For some reason one of the mirror makers for a cluster dies,
and after fixing the issue we want to resume mirroring. In this case it is
possible that when the mirror maker resumes mirroring, the timestamps of the
messages have already gone beyond the acceptable timestamp range on the broker.
In order to let those messages go through, we have to bump up the
*max.append.delay* for all the topics on the target broker. This could be
painful.

2. Let's say in the above scenario we let the messages in, at that point
some log segments in the target cluster might have a wide range of
timestamps, like Guozhang mentioned the log rolling could be tricky because
the first time index entry does not necessarily have the smallest timestamp
of all the messages in the log segment. Instead, it is the largest
timestamp ever seen. We have to scan the entire log to find the message
with smallest offset to see if we should roll.

3. Theoretically it is possible that an older log segment contains
timestamps that are newer than all the messages in a newer log segment. It
would be weird that we are supposed to delete the newer log segment before
we delete the older log segment.

4. In the bootstrap case, if we reload the data to a Kafka cluster, we have to
make sure we configure the topic correctly before we load the data.
Otherwise the message might either be rejected because the timestamp is too
old, or it might be deleted immediately because the retention time has
been reached.

I am very concerned about the operational overhead and the ambiguity of
guarantees we introduce if we purely rely on CreateTime.

It looks to me that the biggest issue of adopting CreateTime everywhere is
CreateTime can have big gaps. These gaps could be caused by several cases:
[1]. Faulty clients
[2]. Natural delays from different source
[3]. Bootstrap
[4]. Failure recovery

Jay's alternative proposal solves [1], and perhaps solves [2] as well if we
are able to set a reasonable max.append.delay. But it does not seem to
address [3] and [4]. I actually doubt [3] and [4] are solvable, because it
looks like the CreateTime gap is unavoidable in those two cases.

Thanks,

Jiangjie (Becket) Qin


On Tue, Oct 13, 2015 at 3:23 PM, Guozhang Wang  wrote:

> Just to complete Jay's option, here is my understanding:
>
> 1. For log retention: if we want to remove data before time t, we look into
> the index file of each segment and find the largest timestamp t' < t, find
> the corresponding offset and start scanning to the end of the segment,
> if there is no entry with timestamp >= t, we can delete this segment; if a
> segment's index smallest timestamp is larger than t, we can skip that
> segment.
>
> 2. For log rolling: if we want to start a new segment after time t, we look
> into the active segment's index file, if the largest timestamp is already >
> t, we can roll a new segment immediately; if it is < t, we read its
> corresponding offset and start scanning to the end of the segment, if we
> find a record whose timestamp > t, we can roll a new segment.
>
> For log rolling we only need to possibly scan a small portion of the active
> segment, which should be fine; for log retention we may in the worst case
> end up scanning all segments, but in practice we may skip most of them
> since their smallest timestamp in the index file is larger than t.

Re: [DISCUSS] KIP-32 - Add CreateTime and LogAppendTime to Kafka message

2015-10-13 Thread Guozhang Wang
Just to complete Jay's option, here is my understanding:

1. For log retention: if we want to remove data before time t, we look into
the index file of each segment and find the largest timestamp t' < t, find
the corresponding offset and start scanning to the end of the segment,
if there is no entry with timestamp >= t, we can delete this segment; if a
segment's index smallest timestamp is larger than t, we can skip that
segment.

2. For log rolling: if we want to start a new segment after time t, we look
into the active segment's index file, if the largest timestamp is already >
t, we can roll a new segment immediately; if it is < t, we read its
corresponding offset and start scanning to the end of the segment, if we
find a record whose timestamp > t, we can roll a new segment.

For log rolling we only need to possibly scan a small portion of the active
segment, which should be fine; for log retention we may in the worst case
end up scanning all segments, but in practice we may skip most of them
since their smallest timestamp in the index file is larger than t.
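
As a rough illustration of the retention check described above (a hedged
sketch, not Kafka's actual code; the segment, record, and index shapes here
are invented for the example):

```python
def can_delete_segment(records, index, cutoff):
    """records: list of (offset, timestamp) pairs in the segment.
    index: sparse, timestamp-monotonic list of (timestamp, offset) entries.
    Returns True only if every record's timestamp is < cutoff."""
    # If even the smallest indexed timestamp is >= cutoff, skip the scan:
    # the segment certainly holds data at or after the cutoff.
    if index and index[0][0] >= cutoff:
        return False
    # Find the largest indexed timestamp t' < cutoff, then scan from its
    # offset to the end of the segment looking for any timestamp >= cutoff.
    start_offset = 0
    for ts, offset in index:
        if ts < cutoff:
            start_offset = offset
        else:
            break
    return all(ts < cutoff
               for offset, ts in records
               if offset >= start_offset)
```

In the worst case this scans the segment tail, but as noted above, most
segments are either deleted or skipped using the index alone.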

Guozhang


On Tue, Oct 13, 2015 at 12:52 AM, Jay Kreps  wrote:

> I think it should be possible to index out-of-order timestamps. The
> timestamp index would be similar to the offset index, a memory mapped file
> appended to as part of the log append, but would have the format
>   timestamp offset
> The timestamp entries would be monotonic and, as with the offset index, would
> be added no more often than every 4k (or some configurable threshold to keep
> the index small--actually for timestamps it could probably be much more sparse
> than 4k).
>
> A search for a timestamp t yields an offset o before which no prior message
> has a timestamp >= t. In other words if you read the log starting with o
> you are guaranteed not to miss any messages occurring at t or later though
> you may get many before t (due to out-of-orderness). Unlike the offset
> index this bound doesn't really have to be tight (i.e. probably no need to
> go search the log itself, though you could).
>
> -Jay
>
> On Tue, Oct 13, 2015 at 12:32 AM, Jay Kreps  wrote:
>
> > Here's my basic take:
> > - I agree it would be nice to have a notion of time baked in if it were
> > done right
> > - All the proposals so far seem pretty complex--I think they might make
> > things worse rather than better overall
> > - I think adding 2x8 byte timestamps to the message is probably a
> > non-starter from a size perspective
> > - Even if it isn't in the message, having two notions of time that
> control
> > different things is a bit confusing
> > - The mechanics of basing retention etc on log append time when that's
> not
> > in the log seem complicated
> >
> > To that end here is a possible 4th option. Let me know what you think.
> >
> > The basic idea is that the message creation time is closest to what the
> > user actually cares about but is dangerous if set wrong. So rather than
> > substitute another notion of time, let's try to ensure the correctness of
> > message creation time by preventing arbitrarily bad message creation
> times.
> >
> > First, let's see if we can agree that log append time is not something
> > anyone really cares about but rather an implementation detail. The
> > timestamp that matters to the user is when the message occurred (the
> creation time). The log append time is basically just an approximation to
> this, on the assumption that message creation and message receipt on the
> server occur pretty close together.
> >
> > But as these values diverge the issue starts to become apparent. Say you
> > set the retention to one week and then mirror data from a topic
> containing
> > two years of retention. Your intention is clearly to keep the last week,
> > but because the mirroring is appending right now you will keep two years.
> >
> > The reason we are liking log append time is because we are (justifiably)
> > concerned that in certain situations the creation time may not be
> > trustworthy. This same problem exists on the servers but there are fewer
> > servers and they just run the kafka code so it is less of an issue.
> >
> > There are two possible ways to handle this:
> >
> >1. Just tell people to add size based retention. I think this is not
> >entirely unreasonable, we're basically saying we retain data based on
> the
> >timestamp you give us in the data. If you give us bad data we will
> retain
> >it for a bad amount of time. If you want to ensure we don't retain
> "too
> >much" data, define "too much" by setting a time-based retention
> setting.
> >This is not entirely unreasonable but kind of suffers from a "one bad
> >apple" problem in a very large environment.
> >2. Prevent bad timestamps. In general we can't say a timestamp is bad.
> >However the definition we're implicitly using is that we think there
> are a
> >set of topics/clusters where the creation timestamp should always be
> "very

Re: [DISCUSS] KIP-32 - Add CreateTime and LogAppendTime to Kafka message

2015-10-13 Thread Jay Kreps
I think it should be possible to index out-of-order timestamps. The
timestamp index would be similar to the offset index, a memory mapped file
appended to as part of the log append, but would have the format
  timestamp offset
The timestamp entries would be monotonic and, as with the offset index, would
be added no more often than every 4k (or some configurable threshold to keep
the index small--actually for timestamps it could probably be much more sparse
than 4k).

A search for a timestamp t yields an offset o before which no prior message
has a timestamp >= t. In other words if you read the log starting with o
you are guaranteed not to miss any messages occurring at t or later though
you may get many before t (due to out-of-orderness). Unlike the offset
index this bound doesn't really have to be tight (i.e. probably no need to
go search the log itself, though you could).
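
One way such an index could be implemented is sketched below. This is an
assumption-laden toy, not Kafka's code: the class name is invented, entries
are added per message count rather than per 4k bytes, and each entry records
the maximum timestamp seen so far (which is what makes the lookup guarantee
hold even for out-of-order timestamps).

```python
class TimestampIndex:
    """Sparse, timestamp-monotonic index: entry (m, o) asserts that every
    message at offset <= o has timestamp <= m."""

    def __init__(self, interval=4):
        self.interval = interval   # add at most one entry per `interval` messages
        self.entries = []          # monotonic [(max_ts_up_to_offset, offset)]
        self._max_seen = -1
        self._count = 0

    def maybe_append(self, offset, ts):
        self._max_seen = max(self._max_seen, ts)
        self._count += 1
        if self._count % self.interval == 0:
            # Only append when it keeps the index monotonic in timestamp.
            if not self.entries or self._max_seen > self.entries[-1][0]:
                self.entries.append((self._max_seen, offset))

    def lookup(self, t):
        """Return an offset o such that no message before o has timestamp
        >= t; reading from o cannot miss any message at t or later."""
        start = 0
        for max_ts, offset in self.entries:
            if max_ts < t:
                start = offset + 1
            else:
                break
        return start
```

The bound is deliberately loose: the returned offset may be well before the
first message with timestamp >= t, matching the point that this bound does
not have to be tight.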

-Jay

On Tue, Oct 13, 2015 at 12:32 AM, Jay Kreps  wrote:

> Here's my basic take:
> - I agree it would be nice to have a notion of time baked in if it were
> done right
> - All the proposals so far seem pretty complex--I think they might make
> things worse rather than better overall
> - I think adding 2x8 byte timestamps to the message is probably a
> non-starter from a size perspective
> - Even if it isn't in the message, having two notions of time that control
> different things is a bit confusing
> - The mechanics of basing retention etc on log append time when that's not
> in the log seem complicated
>
> To that end here is a possible 4th option. Let me know what you think.
>
> The basic idea is that the message creation time is closest to what the
> user actually cares about but is dangerous if set wrong. So rather than
> substitute another notion of time, let's try to ensure the correctness of
> message creation time by preventing arbitrarily bad message creation times.
>
> First, let's see if we can agree that log append time is not something
> anyone really cares about but rather an implementation detail. The
> timestamp that matters to the user is when the message occurred (the
> creation time). The log append time is basically just an approximation to
> this, on the assumption that message creation and message receipt on the
> server occur pretty close together.
>
> But as these values diverge the issue starts to become apparent. Say you
> set the retention to one week and then mirror data from a topic containing
> two years of retention. Your intention is clearly to keep the last week,
> but because the mirroring is appending right now you will keep two years.
>
> The reason we are liking log append time is because we are (justifiably)
> concerned that in certain situations the creation time may not be
> trustworthy. This same problem exists on the servers but there are fewer
> servers and they just run the kafka code so it is less of an issue.
>
> There are two possible ways to handle this:
>
>1. Just tell people to add size based retention. I think this is not
>entirely unreasonable, we're basically saying we retain data based on the
>timestamp you give us in the data. If you give us bad data we will retain
>it for a bad amount of time. If you want to ensure we don't retain "too
>much" data, define "too much" by setting a time-based retention setting.
>This is not entirely unreasonable but kind of suffers from a "one bad
>apple" problem in a very large environment.
>2. Prevent bad timestamps. In general we can't say a timestamp is bad.
>However the definition we're implicitly using is that we think there are a
>set of topics/clusters where the creation timestamp should always be "very
>close" to the log append timestamp. This is true for data sources that have
>no buffering capability (which at LinkedIn is very common, but is more rare
>elsewhere). The solution in this case would be to allow a setting along the
>lines of max.append.delay which checks the creation timestamp against the
>server time to look for too large a divergence. The solution would either
>be to reject the message or to override it with the server time.
>
> So in LI's environment you would configure the clusters used for direct,
> unbuffered, message production (e.g. tracking and metrics local) to enforce
> a reasonably aggressive timestamp bound (say 10 mins), and all other
> clusters would just inherit these.
>
> The downside of this approach is requiring the special configuration.
> However I think in 99% of environments this could be skipped entirely; it's
> only when the ratio of clients to servers gets so massive that you need to
> do this. The primary upside is that you have a single authoritative notion
> of time which is closest to what a user would want and is stored directly
> in the message.
>
> I'm also assuming there is a workable approach for indexing non-monotonic
> timestamps, though I haven't actually worked that out.
>
> -Jay
>
> On Mon, Oct 5, 

Re: [DISCUSS] KIP-32 - Add CreateTime and LogAppendTime to Kafka message

2015-10-13 Thread Jay Kreps
Here's my basic take:
- I agree it would be nice to have a notion of time baked in if it were
done right
- All the proposals so far seem pretty complex--I think they might make
things worse rather than better overall
- I think adding 2x8 byte timestamps to the message is probably a
non-starter from a size perspective
- Even if it isn't in the message, having two notions of time that control
different things is a bit confusing
- The mechanics of basing retention etc on log append time when that's not
in the log seem complicated

To that end here is a possible 4th option. Let me know what you think.

The basic idea is that the message creation time is closest to what the
user actually cares about but is dangerous if set wrong. So rather than
substitute another notion of time, let's try to ensure the correctness of
message creation time by preventing arbitrarily bad message creation times.

First, let's see if we can agree that log append time is not something
anyone really cares about but rather an implementation detail. The
timestamp that matters to the user is when the message occurred (the
creation time). The log append time is basically just an approximation to
this, on the assumption that message creation and message receipt on the
server occur pretty close together.

But as these values diverge the issue starts to become apparent. Say you
set the retention to one week and then mirror data from a topic containing
two years of retention. Your intention is clearly to keep the last week,
but because the mirroring is appending right now you will keep two years.

The reason we are liking log append time is because we are (justifiably)
concerned that in certain situations the creation time may not be
trustworthy. This same problem exists on the servers but there are fewer
servers and they just run the kafka code so it is less of an issue.

There are two possible ways to handle this:

   1. Just tell people to add size based retention. I think this is not
   entirely unreasonable, we're basically saying we retain data based on the
   timestamp you give us in the data. If you give us bad data we will retain
   it for a bad amount of time. If you want to ensure we don't retain "too
   much" data, define "too much" by setting a time-based retention setting.
   This is not entirely unreasonable but kind of suffers from a "one bad
   apple" problem in a very large environment.
   2. Prevent bad timestamps. In general we can't say a timestamp is bad.
   However the definition we're implicitly using is that we think there are a
   set of topics/clusters where the creation timestamp should always be "very
   close" to the log append timestamp. This is true for data sources that have
   no buffering capability (which at LinkedIn is very common, but is more rare
   elsewhere). The solution in this case would be to allow a setting along the
   lines of max.append.delay which checks the creation timestamp against the
   server time to look for too large a divergence. The solution would either
   be to reject the message or to override it with the server time.
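
The check in option 2 could be sketched as follows. Note that
max.append.delay is only a setting proposed in this thread, not an existing
Kafka configuration, and the function shape here is invented for
illustration:

```python
def validate_create_time(create_ts_ms, broker_now_ms,
                         max_append_delay_ms, policy="reject"):
    """Bound how far CreateTime may diverge from the broker's clock.
    policy is either "reject" (refuse the message) or "override"
    (replace the timestamp with the server time)."""
    if abs(broker_now_ms - create_ts_ms) <= max_append_delay_ms:
        return create_ts_ms          # within bound: keep the client timestamp
    if policy == "override":
        return broker_now_ms         # out of bound: stamp with server time
    raise ValueError("CreateTime exceeds max.append.delay bound")
```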

So in LI's environment you would configure the clusters used for direct,
unbuffered, message production (e.g. tracking and metrics local) to enforce
a reasonably aggressive timestamp bound (say 10 mins), and all other
clusters would just inherit these.

The downside of this approach is requiring the special configuration.
However I think in 99% of environments this could be skipped entirely; it's
only when the ratio of clients to servers gets so massive that you need to
do this. The primary upside is that you have a single authoritative notion
of time which is closest to what a user would want and is stored directly
in the message.

I'm also assuming there is a workable approach for indexing non-monotonic
timestamps, though I haven't actually worked that out.

-Jay

On Mon, Oct 5, 2015 at 8:52 PM, Jiangjie Qin 
wrote:

> Bumping up this thread although most of the discussion was on the
> discussion thread of KIP-31 :)
>
> I just updated the KIP page to add a detailed solution for the option (Option
> 3) that does not expose the LogAppendTime to user.
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+CreateTime+and+LogAppendTime+to+Kafka+message
>
> The option has a minor change to the fetch request to allow fetching time
> index entries as well. I kind of like this solution because it's just doing
> what we need without introducing other things.
>
> It would be great to hear people's feedback. I can explain more during
> tomorrow's KIP hangout.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Thu, Sep 10, 2015 at 2:47 PM, Jiangjie Qin  wrote:
>
> > Hi Jay,
> >
> > I just copy/pastes here your feedback on the timestamp proposal that was
> > in the discussion thread of KIP-31. Please see the replies inline.
> > The main change I made compared with previous proposal is to add both
> > CreateTime and LogAppendTime to the me

Re: [DISCUSS] KIP-32 - Add CreateTime and LogAppendTime to Kafka message

2015-10-05 Thread Jiangjie Qin
Bumping up this thread although most of the discussion was on the
discussion thread of KIP-31 :)

I just updated the KIP page to add a detailed solution for the option (Option
3) that does not expose the LogAppendTime to user.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+CreateTime+and+LogAppendTime+to+Kafka+message

The option has a minor change to the fetch request to allow fetching time
index entries as well. I kind of like this solution because it's just doing
what we need without introducing other things.

It would be great to hear people's feedback. I can explain more during
tomorrow's KIP hangout.

Thanks,

Jiangjie (Becket) Qin

On Thu, Sep 10, 2015 at 2:47 PM, Jiangjie Qin  wrote:

> Hi Jay,
>
> I just copy/pastes here your feedback on the timestamp proposal that was
> in the discussion thread of KIP-31. Please see the replies inline.
> The main change I made compared with previous proposal is to add both
> CreateTime and LogAppendTime to the message.
>
> On Tue, Sep 8, 2015 at 10:57 AM, Jay Kreps  wrote:
>
> > Hey Beckett,
> >
> > I was proposing splitting up the KIP just for simplicity of discussion.
> You
> > can still implement them in one patch. I think otherwise it will be hard
> to
> > discuss/vote on them since if you like the offset proposal but not the
> time
> > proposal what do you do?
> >
> > Introducing a second notion of time into Kafka is a pretty massive
> > philosophical change, so it kind of warrants its own KIP; I think it
> > isn't just "Change message format".
> >
> > WRT time I think one thing to clarify in the proposal is how MM will have
> > access to set the timestamp? Presumably this will be a new field in
> > ProducerRecord, right? If so then any user can set the timestamp, right?
> > I'm not sure you answered the questions around how this will work for MM
> > since when MM retains timestamps from multiple partitions they will then
> be
> > out of order and in the past (so the max(lastAppendedTimestamp,
> > currentTimeMillis) override you proposed will not work, right?). If we
> > don't do this then when you set up mirroring the data will all be new and
> > you have the same retention problem you described. Maybe I missed
> > something...?
> lastAppendedTimestamp means the timestamp of the message that was last
> appended to the log.
> If a broker is a leader, since it will assign the timestamp by itself, the
> lastAppendedTimestamp will be its local clock when it appended the last
> message. So if there is no leader migration, max(lastAppendedTimestamp,
> currentTimeMillis) = currentTimeMillis.
> If a broker is a follower, because it will keep the leader's timestamp
> unchanged, the lastAppendedTime would be the leader's clock when it appended
> that message. It keeps track of the lastAppendedTime only in case it becomes
> leader later on. At that point, it is possible that the timestamp of the
> last appended message was stamped by the old leader, but the new leader's
> currentTimeMillis < lastAppendedTime. If a new message comes, instead of
> stamping it with the new leader's currentTimeMillis, we have to stamp it
> with lastAppendedTime to avoid the timestamps in the log going backward.
> The max(lastAppendedTimestamp, currentTimeMillis) is purely based on the
> broker-side clock. If MM produces messages with different LogAppendTimes
> from source clusters to the same target cluster, the LogAppendTimes will be
> ignored and re-stamped by the target cluster.
> I added a use case example for mirror maker in KIP-32. Also there is a
> corner case discussion about when we need the max(lastAppendedTime,
> currentTimeMillis) trick. Could you take a look and see if that answers
> your question?
>
> >
> > My main motivation is that given that both Samza and Kafka streams are
> > doing work that implies a mandatory client-defined notion of time, I
> really
> > think introducing a different mandatory notion of time in Kafka is going
> to
> > be quite odd. We should think hard about how client-defined time could
> > work. I'm not sure if it can, but I'm also not sure that it can't. Having
> > both will be odd. Did you chat about this with Yi/Kartik on the Samza
> side?
> I talked with Kartik and realized that it would be useful to have a client
> timestamp to facilitate use cases like stream processing.
> I was trying to figure out if we can simply use the client timestamp without
> introducing the server time. There is some discussion in the KIP.
> The key problems we want to solve here are:
> 1. We want log retention and rolling to depend on the server clock.
> 2. We want to make sure the log-associated timestamp is retained when
> replicas move.
> 3. We want to use the timestamp in some way that can allow searching by
> timestamp.
> For 1 and 2, an alternative is to pass the log-associated timestamp
> through replication, but that means we need a different protocol for
> replica fetching to pass the log-associated timestamp. It is actually
> complicated and there could be a lot of corner cas

Re: [DISCUSS] KIP-32 - Add CreateTime and LogAppendTime to Kafka message

2015-09-10 Thread Jiangjie Qin
Hi Jay,

I just copy/pastes here your feedback on the timestamp proposal that was in
the discussion thread of KIP-31. Please see the replies inline.
The main change I made compared with previous proposal is to add both
CreateTime and LogAppendTime to the message.

On Tue, Sep 8, 2015 at 10:57 AM, Jay Kreps  wrote:

> Hey Beckett,
>
> I was proposing splitting up the KIP just for simplicity of discussion.
You
> can still implement them in one patch. I think otherwise it will be hard
to
> discuss/vote on them since if you like the offset proposal but not the
time
> proposal what do you do?
>
> Introducing a second notion of time into Kafka is a pretty massive
> philosophical change, so it kind of warrants its own KIP; I think it isn't
> just "Change message format".
>
> WRT time I think one thing to clarify in the proposal is how MM will have
> access to set the timestamp? Presumably this will be a new field in
> ProducerRecord, right? If so then any user can set the timestamp, right?
> I'm not sure you answered the questions around how this will work for MM
> since when MM retains timestamps from multiple partitions they will then
be
> out of order and in the past (so the max(lastAppendedTimestamp,
> currentTimeMillis) override you proposed will not work, right?). If we
> don't do this then when you set up mirroring the data will all be new and
> you have the same retention problem you described. Maybe I missed
> something...?
lastAppendedTimestamp means the timestamp of the message that was last
appended to the log.
If a broker is a leader, since it will assign the timestamp by itself, the
lastAppendedTimestamp will be its local clock when it appended the last
message. So if there is no leader migration, max(lastAppendedTimestamp,
currentTimeMillis) = currentTimeMillis.
If a broker is a follower, because it will keep the leader's timestamp
unchanged, the lastAppendedTime would be the leader's clock when it appended
that message. It keeps track of the lastAppendedTime only in case it becomes
leader later on. At that point, it is possible that the timestamp of the
last appended message was stamped by the old leader, but the new leader's
currentTimeMillis < lastAppendedTime. If a new message comes, instead of
stamping it with the new leader's currentTimeMillis, we have to stamp it
with lastAppendedTime to avoid the timestamps in the log going backward.
The max(lastAppendedTimestamp, currentTimeMillis) is purely based on the
broker-side clock. If MM produces messages with different LogAppendTimes
from source clusters to the same target cluster, the LogAppendTimes will be
ignored and re-stamped by the target cluster.
I added a use case example for mirror maker in KIP-32. Also there is a
corner case discussion about when we need the max(lastAppendedTime,
currentTimeMillis) trick. Could you take a look and see if that answers
your question?
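
The max(lastAppendedTimestamp, currentTimeMillis) rule can be sketched as a
toy model (illustrative only, with a manually supplied clock; not the
broker's actual code):

```python
class LogAppendTimeAssigner:
    """Toy model of a leader stamping LogAppendTime so that timestamps
    in the log never go backward, even across a leader change."""

    def __init__(self, last_appended_ts=-1):
        # A follower tracks the leader's last stamped timestamp so it can
        # seed this value if it is later elected leader.
        self.last_appended_ts = last_appended_ts

    def stamp(self, current_time_ms):
        # Clamp to the last appended timestamp if the local clock is behind.
        ts = max(self.last_appended_ts, current_time_ms)
        self.last_appended_ts = ts
        return ts
```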

>
> My main motivation is that given that both Samza and Kafka streams are
> doing work that implies a mandatory client-defined notion of time, I
really
> think introducing a different mandatory notion of time in Kafka is going
to
> be quite odd. We should think hard about how client-defined time could
> work. I'm not sure if it can, but I'm also not sure that it can't. Having
> both will be odd. Did you chat about this with Yi/Kartik on the Samza
side?
I talked with Kartik and realized that it would be useful to have a client
timestamp to facilitate use cases like stream processing.
I was trying to figure out if we can simply use the client timestamp without
introducing the server time. There is some discussion in the KIP.
The key problems we want to solve here are:
1. We want log retention and rolling to depend on the server clock.
2. We want to make sure the log-associated timestamp is retained when
replicas move.
3. We want to use the timestamp in some way that can allow searching by
timestamp.
For 1 and 2, an alternative is to pass the log-associated timestamp through
replication, but that means we need a different protocol for replica
fetching to pass the log-associated timestamp. It is actually complicated
and there could be a lot of corner cases to handle, e.g. what if an old
leader starts to fetch from the new leader -- should it also update all of
its old log segment timestamps?
I actually think a client-side timestamp would be better for 3 if we can
find a way to make it work.
So far I am not able to convince myself that only having a client-side
timestamp would work, mainly because of 1 and 2. There are a few situations
I mentioned in the KIP.
>
> When you are saying it won't work you are assuming some particular
> implementation? Maybe that the index is a monotonically increasing set of
> pointers to the least record with a timestamp larger than the index time?
> In other words a search for time X gives the largest offset at which all
> records are <= X?
It is a promising idea. We probably can have an in-memory index like that,
but it might be complicated to have a file on disk like that. Imagine there
are two 

[DISCUSS] KIP-32 - Add CreateTime and LogAppendTime to Kafka message

2015-09-10 Thread Jiangjie Qin
Hi folks,

This proposal was previously in KIP-31 and we separated it into KIP-32 per
Neha and Jay's suggestion.

The proposal is to add the following two timestamps to the Kafka message.
- CreateTime
- LogAppendTime

The CreateTime will be set by the producer and will not change after that.
The LogAppendTime will be set by the broker for purposes such as enforcing
log retention and log rolling.
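
For illustration only, a message carrying both proposed timestamps could be
pictured as below; the field names are invented here, and the real wire
format is defined in the KIP, not in this sketch:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TimestampedMessage:
    """Illustrative view of a message with both proposed timestamps."""
    key: bytes
    value: bytes
    create_time_ms: int       # set once by the producer, immutable afterwards
    log_append_time_ms: int   # stamped by the broker on append
```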

Thanks,

Jiangjie (Becket) Qin