Re: KIP-33 Opt out from Time Based indexing

2016-09-08 Thread Jan Filipiak

Hi Jun,

thanks a lot for the hint, I'll check it out when I get a free minute!

Best Jan


Re: KIP-33 Opt out from Time Based indexing

2016-09-06 Thread Jun Rao
Jan,

For the time rolling issue, Jiangjie has committed a fix (
https://issues.apache.org/jira/browse/KAFKA-4099) to trunk. Perhaps you can
help test out trunk and see if there are any other issues related to
time-based index?

Thanks,

Jun


Re: KIP-33 Opt out from Time Based indexing

2016-09-05 Thread Jan Filipiak

Hi Jun,

sorry for the late reply. Regarding B, my main concern was just the
complexity of understanding what's going on.
As you can see, it took me some two days to fully grasp all the details
of the implementation and what the impacts are. Usually I prefer to turn
things I don't use off, so I don't have to bother. Log append time will
work for me.


Rolling logs was my main concern. The producer can specify the timestamp,
and we use an epoch inside the message; I'd bet money that people in the
company would have put this epoch into the produce record as well
=> rolling logs, as the broker thinks it's millis.
So that would probably have caused us at least one outage if a big
producer had upgraded and done this; IMO a likely mistake.
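
Roughly the pitfall I mean, as a quick Java sketch (broker address, topic
and serializers are made up; assume the in-message epoch is in seconds):

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    import java.util.Properties;

    public class TimestampPitfall {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // made-up broker address
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                long epochSeconds = System.currentTimeMillis() / 1000L;
                // Looks harmless, but the record timestamp is defined in milliseconds,
                // so a seconds-based epoch is read as a date close to 1970 and can
                // trigger time-based rolling on the broker.
                producer.send(new ProducerRecord<>("events", null, epochSeconds, "k", "v"));
                // What was actually intended: epoch milliseconds.
                producer.send(new ProducerRecord<>("events", null, epochSeconds * 1000L, "k", "v"));
            }
        }
    }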


I'd just hoped for a more obvious kill switch, so I didn't need to bother
that much.


Best Jan





Re: KIP-33 Opt out from Time Based indexing

2016-08-29 Thread Becket Qin
Hi Jun,

I just created KAFKA-4099 and will submit a patch soon.

Thanks,

Jiangjie (Becket) Qin


Re: KIP-33 Opt out from Time Based indexing

2016-08-29 Thread Jun Rao
Jiangjie,

Good point on the time index format related to uncompressed messages. It
does seem that indexing based on file position requires a bit more
complexity. Since the time index is going to be used infrequently, having a
level of indirection doesn't seem a big concern. So, we can leave the logic
as it is.

Do you plan to submit a patch to fix the time-based rolling issue?

Thanks,

Jun


Re: KIP-33 Opt out from Time Based indexing

2016-08-29 Thread Jun Rao
Jan,

For the usefulness of time index, it's ok if you don't plan to use it.
However, I do think there are other people who will want to use it. Fixing
an application bug always requires some additional work. Intuitively, being
able to seek back to a particular point of time for replay is going to be
much more efficient than always replaying from the very beginning,
especially when the log is retained for a long period of time. Sure, if you
want to have more confidence, you want to rewind a bit conservatively. But
being able to rewind an extra hour makes a big difference compared to having
to rewind all the way to 7 days or however long the retention time is.

For the OffsetRequest, I actually agree with you that it's useful. People
can use that to find the first and the last offset and the offset based on
a specific point in time. The part that's a bit awkward with OffsetRequest
is that it's based on the last modified time of the log segment, which
makes it imprecise (precision is at the segment level, not message level)
and non-deterministic (last modified time may change). Another awkwardness
is that it supports returning a list of offsets after a specified
timestamp. We did that simply because timestamp was only at the segment
level then. So, our plan is to replace OffsetRequest with a new one. It
will give you the same functionality: find the first and the last offset
and the offset based on a specific point in time. It will just be better
since it's more precise and more deterministic. For your use case, it seems
that you don't care about message creation time. Then, it's possible for
you to configure the broker with the log append time. Whether this should
be default at the Kafka level is debatable, but it won't prevent your use
case.

For your suggestion on refactoring, I still want to understand how
necessary it is. Your main concerns so far seem to be:
(a) Impact on rolling log segments.
(b) Time-based index is not useful for me.

Item (a) is a good point. Thanks for that. We will fix it. On item (b), I
have given my view above. Are there any other ways in which you think
having a time-based index will hurt?

Thanks,

Jun


Re: KIP-33 Opt out from Time Based indexing

2016-08-28 Thread Becket Qin
Jan,

Thanks for the example of reprocessing the messages. I think in any case,
reconsuming all the messages will definitely work. What we want to do here
is to see if we can avoid doing that by only reconsuming necessary
messages.

In the scenario you mentioned, can you store an "offset-of-last-update" for
each window? It is essentially the offset of the last message that goes
into the window. During the reprocessing, if an earlier message has an
offset less than or equal to the "offset-of-last-update" for the
corresponding window, the processor knows that this message has already
been added to the aggregated window. This may need some work, but seems
cheaper than reconsuming everything.
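
As a rough Java sketch of what I mean (the window store and aggregate types
here are made up, just to show the skip condition):

    import java.util.HashMap;
    import java.util.Map;

    public class OffsetOfLastUpdate {
        static class WindowAggregate {
            long lastAppliedOffset = -1L; // the "offset-of-last-update" for this window
            long count = 0L;
        }

        private final Map<Long, WindowAggregate> windows = new HashMap<>();
        private final long windowSizeMs;

        OffsetOfLastUpdate(long windowSizeMs) {
            this.windowSizeMs = windowSizeMs;
        }

        void process(long offset, long timestampMs) {
            long windowStart = timestampMs - (timestampMs % windowSizeMs);
            WindowAggregate agg = windows.computeIfAbsent(windowStart, w -> new WindowAggregate());
            if (offset <= agg.lastAppliedOffset) {
                return; // already applied before the reprocessing started, skip it
            }
            agg.count++;
            agg.lastAppliedOffset = offset;
        }
    }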

Admittedly, if the messages use create time and the timestamps are spread
over a wide range, users may not be able to gain much from searching by
timestamp.

Regarding OffsetRequest, I think what Jun meant was that we are going to
deprecate the current OffsetRequest v0, which returns a list of segment base
offsets. I am working on a new KIP for OffsetRequest v1, which returns the
accurate message offset based on a timestamp. I can hardly imagine we will
let users deal with a physical position directly :)

Thanks,

Jiangjie (Becket) Qin


Re: KIP-33 Opt out from Time Based indexing

2016-08-26 Thread Jan Filipiak

Hi Jun,

thanks for taking the time to answer on such a detailed level. You are
right that Log.fetchOffsetsByTimestamp works; the comment is just confusing:
"// Get all the segments whose largest timestamp is smaller than target
timestamp", which apparently is not what takeWhile does (I am more on
the Java end of things, so I relied on the comment).


Regarding the frequent file rolling, I didn't think of log compaction, but
that indeed is a place where things can hit the fan pretty easily,
especially if you don't have many updates in there and you pass the
timestamp along in a kafka-streams application. Bootstrapping a new
application then indeed could produce quite a few old messages, kicking
this log rolling off until a recent message appears. I guess that makes it
a practical issue again even with the 7 days. Thanks for pointing it out!
I'd like to see the append time as default; I am very happy that I have it
in the back pocket for the purpose of tighter sleep, and not having to
worry too much about someone accidentally doing something dodgy on a
weekend with our clusters.


Regarding the usefulness, you will not be able to sell it to me. I
don't know how people build applications with this ¯\_(ツ)_/¯ but I
don't want to see them.

Look at the error recovery with timestamp seek:
For fixing a bug, a user needs to stop the SP (stream processor), truncate
all his downstream data perfectly based on their time window, then restart
and do the first fetch based again on the perfect window timeout. From then
on, he still has NO clue whatsoever whether messages that come later now,
with an earlier timestamp, need to go into the previous window or not.
(Note that there is >>>absolutely no<<< way to determine this in aggregated
downstream windowed stores.) So the user is in trouble: even though he can
seek, he can't rule out his error. IMO it helps them build the wrong thing,
which will just be operational pain *somewhere*.


Look at the error recovery without timestamp seek:
Start your application from the beginning with a different output
(version, key, partition) and wait for it to fully catch up. Drop the
time windows where the error happened, plus a confidence interval (if your
data isn't there anymore, no seek will help), from the old version. Stop
the stream processor, merge the data it created, switch back to the
original (version, key, partition) and start the SP again.
Done. The bigger you choose the confidence interval, the more correct the
result, and the less the index helps. Usually you want maximum confidence
=> no index usage, get everything that is still there (maybe even redump
from Hadoop in extreme cases), ironically causing the log to roll all the
time (as you probably publish to a new topic and have the streams
application use both) :(


As you can see, even though the users can seek, if they want to produce
proper numbers, e.g. billing information, they are in trouble, and giving
them this index will just make them implement the wrong solution! It
boils down to: this index is not the Kafka way of doing things. The
index can help the second approach, but usually one chooses the
confidence interval = as much as one can get.


Then the last thing. "OffsetRequest is a legacy request. It's awkward to
use and we plan to deprecate it over time". You have got to be kidding me.
It was weird to get the byte position back then, but getting the offsets is
perfectly reasonable and one of the best things in the world. Want to
know how your stream looked at a specific point in time? Get the start and
end offsets, fetch whenever you like, and you get a perfect snapshot in
wall time. This is useful for compacted topics as well as streaming topics.
Offsets are a well-known thing in Kafka and in no way awkward, as their
monotonically increasing property is just great.
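
That pattern, roughly, with the Java consumer (broker address and topic are
made up; beginningOffsets/endOffsets do the start/end lookup, and on older
clients the same thing works with seekToBeginning/seekToEnd plus position()):

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;

    public class SnapshotOffsets {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // made-up broker address
            props.put("key.deserializer", ByteArrayDeserializer.class.getName());
            props.put("value.deserializer", ByteArrayDeserializer.class.getName());
            TopicPartition tp = new TopicPartition("events", 0); // made-up topic
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                Map<TopicPartition, Long> start = consumer.beginningOffsets(Collections.singletonList(tp));
                Map<TopicPartition, Long> end = consumer.endOffsets(Collections.singletonList(tp));
                // [start, end) is the partition as of "now"; store the two numbers and
                // fetch that range whenever you like to reproduce the snapshot.
                System.out.printf("snapshot of %s: [%d, %d)%n", tp, start.get(tp), end.get(tp));
            }
        }
    }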


For seeking the log based on a confidence interval (the only chance you
get when reprocessing non-keyed logs), one can also bisect the log from
the client. As the case is rare, it is intensive and causes at least a few
hundred seeks for bigger topics, but I guess the broker now does these
extra seeks for the new index file anyway.
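
A rough sketch of such a client-side bisection (it assumes timestamps are
non-decreasing in the partition, e.g. append time, a consumer already
assigned to the partition, and the newer poll(Duration) signature; an empty
poll is naively treated as "nothing at or after this offset"):

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import java.time.Duration;

    public class TimestampBisect {
        // Returns an offset in [startOffset, endOffset] such that seeking there yields
        // the first record with timestamp >= targetTs; returns endOffset if none exists.
        static long findFirstAtOrAfter(KafkaConsumer<byte[], byte[]> consumer, TopicPartition tp,
                                       long targetTs, long startOffset, long endOffset) {
            long lo = startOffset;
            long hi = endOffset;
            while (lo < hi) {
                long mid = lo + (hi - lo) / 2;
                consumer.seek(tp, mid);
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
                if (records.isEmpty()) {
                    hi = mid; // sketch assumption: no data at or after mid
                    continue;
                }
                ConsumerRecord<byte[], byte[]> first = records.iterator().next();
                if (first.timestamp() < targetTs) {
                    lo = first.offset() + 1; // everything up to this record is too old
                } else {
                    hi = mid; // the answer is at or before the record we just saw
                }
            }
            return lo;
        }
    }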


This index, I feel, is just not following the whole "Kafka way". Can you
comment on the proposed refactoring? What are the chances of getting it
upstream if I could pull it off? (unlikely)


Thanks for all the effort you put into listening to my concerns.
Highly appreciated!


Best Jan




Re: KIP-33 Opt out from Time Based indexing

2016-08-26 Thread Becket Qin
Jun,

Good point about the new log rolling behavior issue when moving replicas.
Keeping the old behavior sounds reasonable to me.

Currently the time index entry points to the exact shallow message with the
indexed timestamp; are you suggesting we change it to point to the starting
offset of the appended batch (message set)? That doesn't seem to work for
truncation. For example, imagine an uncompressed message set [(m1,
offset=100, timestamp=100, position=1000), (m2, offset=101, timestamp=105,
position=1100)]. If we build the time index based on the starting offset of
this message set, the index entry would be (105, 1000). Later on, when the
log is truncated and m2 is truncated but m1 is not (this is possible for an
uncompressed message set), we will not delete the time index entry because
it is technically pointing to m1. Pointing to the end of the batch will not
work either, because then search by timestamp would miss m2.

I am not sure if it is worth doing, but if we are willing to change the
semantics to let the time index entry not point to the exact shallow
message, I am thinking maybe we should just switch the semantics to the
very original one, i.e. a time index entry only means "max timestamp up
until this offset", which is also aligned with the offset index entries.

Thanks,

Jiangjie (Becket) Qin


Re: KIP-33 Opt out from Time Based indexing

2016-08-26 Thread Jun Rao
Jiangjie,

I am not sure about changing the default to LogAppendTime since CreateTime
is probably what most people want. It also doesn't solve the problem
completely. For example, if you do partition reassignment and need to copy
a bunch of old log segments to a new broker, this may cause log rolling on
every message.

Another alternative is to just keep the old time rolling behavior, which is
rolling based on the create time of the log segment. I had two use cases of
time-based rolling in mind. The first one is for users who don't want to
retain a message (say sensitive data) in the log for too long. For this,
one can set a time-based retention. If the log can roll periodically based
on create time, it will freeze the largest timestamp in the rolled segment
and cause it to be deleted when the time limit has been reached. Rolling
based on the timestamp of the first message doesn't help much here since
the retention is always based on the largest timestamp. The second one is
for the log cleaning to happen sooner. Rolling logs periodically based on
create time will also work. So, it seems that if we preserve the old time
rolling behavior, we won't lose much functionality, but will avoid the
corner case where the logs could be rolled on every message. What do you
think?
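
To spell out the retention point above as a toy sketch (not the broker's
LogSegment class): a rolled segment becomes deletable once its frozen
largest timestamp falls outside the retention window.

    final class SegmentRetentionSketch {
        final long baseOffset;
        final long largestTimestampMs; // frozen once the segment is rolled

        SegmentRetentionSketch(long baseOffset, long largestTimestampMs) {
            this.baseOffset = baseOffset;
            this.largestTimestampMs = largestTimestampMs;
        }

        // Time-based retention: the check is always against the largest timestamp,
        // which is why rolling (freezing it) matters for how soon deletion happens.
        boolean eligibleForDeletion(long nowMs, long retentionMs) {
            return nowMs - largestTimestampMs > retentionMs;
        }
    }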

About storing file position in the time index, I don't think it needs to
incur overhead during append. At the beginning of append, we are already
getting the end position of the log (for maintaining the offset index). We
can just keep track of that together with the last seen offset. Storing the
position has the slight benefit that it avoids another indirection and
seems more consistent with the offset index. It's worth thinking through
whether this is better. If we want to change it, it's better to change it
now than later.

Thanks,

Jun


Re: KIP-33 Opt out from Time Based indexing

2016-08-25 Thread Becket Qin
Hi Jan,

It seems your main concern is the changed behavior of time-based log
rolling and time-based retention. That is actually why we have two
timestamp types. If users set log.message.timestamp.type to
LogAppendTime, the broker will behave exactly the same as before, except
that the rolling and retention will be more accurate and independent of
replica movements.

The log.message.timestamp.difference.max.ms setting is only useful when
users are using CreateTime. It is kind of a protection for the broker,
because an insanely large timestamp could ruin the time index due to the
way we handle out-of-order timestamps when using CreateTime. But users who
are using LogAppendTime do not need to worry about this at all.

The first odd thing is a valid concern. In your case, because you have the
timestamp in the message value, it is probably fine to just use
LogAppendTime on the broker, so the timestamp will only be used to provide
accurate log retention and log rolling based on when the message was
produced to the broker, regardless of when the message was created. This
should provide the exact same behavior on the broker side as before.
(Apologies for the stale wiki statement on the lines you quoted; as Jun
said, the log segment rolling is based on the timestamp of the first
message instead of the largest timestamp in the log segment. I sent a
change notification to the mailing list but forgot to update the wiki
page. I just updated the wiki page.)

For the second odd thing, as Jun mentioned, by design we do not keep a
global timestamp order. During the search, we start from the oldest segment
and scan over the segments until we find the first segment that contains a
timestamp which is larger than the target timestamp. This should guarantee
that no message with a larger timestamp will be missed. For example, if we
have 3 segments whose largest timestamps are 100, 300, 200, and we are
looking for timestamp 250, we will start to scan at the first segment, stop
at the second segment, and search inside that segment for the first
timestamp greater than or equal to 250. So reordered largest timestamps
across segments should not be an issue.
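
As a toy sketch of that scan (not the broker's Log.fetchOffsetsByTimestamp,
just the stop condition, using the 100/300/200 example):

    import java.util.Arrays;
    import java.util.List;

    public class SegmentScanSketch {
        static final class Segment {
            final long baseOffset;
            final long largestTimestamp;
            Segment(long baseOffset, long largestTimestamp) {
                this.baseOffset = baseOffset;
                this.largestTimestamp = largestTimestamp;
            }
        }

        // Visit segments in offset order; stop at the first one whose largest
        // timestamp reaches the target, then search inside it.
        static Segment firstSegmentToSearch(List<Segment> segmentsInOffsetOrder, long targetTimestamp) {
            for (Segment s : segmentsInOffsetOrder) {
                if (s.largestTimestamp >= targetTimestamp) {
                    return s;
                }
            }
            return null; // no message at or after the target timestamp
        }

        public static void main(String[] args) {
            List<Segment> segments = Arrays.asList(
                    new Segment(0, 100), new Segment(1000, 300), new Segment(2000, 200));
            Segment hit = firstSegmentToSearch(segments, 250);
            System.out.println(hit.baseOffset); // 1000: the scan stops at the second segment
        }
    }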

The third odd thing is a good point. There are a few reasons we chose to
store offsets instead of physical positions in the time index. Easier
truncation is one of the reasons, but this may not be a big issue. Another
reason is that in the early implementation, the time index and offset index
were actually aligned, i.e. each offset in the time index has a
corresponding entry in the offset index (the reverse is not true). So the
physical position is already stored in the offset index. Later on we
switched to the current implementation, which has the time index pointing
to the exact shallow message in the log segment. With this implementation,
if the message with the largest timestamp appears in the middle of an
uncompressed message set, we may need to calculate the physical position
for that message. This is doable but could potentially be an overhead for
each append and add some complexity. Given that OffsetRequest is supposed
to be a pretty infrequent request, it is probably OK to do the secondary
lookup but save the work on each append.
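
The secondary lookup amounts to roughly these two steps (a toy sketch with
in-memory maps standing in for the index files):

    import java.util.Map;
    import java.util.TreeMap;

    public class TimeIndexLookupSketch {
        private final TreeMap<Long, Long> timeIndex = new TreeMap<>();   // timestamp -> offset
        private final TreeMap<Long, Long> offsetIndex = new TreeMap<>(); // offset -> file position

        Long positionForTimestamp(long targetTimestamp) {
            // Step 1: the time index gives the offset of an entry at/after the timestamp.
            Map.Entry<Long, Long> timeEntry = timeIndex.ceilingEntry(targetTimestamp);
            if (timeEntry == null) {
                return null; // no message at or after the target timestamp
            }
            long offset = timeEntry.getValue();
            // Step 2 (the extra indirection): the offset index gives the nearest file
            // position at or before that offset, from which the segment is scanned.
            Map.Entry<Long, Long> offsetEntry = offsetIndex.floorEntry(offset);
            return offsetEntry == null ? 0L : offsetEntry.getValue();
        }
    }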

Jun has already mentioned a few use cases for searching by timestamp. At
LinkedIn we also have several such use cases where people want to rewind
the offsets to a certain time and reprocess the streams.

@Jun, currently we are using CreateTime as the default value for
log.message.timestamp.type. I am wondering whether it would be less
surprising if we changed the default value to LogAppendTime so that the
previous behavior is maintained, because it would be bad for users if
upgrading caused their messages to get deleted due to the change of
behavior. What do you think?

Thanks,

Jiangjie (Becket) Qin






Re: KIP-33 Opt out from Time Based indexing

2016-08-25 Thread Jun Rao
Jan,

Thanks a lot for the feedback. Now I understood your concern better. The
following are my comments.

The first odd thing that you pointed out could be a real concern.
Basically, if a producer publishes messages with really old timestamps, our
default log.roll.hours (7 days) will indeed cause the broker to roll a log
on every message, which would be bad. Time-based rolling is actually used
infrequently. The only use case that I am aware of is that for compacted
topics, rolling logs based on time could allow the compaction to happen
sooner (since the active segment is never cleaned). One option is to change
the default log.roll.hours to infinite and also document the impact of
changing log.roll.hours. Jiangjie, what do you think?
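
For reference, the roll check in question is essentially the condition
below, so a stream of messages with very old timestamps keeps it true on
every append (a toy sketch of the rule, not the actual broker code):

    final class TimeBasedRollSketch {
        static boolean shouldRoll(long nowMs, long largestTimestampInSegmentMs, long logRollMs) {
            // KIP-33 style time-based rolling: roll when the segment's largest
            // timestamp is older than log.roll.ms relative to "now".
            return nowMs - largestTimestampInSegmentMs > logRollMs;
        }

        public static void main(String[] args) {
            long logRollMs = 7L * 24 * 60 * 60 * 1000;         // default log.roll.hours = 168 (7 days)
            long now = System.currentTimeMillis();
            long tenDaysAgo = now - 10L * 24 * 60 * 60 * 1000; // e.g. replayed or mirrored old data
            System.out.println(shouldRoll(now, tenDaysAgo, logRollMs)); // true, on every such append
        }
    }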

For the second odd thing, the OffsetRequest is a legacy request. It's
awkward to use and we plan to deprecate it over time. That's why we haven't
change the logic in serving OffsetRequest after KIP-33. The plan is to
introduce a new OffsetRequest that will be exploiting the time based index.
It's possible to have log segments with non-increasing largest timestamp.
As you can see in Log.fetchOffsetsByTimestamp(), we simply iterate the
segments in offset order and stop when we see the target timestamp.
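That search can be sketched roughly as follows (hypothetical names; the real
logic is in Log.fetchOffsetsByTimestamp): because we stop at the first segment
in offset order whose largest timestamp reaches the target, every earlier
segment is known to contain only smaller timestamps, even if the per-segment
maxima are not strictly increasing.

    // Illustrative stand-in for the broker's per-segment metadata.
    case class SegmentInfo(baseOffset: Long, largestTimestamp: Long)

    // Walk segments in offset order and stop at the first one that can contain the target;
    // the per-segment time index then narrows the result down within that segment.
    def findSegmentForTimestamp(segmentsInOffsetOrder: Seq[SegmentInfo],
                                targetTimestamp: Long): Option[SegmentInfo] =
      segmentsInOffsetOrder.find(_.largestTimestamp >= targetTimestamp)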

For the third odd thing, one of the original reasons why the time-based
index points to an offset instead of the file position is that it makes
truncating the time index to an offset easier since the offset is in the
index. Looking at the code, we could also store the file position in the
time index and do truncation based on position instead of offset. That would
probably have the slight advantage of keeping the two indexes consistent and
of avoiding another level of indirection when looking up the time index.
Jiangjie, have we ever considered that?

The idea of log.message.timestamp.difference.max.ms is to prevent the
timestamp in published messages from drifting too far away from the current
timestamp. The default value is infinite, though.
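The check it implies is roughly the following (a sketch with an assumed method
name; with the infinite default nothing is ever rejected):

    // Hedged sketch: accept a message only if its CreateTime is within maxDiffMs of broker time.
    // Passing Long.MaxValue models the "infinite" default.
    def timestampWithinBounds(brokerNow: Long, messageTimestamp: Long, maxDiffMs: Long): Boolean =
      math.abs(brokerNow - messageTimestamp) <= maxDiffMs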

Lastly, for the usefulness of time-based index, it's actually a feature
that the community wanted and voted for, not just for Confluent customers.
For example, being able to seek to an offset based on timestamp has been a
frequently requested feature. This can be useful for at least the following
scenarios: (1) If there is a bug in a consumer application, the user will
want to rewind the consumption after fixing the logic. In this case, it's
more convenient to rewind the consumption based on a timestamp. (2) In a
multi data center setup, it's common for people to mirror the data from one
Kafka cluster in one data center to another cluster in a different data
center. If one data center fails, people want to be able to resume the
consumption in the other data center. Since the offsets are not preserved
between the two clusters through mirroring, being able to find a starting
offset based on timestamp will allow the consumer to resume the consumption
without missing any messages and also not replaying too many messages.
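As a sketch of what scenario (1) could look like from the application side,
here is a rewind written against an offsetsForTimes/seek style consumer API
(the method names are assumptions about how such an API could look, not
something in the released client at the time of this thread; topic, partition
and the one-hour rewind are made-up example values):

    import java.util.{Collections, Properties}
    import scala.collection.JavaConverters._
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.common.TopicPartition

    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")   // placeholder broker address
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    val partition = new TopicPartition("clicks", 0)    // example topic/partition
    consumer.assign(Collections.singletonList(partition))

    // Rewind one hour before the bad deploy instead of replaying the full retention window.
    val rewindTo = System.currentTimeMillis() - 60L * 60 * 1000
    val found = consumer.offsetsForTimes(Map(partition -> java.lang.Long.valueOf(rewindTo)).asJava)
    Option(found.get(partition)).foreach(ot => consumer.seek(partition, ot.offset()))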

Thanks,

Jun


On Wed, Aug 24, 2016 at 5:05 PM, Jan Filipiak 
wrote:

> Hey Jun,
>
> I go and try again :), wrote the first one in quite a stressful
> environment. The bottom line is that I, for our use cases, see too small a
> use/effort ratio in this time index.
> We do not bootstrap new consumers for key-less logs so frequently, and when
> we do it, they usually want everything (prod deployment) or just start at
> the end (during development).
> That caused quite some frustration. It would be better if I could just have
> turned it off and not bother any more. Anyhow, in the meantime I had to
> dig deeper into the inner workings
> and the impacts are not as dramatic as I initially assumed. But it still
> carries along some oddities I want to list here.
>
> first odd thing:
> Quote
> ---
> Enforce time based log rolling
>
> Currently time based log rolling is based on the creation time of the log
> segment. With this KIP, the time based rolling would be changed to be based on
> the largest timestamp ever seen in a log segment. A new log segment will be
> rolled out if current time is greater than largest timestamp ever seen in
> the log segment + log.roll.ms. When message.timestamp.type=CreateTime,
> users should set max.message.time.difference.ms appropriately together
> with log.roll.ms to avoid frequent log segment roll out.
> ---
> imagine a MirrorMaker falls behind and the MirrorMaker has a delay of some
> time > log.roll.ms.
> From my understanding, when no one else is producing to this partition
> except the MirrorMaker, the broker will start rolling on every append?
> Just because you may be under a DoS attack and your application only works in
> the remote location. (also a good occasion for MM to fall behind)
> But checking the default values indicates that it should indeed not become
> a problem, as log.roll.ms defaults to roughly 7 days.
>
>
> second odd thing:
> Quote
> ---
> A time index entry (*T*, *offset*) means that in this segment any mess

Re: KIP-33 Opt out from Time Based indexing

2016-08-24 Thread Jan Filipiak

Hey Jun,

I go and try again :), wrote the first one in quite a stressful
environment. The bottom line is that I, for our use cases, see too small a
use/effort ratio in this time index.
We do not bootstrap new consumers for key-less logs so frequently, and
when we do it, they usually want everything (prod deployment) or just
start at the end (during development).
That caused quite some frustration. It would be better if I could just have
turned it off and not bother any more. Anyhow, in the meantime I had to
dig deeper into the inner workings
and the impacts are not as dramatic as I initially assumed. But it still 
carries along some oddities I want to list here.


first odd thing:
Quote
---
Enforce time based log rolling

Currently time based log rolling is based on the creation time of the
log segment. With this KIP, the time based rolling would be changed to be
based on the largest timestamp ever seen in a log segment. A new log
segment will be rolled out if current time is greater than largest
timestamp ever seen in the log segment + log.roll.ms. When
message.timestamp.type=CreateTime, users should set
max.message.time.difference.ms appropriately together with log.roll.ms
to avoid frequent log segment roll out.


---
imagine a MirrorMaker falls behind and the MirrorMaker has a delay of
some time > log.roll.ms.
From my understanding, when no one else is producing to this partition
except the MirrorMaker, the broker will start rolling on every append?
Just because you may be under a DoS attack and your application only works
in the remote location. (also a good occasion for MM to fall behind)
But checking the default values indicates that it should indeed not
become a problem, as log.roll.ms defaults to roughly 7 days.



second odd thing:
Quote
---
A time index entry (/T/, /offset/) means that in this segment any
message whose timestamp is greater than /T/ comes after /offset/.


The OffsetRequest behaves almost the same as before. If timestamp *T* is
set in the OffsetRequest, the first offset in the returned offset
sequence means that if a user wants to consume from *T*, that is the offset
to start with. The guarantee is that any message whose timestamp is
greater than T has a bigger offset, i.e. any message before this offset
has a timestamp < *T*.

---

Given how the index is maintained, with a little bit of bad luck
(rolling upgrade/config change of MirrorMakers for different
colocations) one ends up with segmentN.timeindex.maxtimestamp >
segmentN+1.timeindex.maxtimestamp. If I am not overlooking something here,
then the fetch code does not seem to take that into account.
https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/Log.scala#L604 

In this case goal number 1, not to lose any messages, is not
achieved. An easy fix seems to be to sort the segsArray by max timestamp,
but I can't wrap my head around it just now.



third odd thing:
Regarding the worry of increasing complexity: looking at the code
https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L193 
-196
https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L227 
& 230
https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L265 
-266
https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L305 
-307
https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L408 
- 410
https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L432 
- 435
https://github.com/apache/kafka/blob/05d00b5aca2e1e59ad685a3f051d2ab022f75acc/core/src/main/scala/kafka/log/LogSegment.scala#L104 
-108

and especially
https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/Log.scala#L717
it feels like the Log and LogSegment having detailed knowledge about
the maintained indexes is not the ideal way to model the problem.
Having the server maintain a set of indexes could reduce the code
complexity, while also allowing an easy switch to turn it off. I think
both indexes could point to the physical position; a client would do
fetch(timestamp) and then continue with the offsets as usual. Is there
any specific reason the timestamp index points into the offset index?
For reading one would need to branch earlier, maybe already in the
ApiHandler, and decide which indexes to query, but this branching logic is
there now anyhow.
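To sketch the shape of what I mean (all names invented, I have not thought the
details through):

    // Rough sketch of a per-segment index abstraction; purely illustrative.
    trait SegmentIndex {
      // Called on append so the index can maintain its own entries.
      def onAppend(offset: Long, timestamp: Long, physicalPosition: Int): Unit
      // Each index can vote on rolling instead of Log/LogSegment knowing the details.
      def shouldRoll(now: Long): Boolean
      def truncateTo(physicalPosition: Int): Unit
      // Both indexes answer with a physical position, so reads branch only on the key type.
      def lookup(key: Long): Option[Int]
    }

    // If the broker held a configurable set of these and handed them to the Log,
    // turning the time index off would just mean not registering it.
    class InMemoryTimeIndex extends SegmentIndex {
      private var entries = Vector.empty[(Long, Int)]   // (timestamp, physical position)
      def onAppend(offset: Long, timestamp: Long, physicalPosition: Int): Unit =
        if (entries.isEmpty || timestamp > entries.last._1)
          entries = entries :+ (timestamp -> physicalPosition)
      def shouldRoll(now: Long): Boolean = false        // rolling policy omitted in this sketch
      def truncateTo(physicalPosition: Int): Unit =
        entries = entries.takeWhile(_._2 <= physicalPosition)
      def lookup(timestamp: Long): Option[Int] =
        entries.find(_._1 >= timestamp).map(_._2)
    }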


Furthermore, I can't think of a situation where one wants to have this
log.message.timestamp.difference.max.ms take effect. I think this
defeats goal 1 again.


ITE having this index in the brokers now feels weird to me. Gives me a
fe

Re: Re: KIP-33 Opt out from Time Based indexing

2016-08-24 Thread Jun Rao
Jan,

Thanks for the reply. I actually wasn't sure what your main concern on
time-based rolling is. Just a couple of clarifications. (1) Time-based
rolling doesn't control how long a segment will be retained for. For
retention, if you use time-based, it will now be based on the timestamp in
the message. If you use size-based, it works the same as before. Is your
concern on time-based retention? If so, you can always configure the
timestamp in all topics to be log append time, which will give you the same
behavior as before. (2) The creation time of the segment is never exposed
to the consumer and therefore is never preserved in MirrorMaker. In
contrast, the timestamp in the message will be preserved in MirrorMaker.
So, not sure what your concern on MirrorMaker is.

Jun

On Wed, Aug 24, 2016 at 5:03 AM, Jan Filipiak 
wrote:

> Hi Jun,
>
> I copy-pasted this mail from the archive, as I somehow didn't receive it
> per mail. I will still make some comments inline;
> hopefully you can find them quickly enough, my apologies.
>
> To make things more clear, you should also know that all messages in our
> Kafka setup have a common way to access their timestamp already (it's
> encoded in the value the same way always).
> Sometimes this is a logical time (e.g. the same timestamp across many different
> topics / partitions), say PHP request start time or the like. So Kafka's
> internal timestamps are not really attractive
> for us anyway currently.
>
> I hope I can make a point and not waste your time.
>
> Best Jan,
>
> hopefully everything makes sense
>
> 
>
> Jan,
>
> Currently, there is no switch to disable the time based index.
>
> There are quite a few use cases of time based index.
>
> 1. From KIP-33's wiki, it allows us to do time-based retention accurately.
> Before KIP-33, the time-based retention is based on the last modified time
> of each log segment. The main issue is that last modified time can change
> over time. For example, if a broker loses storage and has to re-replicate
> all data, those re-replicated segments will be retained much longer since
> their last modified time is more recent. Having a time-based index allows
> us to retain segments based on the message time, not the last modified
> time. This can also benefit KIP-71, where we want to combine time-based
> retention and compaction.
>
> /If you're sparse on disk space, one could try to get by with
> retention.bytes/
> or, as we did, ssh into the box and rm it, which worked quite well when no
> one reads it.
> It chuckles a little when it's read, but readers usually do an auto.offset.reset
> (they are too slow anyway if they are reading the last segments, hrhr).
>
> 2. In KIP-58, we want to delay log compaction based on a configurable
> amount of time. Time-based index allows us to do this more accurately.
>
> /good point, seems reasonable/
>
> 3. We plan to add an api in the consumer to allow seeking to an offset
> based on a timestamp. The time based index allows us to do this more
> accurately and fast.
>
> /Sure, I personally feel that you rarely want to do this. For Camus, we
> used max.pull.historic.days (or similar) successfully quite often. We
> just gave it an extra day and got what we wanted,
> and for debugging my bisect tool works well enough. So these are the 2
> use cases we experienced already and found a decent way around it./
>
> Now for the impact.
>
> a. There is a slight change on how time-based rolling works. Before KIP-33,
> rolling was based on the time when a segment was loaded in the broker.
> After KIP-33, rolling is based on the time of the first message of a
> segment. Not sure if this is your concern. In the common case, the two
> behave more or less the same. The latter is actually more deterministic
> since it's not sensitive to broker restarts.
>
> /This is part of my main concern indeed. This is what scares me, and I
> preferred to just opt out instead of reviewing all our pipelines to check
> what's going to happen when we put it live.
> For example, the MirrorMakers: if they want to preserve create time from
> the source cluster and publish the same create time (which they should do
> if you don't encode your own timestamps and want
> to have proper kafka-streams windowing), then I am quite concerned about what
> happens if our cross-ocean links fall behind, say a day or two.
> Then I can think of a very up to date MirrorMaker from
> one colocation and a very laggy MirrorMaker from another colocation. For
> me it's not 100% clear what's going to happen. But I can't think of sane
> defaults there; sane defaults are what I love Kafka for.
> It's just tricky to be convinced that an upgrade is safe, which was usually easy.
> /
> b. Time-based index potentially adds overhead to producing messages and
> loading segments. Our experiments show that the impact to producing is
> insignificant. The time to load segments when restarting a broker can be
> doubled. However, the absolute time is still reasonable. For example,
> loading 10K log segments wit

Re: Re: KIP-33 Opt out from Time Based indexing

2016-08-24 Thread Jan Filipiak

Hi Jun,

I copy-pasted this mail from the archive, as I somehow didn't receive it per
mail. I will still make some comments inline;
hopefully you can find them quickly enough, my apologies.

To make things more clear, you should also know that all messages in our Kafka
setup have a common way to access their timestamp already (it's encoded in the
value the same way always).
Sometimes this is a logical time (e.g. the same timestamp across many different
topics / partitions), say PHP request start time or the like. So Kafka's
internal timestamps are not really attractive
for us anyway currently.

I hope I can make a point and not waste your time.

Best Jan,

hopefully everything makes sense



Jan,

Currently, there is no switch to disable the time based index.

There are quite a few use cases of time based index.

1. From KIP-33's wiki, it allows us to do time-based retention accurately.
Before KIP-33, the time-based retention is based on the last modified time
of each log segment. The main issue is that last modified time can change
over time. For example, if a broker loses storage and has to re-replicate
all data, those re-replicated segments will be retained much longer since
their last modified time is more recent. Having a time-based index allows
us to retain segments based on the message time, not the last modified
time. This can also benefit KIP-71, where we want to combine time-based
retention and compaction.

/If you're sparse on disk space, one could try to get by with retention.bytes/
or, as we did, ssh into the box and rm it, which worked quite well when no one
reads it.
It chuckles a little when it's read, but readers usually do an auto.offset.reset
(they are too slow anyway if they are reading the last segments, hrhr).

2. In KIP-58, we want to delay log compaction based on a configurable
amount of time. Time-based index allows us to do this more accurately.

/good point, seems reasonable/

3. We plan to add an api in the consumer to allow seeking to an offset
based on a timestamp. The time based index allows us to do this more
accurately and fast.

/Sure, I personally feel that you rarely want to do this. For Camus, we used
max.pull.historic.days (or similar) successfully quite often. We just gave
it an extra day and got what we wanted,
and for debugging my bisect tool works well enough. So these are the 2 use cases
we experienced already and found a decent way around it./

Now for the impact.

a. There is a slight change on how time-based rolling works. Before KIP-33,
rolling was based on the time when a segment was loaded in the broker.
After KIP-33, rolling is based on the time of the first message of a
segment. Not sure if this is your concern. In the common case, the two
behave more or less the same. The latter is actually more deterministic
since it's not sensitive to broker restarts.

/This is part of my main concern indeed. This is what scares me, and I preferred
to just opt out instead of reviewing all our pipelines to check what's going to
happen when we put it live.
For example, the MirrorMakers: if they want to preserve create time from the
source cluster and publish the same create time (which they should do, if you
don't encode your own timestamps and want
to have proper kafka-streams windowing), then I am quite concerned about what
happens if our cross-ocean links fall behind, say a day or two. Then I can
think of a very up to date MirrorMaker from
one colocation and a very laggy MirrorMaker from another colocation. For me it's
not 100% clear what's going to happen. But I can't think of sane defaults there;
sane defaults are what I love Kafka for.
It's just tricky to be convinced that an upgrade is safe, which was usually easy.
/
b. Time-based index potentially adds overhead to producing messages and
loading segments. Our experiments show that the impact to producing is
insignificant. The time to load segments when restarting a broker can be
doubled. However, the absolute time is still reasonable. For example,
loading 10K log segments with time-based index takes about 5 seconds.
/Loading should be fine/, totally agree.

c. Because time-based index is useful in several cases and the impact seems
small, we didn't consider making time based index optional. Finally,
although it's possible to make the time based index optional, it will add
more complexity to the code base. So, we probably should only consider it
if it's truly needed. Thanks,

/I think one can get away with an easier codebase here. The trick is not to
have the Log implement all the logic,
but to have the broker maintain a set of indexes that gets initialized at
startup and passed to the Log. One could ask each individual
index whether that log segment should be rolled, compacted, truncated, whatever.
One could also give that LogSegment to each index and have it rebuild
the index, for example. I didn't figure out the details. But this
https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/

Re: KIP-33 Opt out from Time Based indexing

2016-08-22 Thread Jun Rao
Jan,

Currently, there is no switch to disable the time based index.

There are quite a few use cases of time based index.

1. From KIP-33's wiki, it allows us to do time-based retention accurately.
Before KIP-33, the time-based retention is based on the last modified time
of each log segment. The main issue is that last modified time can change
over time. For example, if a broker loses storage and has to re-replicate
all data, those re-replicated segments will be retained much longer since
their last modified time is more recent. Having a time-based index allows
us to retain segments based on the message time, not the last modified
time. This can also benefit KIP-71, where we want to combine time-based
retention and compaction.
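As a rough illustration of the difference (made-up names, not the actual broker
code), the retention decision can key off the largest message timestamp
recorded for a segment instead of the file's last-modified time, so
re-replicated segments age out on the same schedule as the originals:

    // Hedged sketch of the two retention checks being contrasted above.
    case class SegmentMeta(lastModifiedMs: Long, largestMessageTimestampMs: Long)

    // Pre-KIP-33 style: last-modified time resets whenever a segment is rewritten/re-replicated.
    def expiredByFileTime(s: SegmentMeta, now: Long, retentionMs: Long): Boolean =
      now - s.lastModifiedMs > retentionMs

    // KIP-33 style: the message timestamp travels with the data, so the decision is stable.
    def expiredByMessageTime(s: SegmentMeta, now: Long, retentionMs: Long): Boolean =
      now - s.largestMessageTimestampMs > retentionMs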

2. In KIP-58, we want to delay log compaction based on a configurable
amount of time. Time-based index allows us to do this more accurately.

3. We plan to add an api in the consumer to allow seeking to an offset
based on a timestamp. The time based index allows us to do this more
accurately and fast.

Now for the impact.

a. There is a slight change on how time-based rolling works. Before KIP-33,
rolling was based on the time when a segment was loaded in the broker.
After KIP-33, rolling is based on the time of the first message of a
segment. Not sure if this is your concern. In the common case, the two
behave more or less the same. The latter is actually more deterministic
since it's not sensitive to broker restarts.

b. Time-based index potentially adds overhead to producing messages and
loading segments. Our experiments show that the impact to producing is
insignificant. The time to load segments when restarting a broker can be
doubled. However, the absolute time is still reasonable. For example,
loading 10K log segments with time-based index takes about 5 seconds.

Because time-based index is useful in several cases and the impact seems
small, we didn't consider making time based index optional. Finally,
although it's possible to make the time based index optional, it will add
more complexity to the code base. So, we probably should only consider it
if it's truly needed.

Thanks,

Jun

On Mon, Aug 22, 2016 at 12:24 AM, Jan Filipiak 
wrote:

> Hello everyone,
>
> I stumbled across KIP-33 and the time-based index; while briefly checking
> the wiki and commits, I failed to find a way to opt out.
> I saw it having quite some impact on when logs are rolled and was hoping
> not to have to deal with all of that. Is there a disable switch I
> overlooked?
>
> Does anybody have a good use case where the time-based index comes in handy?
> I made a custom console consumer for myself
> that can bisect a log based on time. It's just a quick probabilistic shot
> into the log, but it is sometimes quite useful for some debugging.
>
> Best Jan
>


Re: KIP-33 Opt out from Time Based indexing

2016-08-22 Thread Jay Kreps
Can you describe the behavior you saw that you didn't like?

-Jay

On Mon, Aug 22, 2016 at 12:24 AM, Jan Filipiak 
wrote:

> Hello everyone,
>
> I stumbled across KIP-33 and the time-based index; while briefly checking
> the wiki and commits, I failed to find a way to opt out.
> I saw it having quite some impact on when logs are rolled and was hoping
> not to have to deal with all of that. Is there a disable switch I
> overlooked?
>
> Does anybody have a good use case where the time-based index comes in handy?
> I made a custom console consumer for myself
> that can bisect a log based on time. It's just a quick probabilistic shot
> into the log, but it is sometimes quite useful for some debugging.
>
> Best Jan
>