Hi Jun,

What do you think about the above solution? I am trying to include KIP-33
into 0.10.0 because the log retention has been a long pending issue.

Thanks,

Jiangjie (Becket) Qin

On Tue, Mar 1, 2016 at 8:18 PM, Becket Qin <becket....@gmail.com> wrote:

> Hi Jun,
>
> I see. If we only use index.interval.bytes, the time index entry will be
> inserted when (1) the largest timestamp is in this segment AND (2) at least
> index.interval.bytes have been appended since last time index entry
> insertion.
> In this case (1) becomes implicit instead of having an explicit threshold
> of time.index.interval.ms. This should work fine.
>
> For 1, the current proposal is actually intended to use the offsets of the
> messages instead of the file position. The reason is that
> OffsetRequest(ListOffsetRequest) gives back a list of offsets. Having
> offsets instead of file position is more convenient in that case. So far we
> don't have an interface for consumer to directly consume from a given
> timestamp. Supposedly we will have to first return the offset to the
> consumer then the consumer can issue a fetch request. But this does mean
> that we need to look up the index twice if we want to search to a
> particular message using timestamp.
>
> For 2, that is a good point. It looks we have to persist the max timestamp
> of each segment. I am thinking of reserving the first time index entry in
> each time index file. When broker shuts down or rolls out a new segment, we
> persist the max timestamp in the segment by writing a time index entry to
> the first entry. During timestamp search we will ignore the first time
> index entry. So the log retention will always be able to know for sure if a
> log segment is supposed to be deleted or not by looking at the first entry
> of time index.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
>
> On Tue, Mar 1, 2016 at 4:30 PM, Jun Rao <j...@confluent.io> wrote:
>
>> Hi, Jiangjie,
>>
>> I was thinking perhaps just reusing index.interval.bytes is enough. Not
>> sure if there is much value in adding an additional
>> time.index.interval.ms.
>>
>> For 1, the timestamp index has entries of timestamp -> file position. So,
>> there is actually no offset in the index, right?
>>
>> For 2, what you said makes sense for time-based retention. Does that apply
>> if the retention is trigged by size? The difference here is that we can't
>> assume all segments with messages of timestamp smaller than the latest
>> timestamp will be deleted after the message with the latest timestamp is
>> deleted.
>>
>> Thanks,
>>
>> Jun
>>
>> On Tue, Mar 1, 2016 at 1:00 PM, Becket Qin <becket....@gmail.com> wrote:
>>
>> > Hi Jun,
>> >
>> > Rolling out a new segment when the time index is full sounds good. So
>> both
>> > time index and offset index will be sharing the configuration of max
>> index
>> > size.
>> > If we do that, do you think we still want to reuse
>> index.interval.bytes? If
>> > we don't, the risk is that in some corner cases, we might end up with
>> many
>> > small segments. (e.g. small time.index.interval.ms with small max index
>> > size). But this is probably more of a misconfiguration.
>> >
>> > 2. If the broker is still running when all the segments except the
>> active
>> > segment is deleted, we will have an in memory latest timestamp. So that
>> is
>> > not a problem.
>> >
>> > In another case, if a broker boots up and sees only one segment with an
>> > empty time index file, we can scan the active segment and rebuild the
>> time
>> > index.  i.e. we do not need to care about the previous largest timestamp
>> > but simply start over. (We need to scan the active segment because it is
>> > possible that the last message appended to the log has a timestamp not
>> > expired, but the broker died before inserting the time index entry for
>> > it.). If all the messages in the active segment has expired, we should
>> roll
>> > out a new segment and reset the latest timetamp to -1.
>> > The principal here is that we will try to build the time indices for the
>> > existing segments that have not expired. If the message with previously
>> > latest timestamp has already been deleted, there is no need to remember
>> > that any more.
>> >
>> > That said, I believe this corner case is really because user is not
>> > configuring the acceptable time difference threshold appropriately.
>> >
>> > Thanks,
>> >
>> > Jiangjie (Becket) Qin
>> >
>> >
>> > On Tue, Mar 1, 2016 at 11:55 AM, Jun Rao <j...@confluent.io> wrote:
>> >
>> > > Jiangjie,
>> > >
>> > > Currently, we roll a new log segment if the index is full. We can
>> > probably
>> > > just do the same on the time index. This will bound the index size.
>> > >
>> > > 1. Sounds good.
>> > >
>> > > 2. I was wondering an edge case where the largest timestamp is in the
>> > > oldest segment and the time index is empty is in all newer segments.
>> At
>> > > some point, we delete the oldest segment since it has expired. Then,
>> we
>> > > delete all but the active segment. Now, what should the largest
>> timestamp
>> > > be? Should it be the previous largest timestamp that we have seen or
>> > should
>> > > we dig out the largest timestamp in the active segment?
>> > >
>> > > Thanks,
>> > >
>> > > Jun
>> > >
>> > >
>> > > On Mon, Feb 29, 2016 at 7:29 PM, Becket Qin <becket....@gmail.com>
>> > wrote:
>> > >
>> > > > Hi Jun,
>> > > >
>> > > > I think index.interval.bytes is used to control the density of the
>> > offset
>> > > > index. The counterpart of index.interval.bytes for time index is
>> > > > time.index.interval.ms. If we did not change the semantic of
>> > log.roll.ms
>> > > ,
>> > > > log.roll.ms/time.index.interval.ms and
>> > > > log.segment.bytes/index.interval.bytes are a perfect mapping from
>> bytes
>> > > to
>> > > > time. However, because we changed the behavior of log.roll.ms, we
>> need
>> > > to
>> > > > guard against a potentially excessively large time index. We can
>> either
>> > > > reuse index.interval.bytes or introduce time.index.interval.bytes,
>> but
>> > I
>> > > > cannot think of additional usage for time.index.interval.bytes other
>> > than
>> > > > limiting the time index size.
>> > > >
>> > > > I agree that the memory mapped file is probably not a big issue here
>> > and
>> > > we
>> > > > can change the default index size to 2MB.
>> > > >
>> > > > For the two cases you mentioned.
>> > > > 1. Because the message offset in the time index is also
>> monotonically
>> > > > increasing, truncating should be straightforward. i.e. only keep the
>> > > > entries that are pointing to the offsets earlier than the truncated
>> to
>> > > > offsets.
>> > > >
>> > > > 2. The current assumption is that if the time index of a segment is
>> > empty
>> > > > and there are no previous time index entry, we will assume that
>> segment
>> > > > should be removed - because all the older segment with even larger
>> > > > timestamp have been removed. So in the case you mentioned, during
>> > startup
>> > > > we will remove all the segments and roll out a new empty segment.
>> > > >
>> > > > Thanks,
>> > > >
>> > > > Jiangjie (Becket) Qin
>> > > >
>> > > >
>> > > >
>> > > > On Mon, Feb 29, 2016 at 6:09 PM, Jun Rao <j...@confluent.io> wrote:
>> > > >
>> > > > > Hi, Becket,
>> > > > >
>> > > > > I thought that your proposal to build time-based index just based
>> off
>> > > > > index.interval.bytes
>> > > > > is reasonable. Is there a particular need to also add time.
>> > > > > index.interval.bytes?
>> > > > >
>> > > > > Compute the pre-allocated index file size based on log segment
>> file
>> > > size
>> > > > > can be useful. However, the tricky thing is that log segment size
>> can
>> > > be
>> > > > > changed dynamically. Also, for mmap files, they don't use heap
>> space,
>> > > > just
>> > > > > virtual memory, which will be paged in on demand. So, I am not
>> sure
>> > if
>> > > > > memory space is a big concern there. The simplest thing is
>> probably
>> > to
>> > > > > change the default index size to 2MB to match the default log
>> segment
>> > > > size.
>> > > > >
>> > > > > A couple of other things to think through.
>> > > > >
>> > > > > 1. Currently, LogSegment supports truncating to an offset. How do
>> we
>> > do
>> > > > > that on a time-based index?
>> > > > >
>> > > > > 2. Since it's possible to have a empty time-based index (if all
>> > message
>> > > > > timestamps are smaller than the largest timestamp in previous
>> > segment),
>> > > > we
>> > > > > need to figure out what timestamp to use for retaining such log
>> > > segment.
>> > > > In
>> > > > > the extreme case, it can happen that after we delete an old log
>> > > segment,
>> > > > > all of the new log segments have an empty time-based index, in
>> this
>> > > case,
>> > > > > how do we avoid losing track of the latest timestamp?
>> > > > >
>> > > > > Thanks,
>> > > > >
>> > > > > Jun
>> > > > >
>> > > > > On Sun, Feb 28, 2016 at 3:26 PM, Becket Qin <becket....@gmail.com
>> >
>> > > > wrote:
>> > > > >
>> > > > > > Hi Guozhang,
>> > > > > >
>> > > > > > The size of memory mapped index file was also our concern as
>> well.
>> > > That
>> > > > > is
>> > > > > > why we are suggesting minute level time indexing instead of
>> second
>> > > > level.
>> > > > > > There are a few thoughts on the extra memory cost of time index.
>> > > > > >
>> > > > > > 1. Currently all the index files are loaded as memory mapped
>> files.
>> > > > > Notice
>> > > > > > that only the index of the active segment is of the default size
>> > > 10MB.
>> > > > > > Typically the index of the old segments are much smaller than
>> 10MB.
>> > > So
>> > > > if
>> > > > > > we use the same initial size for time index files, the total
>> amount
>> > > of
>> > > > > > memory won't be doubled, but the memory cost of active segments
>> > will
>> > > be
>> > > > > > doubled. (However, the 10MB value itself seems problematic, see
>> > later
>> > > > > > reasoning).
>> > > > > >
>> > > > > > 2. It is likely that the time index is much smaller than the
>> offset
>> > > > index
>> > > > > > because user would adjust the time index interval ms depending
>> on
>> > the
>> > > > > topic
>> > > > > > volume. i.e for a low volume topic the time index interval ms
>> will
>> > be
>> > > > > much
>> > > > > > longer so that we can avoid inserting one time index entry for
>> each
>> > > > > message
>> > > > > > in the extreme case.
>> > > > > >
>> > > > > > 3. To further guard against the unnecessary frequent insertion
>> of
>> > > time
>> > > > > > index entry, we used the index.interval.bytes as a restriction
>> for
>> > > time
>> > > > > > index entry as well. Such that even for a newly created topic
>> with
>> > > the
>> > > > > > default time.index.interval.ms we don't need to worry about
>> overly
>> > > > > > aggressive time index entry insertion.
>> > > > > >
>> > > > > > Considering the above. The overall memory cost for time index
>> > should
>> > > be
>> > > > > > much smaller compared with the offset index. However, as you
>> > pointed
>> > > > out
>> > > > > > for (1) might still be an issue. I am actually not sure about
>> why
>> > we
>> > > > > always
>> > > > > > allocate 10 MB for the index file. This itself looks a problem
>> > given
>> > > we
>> > > > > > actually have a pretty good way to know the upper bound of
>> memory
>> > > taken
>> > > > > by
>> > > > > > an offset index.
>> > > > > >
>> > > > > > Theoretically, the offset index file will at most have
>> > > > > (log.segment.bytes /
>> > > > > > index.interval.bytes) entries. In our default configuration,
>> > > > > > log.segment.size=1GB, and index.interval.bytes=4K. This means we
>> > only
>> > > > > need
>> > > > > > (1GB/4K)*8 Bytes = 2MB. Allocating 10 MB is really a big waste
>> of
>> > > > memory.
>> > > > > >
>> > > > > > I suggest we do the following:
>> > > > > > 1. When creating the log index file, we always allocate memory
>> > using
>> > > > the
>> > > > > > above calculation.
>> > > > > > 2. If the memory calculated in (1) is greater than
>> > > segment.index.bytes,
>> > > > > we
>> > > > > > use segment.index.bytes instead. Otherwise we simply use the
>> result
>> > > in
>> > > > > (1)
>> > > > > >
>> > > > > > If we do this I believe the memory for index file will probably
>> be
>> > > > > smaller
>> > > > > > even if we have the time index added. I will create a separate
>> > ticket
>> > > > for
>> > > > > > the index file initial size.
>> > > > > >
>> > > > > > Thanks,
>> > > > > >
>> > > > > > Jiangjie (Becket) Qin
>> > > > > >
>> > > > > > On Thu, Feb 25, 2016 at 3:30 PM, Guozhang Wang <
>> wangg...@gmail.com
>> > >
>> > > > > wrote:
>> > > > > >
>> > > > > > > Jiangjie,
>> > > > > > >
>> > > > > > > I was originally only thinking about the
>> > > "time.index.size.max.bytes"
>> > > > > > config
>> > > > > > > in addition to the "offset.index.size.max.bytes". Since the
>> > > latter's
>> > > > > > > default size is 10MB, and for memory mapped file, we will
>> > allocate
>> > > > that
>> > > > > > > much of memory at the start which could be a pressure on RAM
>> if
>> > we
>> > > > > double
>> > > > > > > it.
>> > > > > > >
>> > > > > > > Guozhang
>> > > > > > >
>> > > > > > > On Wed, Feb 24, 2016 at 4:56 PM, Becket Qin <
>> > becket....@gmail.com>
>> > > > > > wrote:
>> > > > > > >
>> > > > > > > > Hi Guozhang,
>> > > > > > > >
>> > > > > > > > I thought about this again and it seems we stilll need the
>> > > > > > > > time.index.interval.ms configuration to avoid unnecessary
>> > > frequent
>> > > > > > time
>> > > > > > > > index insertion.
>> > > > > > > >
>> > > > > > > > I just updated the wiki to add index.interval.bytes as an
>> > > > additional
>> > > > > > > > constraints for time index entry insertion. Another slight
>> > change
>> > > > > made
>> > > > > > > was
>> > > > > > > > that as long as a message timestamp shows
>> > time.index.interval.ms
>> > > > has
>> > > > > > > > passed
>> > > > > > > > since the timestamp of last time index entry, we will insert
>> > > > another
>> > > > > > > > timestmap index entry. Previously we always insert time
>> index
>> > at
>> > > > > > > > time.index.interval.ms bucket boundaries.
>> > > > > > > >
>> > > > > > > > Thanks,
>> > > > > > > >
>> > > > > > > > Jiangjie (Becket) Qin
>> > > > > > > >
>> > > > > > > > On Wed, Feb 24, 2016 at 2:40 PM, Becket Qin <
>> > > becket....@gmail.com>
>> > > > > > > wrote:
>> > > > > > > >
>> > > > > > > > > Thanks for the comment Guozhang,
>> > > > > > > > >
>> > > > > > > > > I just changed the configuration name to "
>> > > time.index.interval.ms
>> > > > ".
>> > > > > > > > >
>> > > > > > > > > It seems the real question here is how big the offset
>> indices
>> > > > will
>> > > > > > be.
>> > > > > > > > > Theoretically we can have one time index entry for each
>> > message
>> > > > in
>> > > > > a
>> > > > > > > log
>> > > > > > > > > segment. For example, if there is one event per minute
>> > > appended,
>> > > > we
>> > > > > > > might
>> > > > > > > > > have to have a time index entry for each message until the
>> > > > segment
>> > > > > > size
>> > > > > > > > is
>> > > > > > > > > reached. In that case, the number of index entries in the
>> > time
>> > > > > index
>> > > > > > > > would
>> > > > > > > > > be (segment size / avg message size). So the time index
>> file
>> > > size
>> > > > > can
>> > > > > > > > > potentially be big.
>> > > > > > > > >
>> > > > > > > > > I am wondering if we can simply reuse the
>> > > "index.interval.bytes"
>> > > > > > > > > configuration instead of having a separate time index
>> > interval
>> > > > ms.
>> > > > > > i.e.
>> > > > > > > > > instead of inserting a new entry based on time interval,
>> we
>> > > still
>> > > > > > > insert
>> > > > > > > > it
>> > > > > > > > > based on bytes interval. This does not affect the
>> granularity
>> > > > > because
>> > > > > > > we
>> > > > > > > > > can search from the nearest index entry to find the
>> message
>> > > with
>> > > > > > > correct
>> > > > > > > > > timestamp. The good thing is that this guarantees there
>> will
>> > > not
>> > > > be
>> > > > > > > huge
>> > > > > > > > > time indices. We also save the new configuration.
>> > > > > > > > >
>> > > > > > > > > What do you think?
>> > > > > > > > >
>> > > > > > > > > Thanks,
>> > > > > > > > >
>> > > > > > > > > Jiangjie (Becket) Qin
>> > > > > > > > >
>> > > > > > > > > On Wed, Feb 24, 2016 at 1:00 PM, Guozhang Wang <
>> > > > wangg...@gmail.com
>> > > > > >
>> > > > > > > > wrote:
>> > > > > > > > >
>> > > > > > > > >> Thanks Jiangjie, a few comments on the wiki:
>> > > > > > > > >>
>> > > > > > > > >> 1. Config name "time.index.interval" to "
>> > > time.index.interval.ms
>> > > > "
>> > > > > to
>> > > > > > > be
>> > > > > > > > >> consistent. Also do we need a
>> "time.index.size.max.bytes" as
>> > > > well?
>> > > > > > > > >>
>> > > > > > > > >> 2. Will the memory mapped index file for timestamp have
>> the
>> > > same
>> > > > > > > default
>> > > > > > > > >> initial / max size (10485760) as the offset index?
>> > > > > > > > >>
>> > > > > > > > >> Otherwise LGTM.
>> > > > > > > > >>
>> > > > > > > > >> Guozhang
>> > > > > > > > >>
>> > > > > > > > >> On Tue, Feb 23, 2016 at 5:05 PM, Becket Qin <
>> > > > becket....@gmail.com
>> > > > > >
>> > > > > > > > wrote:
>> > > > > > > > >>
>> > > > > > > > >> > Bump.
>> > > > > > > > >> >
>> > > > > > > > >> > Per Jun's comments during KIP hangout, I have updated
>> wiki
>> > > > with
>> > > > > > the
>> > > > > > > > >> upgrade
>> > > > > > > > >> > plan or KIP-33.
>> > > > > > > > >> >
>> > > > > > > > >> > Let's vote!
>> > > > > > > > >> >
>> > > > > > > > >> > Thanks,
>> > > > > > > > >> >
>> > > > > > > > >> > Jiangjie (Becket) Qin
>> > > > > > > > >> >
>> > > > > > > > >> > On Wed, Feb 3, 2016 at 10:32 AM, Becket Qin <
>> > > > > becket....@gmail.com
>> > > > > > >
>> > > > > > > > >> wrote:
>> > > > > > > > >> >
>> > > > > > > > >> > > Hi all,
>> > > > > > > > >> > >
>> > > > > > > > >> > > I would like to initiate the vote for KIP-33.
>> > > > > > > > >> > >
>> > > > > > > > >> > >
>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-33
>> > > > > > > > >> > > +-+Add+a+time+based+log+index
>> > > > > > > > >> > >
>> > > > > > > > >> > > A good amount of the KIP has been touched during the
>> > > > > discussion
>> > > > > > on
>> > > > > > > > >> > KIP-32.
>> > > > > > > > >> > > So I also put the link to KIP-32 here for reference.
>> > > > > > > > >> > >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP
>> > > > > > > > >> > > -32+-+Add+timestamps+to+Kafka+message
>> > > > > > > > >> > >
>> > > > > > > > >> > > Thanks,
>> > > > > > > > >> > >
>> > > > > > > > >> > > Jiangjie (Becket) Qin
>> > > > > > > > >> > >
>> > > > > > > > >> >
>> > > > > > > > >>
>> > > > > > > > >>
>> > > > > > > > >>
>> > > > > > > > >> --
>> > > > > > > > >> -- Guozhang
>> > > > > > > > >>
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > --
>> > > > > > > -- Guozhang
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
>

Reply via email to