Hi Jun,

What do you think about the above solution? I am trying to include KIP-33 in 0.10.0 because log retention has been a long-pending issue.

Thanks,

Jiangjie (Becket) Qin

On Tue, Mar 1, 2016 at 8:18 PM, Becket Qin <becket....@gmail.com> wrote:

Hi Jun,

I see. If we only use index.interval.bytes, a time index entry will be inserted when (1) the largest timestamp is in this segment AND (2) at least index.interval.bytes have been appended since the last time index entry insertion. In this case (1) becomes implicit instead of having an explicit threshold of time.index.interval.ms. This should work fine.

For 1, the current proposal actually intends to use the offsets of the messages instead of the file positions. The reason is that OffsetRequest (ListOffsetRequest) gives back a list of offsets, so having offsets instead of file positions is more convenient in that case. So far we don't have an interface for a consumer to consume directly from a given timestamp. Presumably we will have to first return the offset to the consumer, and the consumer can then issue a fetch request. But this does mean that we need to look up the index twice if we want to seek to a particular message using a timestamp.

For 2, that is a good point. It looks like we have to persist the max timestamp of each segment. I am thinking of reserving the first time index entry in each time index file. When the broker shuts down or rolls a new segment, we persist the max timestamp of the segment by writing it to that first entry. During a timestamp search we will ignore the first time index entry. Log retention will then always know for sure whether a log segment is supposed to be deleted, by looking at the first entry of the time index.

Thanks,

Jiangjie (Becket) Qin

On Tue, Mar 1, 2016 at 4:30 PM, Jun Rao <j...@confluent.io> wrote:

Hi, Jiangjie,

I was thinking perhaps just reusing index.interval.bytes is enough. Not sure if there is much value in adding an additional time.index.interval.ms.
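[Editor's note: the insertion rule discussed above -- add a time index entry only when the message carries the segment's largest timestamp so far AND at least index.interval.bytes have been appended since the last entry -- can be sketched as below. The class and method names are hypothetical illustrations, not the actual broker code.]

```python
# Rough sketch of the time-index insertion rule from this thread.
# TimeIndexSketch and maybe_append are hypothetical names, not Kafka's API.

class TimeIndexSketch:
    def __init__(self, index_interval_bytes):
        self.index_interval_bytes = index_interval_bytes
        self.entries = []               # sparse (timestamp, offset) pairs
        self.max_timestamp = -1         # largest timestamp seen in this segment
        self.bytes_since_last_entry = 0

    def maybe_append(self, timestamp, offset, message_size):
        """Insert an entry only if this message has the largest timestamp seen
        so far (condition 1) and index.interval.bytes have accumulated since
        the last entry (condition 2)."""
        self.bytes_since_last_entry += message_size
        if timestamp > self.max_timestamp:
            self.max_timestamp = timestamp
            if self.bytes_since_last_entry >= self.index_interval_bytes:
                self.entries.append((timestamp, offset))
                self.bytes_since_last_entry = 0

# With a 4KB interval, only messages that both advance the max timestamp and
# cross the byte threshold produce index entries.
idx = TimeIndexSketch(4096)
idx.maybe_append(100, 0, 5000)   # new max timestamp, 5000 >= 4096 -> entry
idx.maybe_append(50, 1, 5000)    # out-of-order timestamp -> no entry
idx.maybe_append(200, 2, 1000)   # new max, accumulated bytes -> entry
```

The `max_timestamp` field is also the value Becket proposes persisting into the reserved first entry on shutdown or segment roll, so retention can read it without scanning the segment.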
For 1, the timestamp index has entries of timestamp -> file position. So there is actually no offset in the index, right?

For 2, what you said makes sense for time-based retention. Does that apply if the retention is triggered by size? The difference here is that we can't assume all segments with messages of timestamps smaller than the latest timestamp will be deleted after the message with the latest timestamp is deleted.

Thanks,

Jun

On Tue, Mar 1, 2016 at 1:00 PM, Becket Qin <becket....@gmail.com> wrote:

Hi Jun,

Rolling a new segment when the time index is full sounds good, so both the time index and the offset index will share the max index size configuration. If we do that, do you think we still want to reuse index.interval.bytes? If we don't, the risk is that in some corner cases we might end up with many small segments (e.g. a small time.index.interval.ms combined with a small max index size). But this is probably more of a misconfiguration.

2. If the broker is still running when all the segments except the active segment are deleted, we will have an in-memory latest timestamp, so that is not a problem.

In the other case, if a broker boots up and sees only one segment with an empty time index file, we can scan the active segment and rebuild the time index, i.e. we do not need to care about the previous largest timestamp but simply start over. (We need to scan the active segment because it is possible that the last message appended to the log has a timestamp that has not expired, but the broker died before inserting the time index entry for it.) If all the messages in the active segment have expired, we should roll a new segment and reset the latest timestamp to -1. The principle here is that we will try to build the time indices for the existing segments that have not expired. If the message with the previously latest timestamp has already been deleted, there is no need to remember it any more.

That said, I believe this corner case really only arises when the user has not configured the acceptable time difference threshold appropriately.

Thanks,

Jiangjie (Becket) Qin

On Tue, Mar 1, 2016 at 11:55 AM, Jun Rao <j...@confluent.io> wrote:

Jiangjie,

Currently, we roll a new log segment if the index is full. We can probably just do the same for the time index. This will bound the index size.

1. Sounds good.

2. I was wondering about an edge case where the largest timestamp is in the oldest segment and the time index is empty in all newer segments. At some point, we delete the oldest segment since it has expired. Then we delete all but the active segment. Now, what should the largest timestamp be? Should it be the previous largest timestamp that we have seen, or should we dig out the largest timestamp in the active segment?

Thanks,

Jun

On Mon, Feb 29, 2016 at 7:29 PM, Becket Qin <becket....@gmail.com> wrote:

Hi Jun,

I think index.interval.bytes is used to control the density of the offset index. The counterpart of index.interval.bytes for the time index is time.index.interval.ms. If we had not changed the semantics of log.roll.ms, then log.roll.ms/time.index.interval.ms and log.segment.bytes/index.interval.bytes would be a perfect mapping from bytes to time. However, because we changed the behavior of log.roll.ms, we need to guard against a potentially excessively large time index. We can either reuse index.interval.bytes or introduce time.index.interval.bytes, but I cannot think of any use for time.index.interval.bytes other than limiting the time index size.

I agree that the memory mapped file is probably not a big issue here, and we can change the default index size to 2MB.

For the two cases you mentioned:

1. Because the message offsets in the time index are also monotonically increasing, truncating should be straightforward, i.e. only keep the entries that point to offsets earlier than the truncate-to offset.

2. The current assumption is that if the time index of a segment is empty and there is no previous time index entry, we will assume that segment should be removed, because all the older segments with even larger timestamps have been removed. So in the case you mentioned, during startup we will remove all the segments and roll a new empty segment.

Thanks,

Jiangjie (Becket) Qin

On Mon, Feb 29, 2016 at 6:09 PM, Jun Rao <j...@confluent.io> wrote:

Hi, Becket,

I thought that your proposal to build the time-based index just based off index.interval.bytes is reasonable. Is there a particular need to also add time.index.interval.bytes?

Computing the pre-allocated index file size based on the log segment file size can be useful. However, the tricky thing is that the log segment size can be changed dynamically. Also, mmap files don't use heap space, just virtual memory, which is paged in on demand. So I am not sure if memory space is a big concern there. The simplest thing is probably to change the default index size to 2MB to match the default log segment size.

A couple of other things to think through:

1. Currently, LogSegment supports truncating to an offset. How do we do that on a time-based index?

2. Since it's possible to have an empty time-based index (if all message timestamps are smaller than the largest timestamp in the previous segment), we need to figure out what timestamp to use for retaining such a log segment. In the extreme case, it can happen that after we delete an old log segment, all of the new log segments have an empty time-based index. In this case, how do we avoid losing track of the latest timestamp?

Thanks,

Jun

On Sun, Feb 28, 2016 at 3:26 PM, Becket Qin <becket....@gmail.com> wrote:

Hi Guozhang,

The size of the memory mapped index file was our concern as well. That is why we are suggesting minute-level time indexing instead of second-level. There are a few thoughts on the extra memory cost of the time index.

1. Currently all the index files are loaded as memory mapped files. Notice that only the index of the active segment has the default size of 10MB; typically the indices of the old segments are much smaller. So if we use the same initial size for time index files, the total amount of memory won't be doubled, but the memory cost of the active segments will be. (However, the 10MB value itself seems problematic; see the reasoning below.)

2. It is likely that the time index is much smaller than the offset index, because users would adjust the time index interval ms to the topic volume, i.e. for a low-volume topic the time index interval ms will be much longer, so that in the extreme case we can avoid inserting one time index entry per message.

3. To further guard against unnecessarily frequent insertion of time index entries, we use index.interval.bytes as a restriction on time index entries as well, so that even for a newly created topic with the default time.index.interval.ms we don't need to worry about overly aggressive time index entry insertion.

Considering the above, the overall memory cost of the time index should be much smaller than that of the offset index. However, as you pointed out, (1) might still be an issue. I am actually not sure why we always allocate 10MB for the index file. This itself looks like a problem, given that we actually have a pretty good way to know the upper bound of the memory taken by an offset index.

Theoretically, the offset index file will have at most (log.segment.bytes / index.interval.bytes) entries. In our default configuration, log.segment.size=1GB and index.interval.bytes=4K. This means we only need (1GB/4K)*8 bytes = 2MB. Allocating 10MB is really a big waste of memory.
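[Editor's note: the arithmetic above can be checked directly. The 8-byte entry size reflects the offset index layout (4-byte relative offset plus 4-byte file position); the function name is illustrative.]

```python
# Back-of-the-envelope upper bound for the offset index size, as computed in
# this thread. An offset index entry is 8 bytes: a 4-byte relative offset
# plus a 4-byte file position.
OFFSET_INDEX_ENTRY_BYTES = 8

def max_offset_index_bytes(log_segment_bytes, index_interval_bytes):
    """At most one index entry per index.interval.bytes of log data."""
    max_entries = log_segment_bytes // index_interval_bytes
    return max_entries * OFFSET_INDEX_ENTRY_BYTES

# Defaults from the thread: log.segment.bytes=1GB, index.interval.bytes=4K.
print(max_offset_index_bytes(1024**3, 4096))   # 2097152 bytes = 2MB, vs. the 10MB default
```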
I suggest we do the following:
1. When creating the log index file, we always allocate memory using the above calculation.
2. If the memory calculated in (1) is greater than segment.index.bytes, we use segment.index.bytes instead. Otherwise we simply use the result in (1).

If we do this, I believe the memory for the index files will probably be smaller even with the time index added. I will create a separate ticket for the index file initial size.

Thanks,

Jiangjie (Becket) Qin

On Thu, Feb 25, 2016 at 3:30 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Jiangjie,

I was originally only thinking about the "time.index.size.max.bytes" config in addition to the "offset.index.size.max.bytes". Since the latter's default is 10MB, and for a memory mapped file we allocate that much memory at the start, it could put pressure on RAM if we double it.

Guozhang

On Wed, Feb 24, 2016 at 4:56 PM, Becket Qin <becket....@gmail.com> wrote:

Hi Guozhang,

I thought about this again and it seems we still need the time.index.interval.ms configuration to avoid unnecessarily frequent time index insertion.

I just updated the wiki to add index.interval.bytes as an additional constraint for time index entry insertion. Another slight change is that as long as a message timestamp shows time.index.interval.ms has passed since the timestamp of the last time index entry, we will insert another timestamp index entry. Previously we always inserted time index entries at time.index.interval.ms bucket boundaries.

Thanks,

Jiangjie (Becket) Qin

On Wed, Feb 24, 2016 at 2:40 PM, Becket Qin <becket....@gmail.com> wrote:

Thanks for the comment Guozhang,

I just changed the configuration name to "time.index.interval.ms".

It seems the real question here is how big the time indices will be. Theoretically we could have one time index entry for each message in a log segment. For example, if there is one event appended per minute, we might have to have a time index entry for each message until the segment size is reached. In that case, the number of index entries in the time index would be (segment size / avg message size), so the time index file can potentially be big.

I am wondering if we can simply reuse the "index.interval.bytes" configuration instead of having a separate time index interval ms, i.e. instead of inserting a new entry based on a time interval, we still insert it based on a bytes interval. This does not affect the granularity, because we can search from the nearest index entry to find the message with the correct timestamp. The good thing is that this guarantees there will not be huge time indices. We also save a new configuration.

What do you think?

Thanks,

Jiangjie (Becket) Qin

On Wed, Feb 24, 2016 at 1:00 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Thanks Jiangjie, a few comments on the wiki:

1. Rename the config "time.index.interval" to "time.index.interval.ms" to be consistent. Also, do we need a "time.index.size.max.bytes" as well?

2. Will the memory mapped index file for timestamps have the same default initial/max size (10485760) as the offset index?

Otherwise LGTM.

Guozhang

On Tue, Feb 23, 2016 at 5:05 PM, Becket Qin <becket....@gmail.com> wrote:

Bump.

Per Jun's comments during the KIP hangout, I have updated the wiki with the upgrade plan for KIP-33.

Let's vote!

Thanks,

Jiangjie (Becket) Qin

On Wed, Feb 3, 2016 at 10:32 AM, Becket Qin <becket....@gmail.com> wrote:

Hi all,

I would like to initiate the vote for KIP-33.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index

A good amount of the KIP was touched on during the discussion of KIP-32, so I also put the link to KIP-32 here for reference.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message

Thanks,

Jiangjie (Becket) Qin