I updated the wiki to make the following change: instead of maintaining a
globally monotonically increasing time index, we only make sure the time
index for each log segment is monotonically increasing.
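
To illustrate the per-segment rule, here is a minimal Python sketch. It is
not the actual broker code: the class SegmentTimeIndex, its fields and the
on_append method are hypothetical names, and it assumes an entry holds the
largest timestamp seen so far in the segment plus the offset of that
message, inserted at most once per index.interval.bytes.

```python
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class SegmentTimeIndex:
    """Hypothetical time index of ONE log segment (illustration only)."""
    index_interval_bytes: int = 4096          # assumed default, as in the thread
    entries: List[Tuple[int, int]] = field(default_factory=list)  # (timestamp, offset)
    max_timestamp: int = -1                   # largest timestamp in this segment
    offset_of_max_timestamp: int = -1
    bytes_since_last_entry: int = 0

    def on_append(self, timestamp: int, offset: int, size_bytes: int) -> None:
        """Record one appended message and maybe add a time index entry."""
        self.bytes_since_last_entry += size_bytes
        if timestamp > self.max_timestamp:
            self.max_timestamp = timestamp
            self.offset_of_max_timestamp = offset
        # Compare only against entries of THIS segment, never older segments.
        last_indexed = self.entries[-1][0] if self.entries else -1
        if (self.max_timestamp > last_indexed
                and self.bytes_since_last_entry >= self.index_interval_bytes):
            self.entries.append((self.max_timestamp, self.offset_of_max_timestamp))
            self.bytes_since_last_entry = 0


older = SegmentTimeIndex()
older.on_append(timestamp=100, offset=0, size_bytes=4096)
older.on_append(timestamp=200, offset=1, size_bytes=4096)

# The newer segment only sees timestamps smaller than the older segment's,
# yet it still gets an entry because the check is per segment.
newer = SegmentTimeIndex()
newer.on_append(timestamp=50, offset=2, size_bytes=4096)
```

Because each segment starts from its own state, a segment whose messages
all carry older timestamps than the previous segment still ends up with a
non-empty index in this sketch.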

By doing that, everything seems much simpler. We avoid empty time indices,
enforcing log retention becomes easier, and searching by timestamp is not
complicated.
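
A rough Python sketch of how retention and timestamp search could look
with per-segment indices. This is an assumption-laden illustration, not
the wiki's specification: Segment, is_expired and lookup_start_offset are
hypothetical names, and each entry is assumed to be (largest timestamp so
far in the segment, offset of that message).

```python
from bisect import bisect_left
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class Segment:
    """Hypothetical view of a log segment (illustration only)."""
    base_offset: int
    max_timestamp: int                 # largest timestamp in this segment
    time_index: List[Tuple[int, int]]  # (timestamp, offset), increasing per segment


def is_expired(segment: Segment, now_ms: int, retention_ms: int) -> bool:
    """Retention only needs the segment's own largest timestamp."""
    return now_ms - segment.max_timestamp > retention_ms


def lookup_start_offset(segments: List[Segment], target_ts: int) -> Optional[int]:
    """Return an offset from which to scan forward for target_ts.

    Segments are expected oldest first. The first segment whose largest
    timestamp reaches target_ts is searched; None means no such segment.
    """
    for seg in segments:
        if seg.max_timestamp < target_ts:
            continue
        timestamps = [ts for ts, _ in seg.time_index]
        # Last entry strictly below the target: everything before its
        # offset is older than target_ts under the stated assumption.
        i = bisect_left(timestamps, target_ts)
        return seg.time_index[i - 1][1] if i > 0 else seg.base_offset
    return None


segments = [
    Segment(base_offset=0, max_timestamp=200, time_index=[(100, 0), (200, 2)]),
    Segment(base_offset=3, max_timestamp=400, time_index=[(300, 3), (400, 5)]),
]
start = lookup_start_offset(segments, target_ts=350)  # scan from offset 3
```

The returned offset is only a starting point; the caller would still scan
forward from there to find the first message at or after the target.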

Thanks,

Jiangjie (Becket) Qin

On Mon, Mar 7, 2016 at 2:52 PM, Becket Qin <becket....@gmail.com> wrote:

> Hi Jun,
>
> What do you think about the above solution? I am trying to include KIP-33
> into 0.10.0 because the log retention has been a long pending issue.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Tue, Mar 1, 2016 at 8:18 PM, Becket Qin <becket....@gmail.com> wrote:
>
>> Hi Jun,
>>
>> I see. If we only use index.interval.bytes, the time index entry will be
>> inserted when (1) the largest timestamp is in this segment AND (2) at least
>> index.interval.bytes have been appended since last time index entry
>> insertion.
>> In this case (1) becomes implicit instead of having an explicit threshold
>> of time.index.interval.ms. This should work fine.
>>
>> For 1, the current proposal is actually intended to use the offsets of
>> the messages instead of the file position. The reason is that
>> OffsetRequest(ListOffsetRequest) gives back a list of offsets. Having
>> offsets instead of file position is more convenient in that case. So far we
>> don't have an interface for consumer to directly consume from a given
>> timestamp. Supposedly we will have to first return the offset to the
>> consumer then the consumer can issue a fetch request. But this does mean
>> that we need to look up the index twice if we want to search to a
>> particular message using timestamp.
>>
>> For 2, that is a good point. It looks like we have to persist the max
>> timestamp of each segment. I am thinking of reserving the first time index
>> entry in each time index file. When the broker shuts down or rolls out a
>> new segment, we persist the max timestamp in the segment by writing a time
>> index entry to the first entry. During timestamp search we will ignore the
>> first time index entry. So the log retention will always be able to know
>> for sure if a log segment is supposed to be deleted or not by looking at
>> the first entry of the time index.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>>
>>
>>
>> On Tue, Mar 1, 2016 at 4:30 PM, Jun Rao <j...@confluent.io> wrote:
>>
>>> Hi, Jiangjie,
>>>
>>> I was thinking perhaps just reusing index.interval.bytes is enough. Not
>>> sure if there is much value in adding an additional
>>> time.index.interval.ms.
>>>
>>> For 1, the timestamp index has entries of timestamp -> file position. So,
>>> there is actually no offset in the index, right?
>>>
>>> For 2, what you said makes sense for time-based retention. Does that
>>> apply if the retention is triggered by size? The difference here is
>>> that we can't assume all segments with messages of timestamp smaller
>>> than the latest timestamp will be deleted after the message with the
>>> latest timestamp is deleted.
>>>
>>> Thanks,
>>>
>>> Jun
>>>
>>> On Tue, Mar 1, 2016 at 1:00 PM, Becket Qin <becket....@gmail.com> wrote:
>>>
>>> > Hi Jun,
>>> >
>>> > Rolling out a new segment when the time index is full sounds good. So
>>> both
>>> > time index and offset index will be sharing the configuration of max
>>> index
>>> > size.
>>> > If we do that, do you think we still want to reuse
>>> index.interval.bytes? If
>>> > we don't, the risk is that in some corner cases, we might end up with
>>> many
>>> > small segments. (e.g. small time.index.interval.ms with small max
>>> index
>>> > size). But this is probably more of a misconfiguration.
>>> >
>>> > 2. If the broker is still running when all the segments except the
>>> > active segment are deleted, we will have an in-memory latest timestamp.
>>> > So that is not a problem.
>>> >
>>> > In another case, if a broker boots up and sees only one segment with an
>>> > empty time index file, we can scan the active segment and rebuild the
>>> > time index, i.e. we do not need to care about the previous largest
>>> > timestamp but simply start over. (We need to scan the active segment
>>> > because it is possible that the last message appended to the log has a
>>> > timestamp not expired, but the broker died before inserting the time
>>> > index entry for it.) If all the messages in the active segment have
>>> > expired, we should roll out a new segment and reset the latest
>>> > timestamp to -1.
>>> > The principle here is that we will try to build the time indices for
>>> > the existing segments that have not expired. If the message with the
>>> > previously latest timestamp has already been deleted, there is no need
>>> > to remember that any more.
>>> >
>>> > That said, I believe this corner case is really because user is not
>>> > configuring the acceptable time difference threshold appropriately.
>>> >
>>> > Thanks,
>>> >
>>> > Jiangjie (Becket) Qin
>>> >
>>> >
>>> > On Tue, Mar 1, 2016 at 11:55 AM, Jun Rao <j...@confluent.io> wrote:
>>> >
>>> > > Jiangjie,
>>> > >
>>> > > Currently, we roll a new log segment if the index is full. We can
>>> > probably
>>> > > just do the same on the time index. This will bound the index size.
>>> > >
>>> > > 1. Sounds good.
>>> > >
>>> > > 2. I was wondering about an edge case where the largest timestamp is
>>> > > in the oldest segment and the time index is empty in all newer
>>> > > segments. At some point, we delete the oldest segment since it has
>>> > > expired. Then, we delete all but the active segment. Now, what should
>>> > > the largest timestamp be? Should it be the previous largest timestamp
>>> > > that we have seen or should we dig out the largest timestamp in the
>>> > > active segment?
>>> > >
>>> > > Thanks,
>>> > >
>>> > > Jun
>>> > >
>>> > >
>>> > > On Mon, Feb 29, 2016 at 7:29 PM, Becket Qin <becket....@gmail.com>
>>> > wrote:
>>> > >
>>> > > > Hi Jun,
>>> > > >
>>> > > > I think index.interval.bytes is used to control the density of the
>>> > > > offset index. The counterpart of index.interval.bytes for the time
>>> > > > index is time.index.interval.ms. If we did not change the semantics
>>> > > > of log.roll.ms, log.roll.ms/time.index.interval.ms and
>>> > > > log.segment.bytes/index.interval.bytes are a perfect mapping from
>>> > > > bytes to time. However, because we changed the behavior of
>>> > > > log.roll.ms, we need to guard against a potentially excessively
>>> > > > large time index. We can either reuse index.interval.bytes or
>>> > > > introduce time.index.interval.bytes, but I cannot think of any
>>> > > > additional usage for time.index.interval.bytes other than limiting
>>> > > > the time index size.
>>> > > >
>>> > > > I agree that the memory mapped file is probably not a big issue
>>> here
>>> > and
>>> > > we
>>> > > > can change the default index size to 2MB.
>>> > > >
>>> > > > For the two cases you mentioned.
>>> > > > 1. Because the message offset in the time index is also
>>> monotonically
>>> > > > increasing, truncating should be straightforward. i.e. only keep
>>> the
>>> > > > entries that are pointing to the offsets earlier than the
>>> truncated to
>>> > > > offsets.
>>> > > >
>>> > > > 2. The current assumption is that if the time index of a segment is
>>> > > > empty and there is no previous time index entry, we will assume that
>>> > > > segment should be removed, because all the older segments with even
>>> > > > larger timestamps have been removed. So in the case you mentioned,
>>> > > > during startup we will remove all the segments and roll out a new
>>> > > > empty segment.
>>> > > >
>>> > > > Thanks,
>>> > > >
>>> > > > Jiangjie (Becket) Qin
>>> > > >
>>> > > >
>>> > > >
>>> > > > On Mon, Feb 29, 2016 at 6:09 PM, Jun Rao <j...@confluent.io> wrote:
>>> > > >
>>> > > > > Hi, Becket,
>>> > > > >
>>> > > > > I thought that your proposal to build the time-based index just
>>> > > > > based off index.interval.bytes is reasonable. Is there a particular
>>> > > > > need to also add time.index.interval.bytes?
>>> > > > >
>>> > > > > Computing the pre-allocated index file size based on the log
>>> > > > > segment file size can be useful. However, the tricky thing is that
>>> > > > > the log segment size can be changed dynamically. Also, mmap files
>>> > > > > don't use heap space, just virtual memory, which will be paged in
>>> > > > > on demand. So, I am not sure if memory space is a big concern
>>> > > > > there. The simplest thing is probably to change the default index
>>> > > > > size to 2MB to match the default log segment size.
>>> > > > >
>>> > > > > A couple of other things to think through.
>>> > > > >
>>> > > > > 1. Currently, LogSegment supports truncating to an offset. How
>>> do we
>>> > do
>>> > > > > that on a time-based index?
>>> > > > >
>>> > > > > 2. Since it's possible to have an empty time-based index (if all
>>> > > > > message timestamps are smaller than the largest timestamp in the
>>> > > > > previous segment), we need to figure out what timestamp to use for
>>> > > > > retaining such a log segment. In the extreme case, it can happen
>>> > > > > that after we delete an old log segment, all of the new log
>>> > > > > segments have an empty time-based index. In this case, how do we
>>> > > > > avoid losing track of the latest timestamp?
>>> > > > >
>>> > > > > Thanks,
>>> > > > >
>>> > > > > Jun
>>> > > > >
>>> > > > > On Sun, Feb 28, 2016 at 3:26 PM, Becket Qin <
>>> becket....@gmail.com>
>>> > > > wrote:
>>> > > > >
>>> > > > > > Hi Guozhang,
>>> > > > > >
>>> > > > > > The size of the memory mapped index file was our concern as well.
>>> > > > > > That is why we are suggesting minute level time indexing instead
>>> > > > > > of second level. There are a few thoughts on the extra memory cost
>>> > > > > > of the time index.
>>> > > > > >
>>> > > > > > 1. Currently all the index files are loaded as memory mapped
>>> files.
>>> > > > > Notice
>>> > > > > > that only the index of the active segment is of the default
>>> size
>>> > > 10MB.
>>> > > > > > Typically the index of the old segments are much smaller than
>>> 10MB.
>>> > > So
>>> > > > if
>>> > > > > > we use the same initial size for time index files, the total
>>> amount
>>> > > of
>>> > > > > > memory won't be doubled, but the memory cost of active segments
>>> > will
>>> > > be
>>> > > > > > doubled. (However, the 10MB value itself seems problematic, see
>>> > later
>>> > > > > > reasoning).
>>> > > > > >
>>> > > > > > 2. It is likely that the time index is much smaller than the
>>> offset
>>> > > > index
>>> > > > > > because user would adjust the time index interval ms depending
>>> on
>>> > the
>>> > > > > topic
>>> > > > > > volume. i.e for a low volume topic the time index interval ms
>>> will
>>> > be
>>> > > > > much
>>> > > > > > longer so that we can avoid inserting one time index entry for
>>> each
>>> > > > > message
>>> > > > > > in the extreme case.
>>> > > > > >
>>> > > > > > 3. To further guard against the unnecessary frequent insertion
>>> of
>>> > > time
>>> > > > > > index entry, we used the index.interval.bytes as a restriction
>>> for
>>> > > time
>>> > > > > > index entry as well. Such that even for a newly created topic
>>> with
>>> > > the
>>> > > > > > default time.index.interval.ms we don't need to worry about
>>> overly
>>> > > > > > aggressive time index entry insertion.
>>> > > > > >
>>> > > > > > Considering the above, the overall memory cost for the time index
>>> > > > > > should be much smaller compared with the offset index. However, as
>>> > > > > > you pointed out, (1) might still be an issue. I am actually not
>>> > > > > > sure why we always allocate 10 MB for the index file. This itself
>>> > > > > > looks like a problem given we actually have a pretty good way to
>>> > > > > > know the upper bound of memory taken by an offset index.
>>> > > > > >
>>> > > > > > Theoretically, the offset index file will have at most
>>> > > > > > (log.segment.bytes / index.interval.bytes) entries. In our default
>>> > > > > > configuration, log.segment.bytes=1GB and index.interval.bytes=4K.
>>> > > > > > This means we only need (1GB/4K)*8 Bytes = 2MB. Allocating 10 MB is
>>> > > > > > really a big waste of memory.
>>> > > > > >
>>> > > > > > I suggest we do the following:
>>> > > > > > 1. When creating the log index file, we always allocate memory
>>> > using
>>> > > > the
>>> > > > > > above calculation.
>>> > > > > > 2. If the memory calculated in (1) is greater than
>>> > > segment.index.bytes,
>>> > > > > we
>>> > > > > > use segment.index.bytes instead. Otherwise we simply use the
>>> result
>>> > > in
>>> > > > > (1)
>>> > > > > >
>>> > > > > > If we do this I believe the memory for index file will
>>> probably be
>>> > > > > smaller
>>> > > > > > even if we have the time index added. I will create a separate
>>> > ticket
>>> > > > for
>>> > > > > > the index file initial size.
>>> > > > > >
>>> > > > > > Thanks,
>>> > > > > >
>>> > > > > > Jiangjie (Becket) Qin
>>> > > > > >
>>> > > > > > On Thu, Feb 25, 2016 at 3:30 PM, Guozhang Wang <
>>> wangg...@gmail.com
>>> > >
>>> > > > > wrote:
>>> > > > > >
>>> > > > > > > Jiangjie,
>>> > > > > > >
>>> > > > > > > I was originally only thinking about the
>>> > > "time.index.size.max.bytes"
>>> > > > > > config
>>> > > > > > > in addition to the "offset.index.size.max.bytes". Since the
>>> > > latter's
>>> > > > > > > default size is 10MB, and for memory mapped file, we will
>>> > allocate
>>> > > > that
>>> > > > > > > much of memory at the start which could be a pressure on RAM
>>> if
>>> > we
>>> > > > > double
>>> > > > > > > it.
>>> > > > > > >
>>> > > > > > > Guozhang
>>> > > > > > >
>>> > > > > > > On Wed, Feb 24, 2016 at 4:56 PM, Becket Qin <
>>> > becket....@gmail.com>
>>> > > > > > wrote:
>>> > > > > > >
>>> > > > > > > > Hi Guozhang,
>>> > > > > > > >
>>> > > > > > > > I thought about this again and it seems we still need the
>>> > > > > > > > time.index.interval.ms configuration to avoid unnecessarily
>>> > > > > > > > frequent time index insertion.
>>> > > > > > > >
>>> > > > > > > > I just updated the wiki to add index.interval.bytes as an
>>> > > > > > > > additional constraint for time index entry insertion. Another
>>> > > > > > > > slight change made was that as long as a message timestamp shows
>>> > > > > > > > time.index.interval.ms has passed since the timestamp of the last
>>> > > > > > > > time index entry, we will insert another timestamp index entry.
>>> > > > > > > > Previously we always inserted time index entries at
>>> > > > > > > > time.index.interval.ms bucket boundaries.
>>> > > > > > > >
>>> > > > > > > > Thanks,
>>> > > > > > > >
>>> > > > > > > > Jiangjie (Becket) Qin
>>> > > > > > > >
>>> > > > > > > > On Wed, Feb 24, 2016 at 2:40 PM, Becket Qin <
>>> > > becket....@gmail.com>
>>> > > > > > > wrote:
>>> > > > > > > >
>>> > > > > > > > > Thanks for the comment Guozhang,
>>> > > > > > > > >
>>> > > > > > > > > I just changed the configuration name to
>>> > > > > > > > > "time.index.interval.ms".
>>> > > > > > > > >
>>> > > > > > > > > It seems the real question here is how big the offset
>>> indices
>>> > > > will
>>> > > > > > be.
>>> > > > > > > > > Theoretically we can have one time index entry for each
>>> > message
>>> > > > in
>>> > > > > a
>>> > > > > > > log
>>> > > > > > > > > segment. For example, if there is one event per minute
>>> > > appended,
>>> > > > we
>>> > > > > > > might
>>> > > > > > > > > have to have a time index entry for each message until
>>> the
>>> > > > segment
>>> > > > > > size
>>> > > > > > > > is
>>> > > > > > > > > reached. In that case, the number of index entries in the
>>> > time
>>> > > > > index
>>> > > > > > > > would
>>> > > > > > > > > be (segment size / avg message size). So the time index
>>> file
>>> > > size
>>> > > > > can
>>> > > > > > > > > potentially be big.
>>> > > > > > > > >
>>> > > > > > > > > I am wondering if we can simply reuse the
>>> > > "index.interval.bytes"
>>> > > > > > > > > configuration instead of having a separate time index
>>> > interval
>>> > > > ms.
>>> > > > > > i.e.
>>> > > > > > > > > instead of inserting a new entry based on time interval,
>>> we
>>> > > still
>>> > > > > > > insert
>>> > > > > > > > it
>>> > > > > > > > > based on bytes interval. This does not affect the
>>> granularity
>>> > > > > because
>>> > > > > > > we
>>> > > > > > > > > can search from the nearest index entry to find the
>>> message
>>> > > with
>>> > > > > > > correct
>>> > > > > > > > > timestamp. The good thing is that this guarantees there
>>> will
>>> > > not
>>> > > > be
>>> > > > > > > huge
>>> > > > > > > > > time indices. We also save the new configuration.
>>> > > > > > > > >
>>> > > > > > > > > What do you think?
>>> > > > > > > > >
>>> > > > > > > > > Thanks,
>>> > > > > > > > >
>>> > > > > > > > > Jiangjie (Becket) Qin
>>> > > > > > > > >
>>> > > > > > > > > On Wed, Feb 24, 2016 at 1:00 PM, Guozhang Wang <
>>> > > > wangg...@gmail.com
>>> > > > > >
>>> > > > > > > > wrote:
>>> > > > > > > > >
>>> > > > > > > > >> Thanks Jiangjie, a few comments on the wiki:
>>> > > > > > > > >>
>>> > > > > > > > >> 1. Config name "time.index.interval" to
>>> > > > > > > > >> "time.index.interval.ms" to be consistent. Also do we need a
>>> > > > > > > > >> "time.index.size.max.bytes" as well?
>>> > > > > > > > >>
>>> > > > > > > > >> 2. Will the memory mapped index file for timestamp have
>>> the
>>> > > same
>>> > > > > > > default
>>> > > > > > > > >> initial / max size (10485760) as the offset index?
>>> > > > > > > > >>
>>> > > > > > > > >> Otherwise LGTM.
>>> > > > > > > > >>
>>> > > > > > > > >> Guozhang
>>> > > > > > > > >>
>>> > > > > > > > >> On Tue, Feb 23, 2016 at 5:05 PM, Becket Qin <
>>> > > > becket....@gmail.com
>>> > > > > >
>>> > > > > > > > wrote:
>>> > > > > > > > >>
>>> > > > > > > > >> > Bump.
>>> > > > > > > > >> >
>>> > > > > > > > >> > Per Jun's comments during KIP hangout, I have updated the
>>> > > > > > > > >> > wiki with the upgrade plan for KIP-33.
>>> > > > > > > > >> >
>>> > > > > > > > >> > Let's vote!
>>> > > > > > > > >> >
>>> > > > > > > > >> > Thanks,
>>> > > > > > > > >> >
>>> > > > > > > > >> > Jiangjie (Becket) Qin
>>> > > > > > > > >> >
>>> > > > > > > > >> > On Wed, Feb 3, 2016 at 10:32 AM, Becket Qin <
>>> > > > > becket....@gmail.com
>>> > > > > > >
>>> > > > > > > > >> wrote:
>>> > > > > > > > >> >
>>> > > > > > > > >> > > Hi all,
>>> > > > > > > > >> > >
>>> > > > > > > > >> > > I would like to initiate the vote for KIP-33.
>>> > > > > > > > >> > >
>>> > > > > > > > >> > >
>>> > > > > > > > >> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index
>>> > > > > > > > >> > >
>>> > > > > > > > >> > > A good amount of the KIP has been touched during the
>>> > > > > discussion
>>> > > > > > on
>>> > > > > > > > >> > KIP-32.
>>> > > > > > > > >> > > So I also put the link to KIP-32 here for reference.
>>> > > > > > > > >> > >
>>> > > > > > > > >> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message
>>> > > > > > > > >> > >
>>> > > > > > > > >> > > Thanks,
>>> > > > > > > > >> > >
>>> > > > > > > > >> > > Jiangjie (Becket) Qin
>>> > > > > > > > >> > >
>>> > > > > > > > >> >
>>> > > > > > > > >>
>>> > > > > > > > >>
>>> > > > > > > > >>
>>> > > > > > > > >> --
>>> > > > > > > > >> -- Guozhang
>>> > > > > > > > >>
>>> > > > > > > > >
>>> > > > > > > > >
>>> > > > > > > >
>>> > > > > > >
>>> > > > > > >
>>> > > > > > >
>>> > > > > > > --
>>> > > > > > > -- Guozhang
>>> > > > > > >
>>> > > > > >
>>> > > > >
>>> > > >
>>> > >
>>> >
>>>
>>
>>
>
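
For reference, the index file sizing suggested in the quoted Feb 28
message (at most log.segment.bytes / index.interval.bytes entries of 8
bytes each, capped by segment.index.bytes) can be sketched in Python as
follows. The function name and constant are hypothetical, and the 8-byte
entry size is simply the figure used in that message.

```python
OFFSET_INDEX_ENTRY_BYTES = 8  # entry size used in the quoted calculation


def initial_index_size(log_segment_bytes: int,
                       index_interval_bytes: int,
                       segment_index_bytes: int) -> int:
    """Bytes to pre-allocate for an offset index (illustrative only)."""
    # Upper bound on entries: one per index.interval.bytes of log data.
    max_entries = log_segment_bytes // index_interval_bytes
    needed = max_entries * OFFSET_INDEX_ENTRY_BYTES
    # Never allocate more than the configured maximum index size.
    return min(needed, segment_index_bytes)


# Defaults mentioned in the thread: 1GB segments, 4K interval, 10MB index.
size = initial_index_size(1024 ** 3, 4096, 10 * 1024 ** 2)
print(size)  # 2097152 bytes, i.e. 2MB
```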
