Re: KIP-33 Opt out from Time Based indexing
Hi Jun,

thanks a lot for the hint, I'll check it out when I get a free minute!

Best Jan

On 07.09.2016 00:35, Jun Rao wrote:
> Jan,
>
> For the time rolling issue, Jiangjie has committed a fix (https://issues.apache.org/jira/browse/KAFKA-4099) to trunk. Perhaps you can help test out trunk and see if there are any other issues related to the time-based index?
>
> Thanks,
>
> Jun
Re: KIP-33 Opt out from Time Based indexing
Jan,

For the time rolling issue, Jiangjie has committed a fix (https://issues.apache.org/jira/browse/KAFKA-4099) to trunk. Perhaps you can help test out trunk and see if there are any other issues related to the time-based index?

Thanks,

Jun

On Mon, Sep 5, 2016 at 11:52 PM, Jan Filipiak wrote:
> Hi Jun,
>
> sorry for the late reply. Regarding (b), my main concern was just the complexity of understanding what's going on. As you can see, it took me some 2 days or so to fully grasp all the details of the implementation and what the impacts are. Usually I prefer to turn off things I don't use, so I don't have to bother. Log append time will work for me.
>
> Rolling logs was my main concern. The producer can specify the timestamp, and we use epoch inside the message. I'd bet money that people in the company would have put this epoch into the produce record as well => rolling logs, as the broker thinks it's millis. So that would probably have caused us at least one outage if a big producer had upgraded and done this; IMO a likely mistake.
>
> I'd just hoped for a more obvious kill switch, so I didn't need to bother that much.
>
> Best Jan
Re: KIP-33 Opt out from Time Based indexing
Hi Jun,

sorry for the late reply. Regarding (b), my main concern was just the complexity of understanding what's going on. As you can see, it took me some 2 days or so to fully grasp all the details of the implementation and what the impacts are. Usually I prefer to turn off things I don't use, so I don't have to bother. Log append time will work for me.

Rolling logs was my main concern. The producer can specify the timestamp, and we use epoch inside the message. I'd bet money that people in the company would have put this epoch into the produce record as well => rolling logs, as the broker thinks it's millis. So that would probably have caused us at least one outage if a big producer had upgraded and done this; IMO a likely mistake.

I'd just hoped for a more obvious kill switch, so I didn't need to bother that much.

Best Jan

On 29.08.2016 19:36, Jun Rao wrote:
> [...]
> Your main concerns so far seem to be:
> (a) Impact on rolling log segments.
> (b) Time-based index is not useful for me.
> [...]
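Jan's rolling concern can be illustrated with a small sketch. It assumes that the "epoch" value he mentions is in seconds, while the broker reads record timestamps as milliseconds. The function name and the sample value are hypothetical and only serve the illustration.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def interpret_as_millis(timestamp: int) -> datetime:
    """Read a record timestamp as milliseconds since the Unix epoch."""
    return EPOCH + timedelta(milliseconds=timestamp)


# Hypothetical sample: 2016-09-06 00:00:00 UTC expressed in epoch seconds.
epoch_seconds = 1_473_120_000

# Intended usage: the producer converts to milliseconds first.
intended = interpret_as_millis(epoch_seconds * 1000)

# The mistake: the seconds value is passed through unchanged.
misread = interpret_as_millis(epoch_seconds)

print(intended.isoformat())
print(misread.isoformat())
```

A timestamp that is off by a factor of 1000 looks decades old to anything that relies on the record timestamp, which is how a single misconfigured producer could plausibly disturb time-based rolling and retention.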
Re: KIP-33 Opt out from Time Based indexing
Hi Jun,

I just created KAFKA-4099 and will submit a patch soon.

Thanks,

Jiangjie (Becket) Qin

On Mon, Aug 29, 2016 at 11:55 AM, Jun Rao wrote:
> Jiangjie,
>
> Good point on the time index format related to uncompressed messages. It does seem that indexing based on file position requires a bit more complexity. Since the time index is going to be used infrequently, having a level of indirection doesn't seem a big concern. So, we can leave the logic as it is.
>
> Do you plan to submit a patch to fix the time-based rolling issue?
>
> Thanks,
>
> Jun
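The truncation concern behind the time index format discussion can be sketched with the uncompressed message set used in this subthread: m1 (offset=100, timestamp=100, position=1000) and m2 (offset=101, timestamp=105, position=1100). The classes and the `truncate_index` helper below are hypothetical stand-ins, not the broker's actual data structures. The sketch only shows why an entry that points at the batch start survives a truncation that removes the message carrying its timestamp.

```python
from typing import List, NamedTuple


class Message(NamedTuple):
    offset: int
    timestamp: int
    position: int


class IndexEntry(NamedTuple):
    timestamp: int
    position: int


def truncate_index(entries: List[IndexEntry], truncate_position: int) -> List[IndexEntry]:
    """Keep only the entries that point before the truncation position."""
    return [e for e in entries if e.position < truncate_position]


batch = [Message(100, 100, 1000), Message(101, 105, 1100)]
newest = max(batch, key=lambda m: m.timestamp)

# Entry pointing at the exact message that carries the max timestamp.
exact_entry = IndexEntry(newest.timestamp, newest.position)
# Entry pointing at the start of the batch instead.
batch_start_entry = IndexEntry(newest.timestamp, batch[0].position)

# Truncate so that m2 is removed but m1 stays.
truncate_position = batch[1].position
surviving = [m for m in batch if m.position < truncate_position]

exact_after = truncate_index([exact_entry], truncate_position)
batch_start_after = truncate_index([batch_start_entry], truncate_position)

print(exact_after)        # the exact entry is dropped together with m2
print(batch_start_after)  # the batch-start entry survives with a stale timestamp
```

After the truncation the largest surviving timestamp is 100, yet the batch-start entry still advertises 105. That is the inconsistency the example in the thread points at.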
Re: KIP-33 Opt out from Time Based indexing
Jiangjie,

Good point on the time index format related to uncompressed messages. It does seem that indexing based on file position requires a bit more complexity. Since the time index is going to be used infrequently, having a level of indirection doesn't seem a big concern. So, we can leave the logic as it is.

Do you plan to submit a patch to fix the time-based rolling issue?

Thanks,

Jun

On Fri, Aug 26, 2016 at 3:23 PM, Becket Qin wrote:
> Jun,
>
> Good point about the new log rolling behavior issue when moving replicas. Keeping the old behavior sounds reasonable to me.
>
> Currently the time index entry points to the exact shallow message with the indexed timestamp. Are you suggesting we change it to point to the starting offset of the appended batch (message set)? That doesn't seem to work for truncation. For example, imagine an uncompressed message set [(m1, offset=100, timestamp=100, position=1000), (m2, offset=101, timestamp=105, position=1100)]. If we build the time index based on the starting offset of this message set, the index entry would be (105, 1000). Later on, when the log is truncated and m2 is truncated but m1 is not (this is possible for an uncompressed message set), we will not delete the time index entry because it is technically pointing to m1. Pointing to the end of the batch will not work either, because then a search by timestamp would miss m2.
>
> I am not sure if it is worth doing, but if we are willing to change the semantics so that the time index entry does not point to the exact shallow message, I am thinking maybe we should just switch to the very original semantics, i.e. a time index entry only means "max timestamp up until this offset", which is also aligned with the offset index entry.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Fri, Aug 26, 2016 at 10:29 AM, Jun Rao wrote:
> > Jiangjie,
> >
> > I am not sure about changing the default to LogAppendTime since CreateTime is probably what most people want. It also doesn't solve the problem completely. For example, if you do partition reassignment and need to copy a bunch of old log segments to a new broker, this may cause log rolling on every message.
> >
> > Another alternative is to just keep the old time rolling behavior, which is rolling based on the create time of the log segment. I had two use cases of time-based rolling in mind. The first one is for users who don't want to retain a message (say sensitive data) in the log for too long. For this, one can set a time-based retention. If the log can roll periodically based on create time, it will freeze the largest timestamp in the rolled segment and cause it to be deleted when the time limit has been reached. Rolling based on the timestamp of the first message doesn't help much here since the retention is always based on the largest timestamp. The second one is for the log cleaner to kick in sooner. Rolling logs periodically based on create time will also work for that. So, it seems that if we preserve the old time rolling behavior, we won't lose much functionality, but will avoid the corner case where the logs could be rolled on every message. What do you think?
> >
> > About storing file position in the time index, I don't think it needs to incur overhead during append. At the beginning of append, we are already getting the end position of the log (for maintaining the offset index). We can just keep track of that together with the last seen offset. Storing the position has the slight benefit that it avoids another indirection and seems more consistent with the offset index. It's worth thinking through whether this is better. If we want to change it, it's better to change it now than later.
> >
> > Thanks,
> >
> > Jun
> >
> > On Thu, Aug 25, 2016 at 6:30 PM, Becket Qin wrote:
> > > Hi Jan,
> > >
> > > It seems your main concern is the changed behavior of time-based log rolling and time-based retention. That is actually why we have two timestamp types. If users set log.message.timestamp.type to LogAppendTime, the broker will behave exactly the same as before, except that rolling and retention will be more accurate and independent of replica movements.
> > >
> > > The log.message.timestamp.difference.max.ms setting is only useful when users are using CreateTime. It is a kind of protection on the broker, because an insanely large timestamp could ruin the time index due to the way we handle out-of-order timestamps when using CreateTime. But users who are using LogAppendTime do not need to worry about this at all.
> > >
> > > The first odd thing is a valid concern. In your case, because you have the timestamp in the message value, it is probably fine to just use LogAppendTime on the broker, so the timestamp will only be used to provide accurate
Re: KIP-33 Opt out from Time Based indexing
Jan,

For the usefulness of the time index, it's OK if you don't plan to use it. However, I do think there are other people who will want to use it. Fixing an application bug always requires some additional work. Intuitively, being able to seek back to a particular point in time for replay is going to be much more efficient than always replaying from the very beginning, especially when the log is retained for a long period of time. Sure, if you want to have more confidence, you want to rewind a bit conservatively. But being able to rewind an extra hour makes a big difference from having to rewind all the way to 7 days or however long the retention time is.

For the OffsetRequest, I actually agree with you that it's useful. People can use that to find the first and the last offset and the offset based on a specific point in time. The part that's a bit awkward with OffsetRequest is that it's based on the last modified time of the log segment, which makes it imprecise (precision is at the segment level, not message level) and non-deterministic (last modified time may change). Another awkwardness is that it supports returning a list of offsets after a specified timestamp. We did that simply because the timestamp was only at the segment level then. So, our plan is to replace OffsetRequest with a new one. It will give you the same functionality: find the first and the last offset and the offset based on a specific point in time. It will just be better since it's more precise and more deterministic. For your use case, it seems that you don't care about message creation time. Then, it's possible for you to configure the broker with the log append time. Whether this should be the default at the Kafka level is debatable, but it won't prevent your use case.

For your suggestion on refactoring, I still want to understand how necessary it is. Your main concerns so far seem to be:
(a) Impact on rolling log segments.
(b) Time-based index is not useful for me.

Item (a) is a good point. Thanks for that. We will fix it. For item (b), I have given my view on this above. Are there any other things that you think having a time-based index will hurt?

Thanks,

Jun
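Jun's point about rewinding "a bit conservatively" instead of replaying the whole retention period can be sketched as follows. This is a toy model under stated assumptions, not Kafka's lookup code. The index is a sparse, timestamp-ordered list of hypothetical (timestamp, offset) pairs, timestamps never decrease with the offset (as they would with log append time), and `replay_start_offset` is an invented helper name.

```python
import bisect
from typing import List, Tuple


def replay_start_offset(index: List[Tuple[int, int]], target_ms: int,
                        log_start_offset: int) -> int:
    """Return a safe offset to start replaying from for a target timestamp.

    Picks the last index entry whose timestamp is strictly below the target,
    so no message at or after the target time is skipped. Falls back to the
    start of the log when no such entry exists.
    """
    timestamps = [ts for ts, _ in index]
    i = bisect.bisect_left(timestamps, target_ms) - 1
    return index[i][1] if i >= 0 else log_start_offset


# Hypothetical sparse index: (timestamp in ms, offset).
index = [(1_000, 0), (2_000, 10), (3_000, 20), (4_000, 30)]

bug_time_ms = 3_500
safety_margin_ms = 1_000  # rewind a bit further than strictly necessary

start = replay_start_offset(index, bug_time_ms - safety_margin_ms, log_start_offset=0)
print(start)
```

A larger safety margin moves the start offset further back. With a margin as long as the retention period, the lookup degenerates into replaying from the start of the log, which is the trade-off debated in this thread.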
Re: KIP-33 Opt out from Time Based indexing
Jan,

Thanks for the example of reprocessing the messages. I think in any case, reconsuming all the messages will definitely work. What we want to do here is to see if we can avoid doing that by only reconsuming the necessary messages. In the scenario you mentioned, can you store an "offset-of-last-update" for each window? It is essentially the offset of the last message that goes into the window. During the reprocessing, if an earlier message has an offset less than or equal to the "offset-of-last-update" for the corresponding window, the processor knows that this message has already been added to the aggregated window. This may need some work, but seems cheaper than reconsuming everything. Admittedly, if the messages use create time and the timestamps are spread over a wide range, users may not be able to gain much from search by timestamp.

Regarding OffsetRequest, I think what Jun meant was that we are going to deprecate the current OffsetRequest v0, which returns a list of segment base offsets. I am working on a new KIP for OffsetRequest v1, which returns the accurate message offset based on timestamp. I can hardly imagine we will let users deal with a physical position directly :)

Thanks,

Jiangjie (Becket) Qin

On Fri, Aug 26, 2016 at 3:41 PM, Jan Filipiak wrote:
> Hi Jun,
>
> thanks for taking the time to answer on such a detailed level. You are right, Log.fetchOffsetByTimestamp works; the comment is just confusing: "// Get all the segments whose largest timestamp is smaller than target timestamp", which is apparently not what takeWhile does (I am more on the Java end of things, so I relied on the comment).
>
> Regarding the frequent file rolling, I didn't think of log compaction, but that indeed is a place where things can hit the fan pretty easily, especially if you don't have many updates in there and you pass the timestamp along in a kafka-streams application. Bootstrapping a new application then indeed could produce quite a few old messages, kicking this log rolling off until a recent message appears. I guess that makes it a practical issue again even with the 7 days. Thanks for pointing that out! I'd like to see the appendTime as default; I am very happy that I have it in my back pocket for the purpose of sleeping tighter and not worrying too much about someone accidentally doing something dodgy on a weekend with our clusters.
>
> Regarding the usefulness, you will not be able to sell it to me. I don't know how people build applications with this ¯\_(ツ)_/¯ but I don't want to see them.
>
> Look at the error recovery with timestamp seek:
> To fix a bug, a user needs to stop the SP and truncate all his downstream data perfectly based on their time window. Then restart and do the first fetch based again on the perfect window timeout. From then on, he still has NO clue whatsoever whether messages that come later with an earlier timestamp need to go into the previous window or not. (Note that there is >>>absolutely no<<< way to determine this in aggregated downstream windowed stores.) So even though the user can seek, he can't rule out his error. IMO it helps them to build the wrong thing, which will just be operational pain *somewhere*.
>
> Look at the error recovery without timestamp seek:
> Start your application from the beginning with a different output (version,key,partition) and wait for it to fully catch up. Drop the time windows in which the error happened + a confidence interval (if your data isn't there anymore, no seek will help) from the old version. Stop the stream processor, merge the data it created, switch back to the original (version,key,partition) and start the SP again. Done. The bigger you choose the confidence interval, the more correct the result, and the less the index helps. Usually you want maximum confidence => no index usage, get everything that is still there (maybe even redump from Hadoop in extreme cases), ironically causing the log to roll all the time (as you probably publish to a new topic and have the streams application use both) :(
>
> As you can see, even though the users can seek, if they want to create proper numbers, e.g. billing information, they are in trouble, and giving them this index will just make them implement the wrong solution! It boils down to: this index is not the Kafka way of doing things. The index can help the second approach, but usually one chooses the confidence interval = as much as one can get.
>
> Then the last thing. "OffsetRequest is a legacy request. It's awkward to use and we plan to deprecate it over time". You have got to be kidding me. It was weird to get the byte position back then, but getting the offsets is perfectly reasonable and one of the best things in the world. Want to know how your stream looked at a specific point in time? Get the start and end offset, fetch whenever you like, and you get a perfect snapshot in wall time. This is useful for compacted topics as well as streaming topics. Offsets are
Re: KIP-33 Opt out from Time Based indexing
Hi Jun,

thanks for taking the time to answer on such a detailed level. You are right, Log.fetchOffsetByTimestamp works; the comment is just confusing: "// Get all the segments whose largest timestamp is smaller than target timestamp", which is apparently not what takeWhile does (I am more on the Java end of things, so I relied on the comment).

Regarding the frequent file rolling, I didn't think of log compaction, but that is indeed a place where it can hit the fan pretty easily, especially if you don't have many updates in there and you pass the timestamp along in a kafka-streams application. Bootstrapping a new application could then indeed produce quite a few old messages, kicking off this log rolling until a recent message appears. I guess that makes it a practical issue again, even with the 7 days. Thanks for pointing that out! I'd like to see appendTime as the default. I am very happy that I have it in my back pocket, so that I can sleep tighter and not worry too much about someone accidentally doing something dodgy with our clusters on a weekend.

Regarding the usefulness, you will not be able to sell it to me. I don't know how people build applications with this ¯\_(ツ)_/¯ but I don't want to see them.

Look at the error recovery with timestamp seek: to fix a bug, a user needs to stop the SP and truncate all his downstream data perfectly based on their time window. Then he restarts and does the first fetch based, again, on the perfect window timeout. From then on, he still has NO clue whatsoever whether messages that arrive later with an earlier timestamp need to go into the previous window or not. (Note that there is >>>absolutely no<<< way to determine this in aggregated downstream windowed stores.) So even though the user can seek, he can't rule out his error.
IMO it helps them to build the wrong thing, which will just be operational pain *somewhere*.

Look at the error recovery without timestamp seek: start your application from the beginning with a different output (version,key,partition) and wait for it to fully catch up. Drop the time windows in which the error happened, plus a confidence interval, from the old version (if your data isn't there anymore, no seek will help). Stop the stream processor, merge the data it created, switch back to the original (version,key,partition) and start the SP again. Done. The bigger you choose the confidence interval, the more correct the result and the less the index helps. Usually you want maximum confidence => no index usage; get everything that is still there (maybe even redump from Hadoop in extreme cases), ironically causing the log to roll all the time (as you probably publish to a new topic and have the streams application use both) :(

As you can see, even though users can seek, if they want to produce proper numbers, billing information for example, they are in trouble, and giving them this index will just make them implement the wrong solution! It boils down to: this index is not the Kafka way of doing things. The index can help the second approach, but usually one chooses the confidence interval = as much as one can get.

Then the last thing: "OffsetRequest is a legacy request. It's awkward to use and we plan to deprecate it over time". You've got to be kidding me. It was weird to get the byte position back then, but getting the offsets is perfectly reasonable and one of the best things in the world. Want to know how your stream looked at a specific point in time? Get the start and end offsets, fetch whenever you like, and you get a perfect snapshot in wall time. This is useful for compacted topics as well as streaming topics. Offsets are a well-known thing in Kafka and in no way awkward, as their monotonically increasing property is just great.
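The snapshot idea relies only on offsets being assigned in increasing order. A minimal in-memory sketch, assuming an append-only (non-compacted) log whose data is still retained; `ToyLog` is a toy model invented for illustration, not a Kafka client API:

```python
class ToyLog:
    """Toy append-only log; offsets are list indices (not a Kafka API)."""

    def __init__(self):
        self._messages = []

    def append(self, value):
        self._messages.append(value)
        return len(self._messages) - 1  # offset of the appended message

    def end_offset(self):
        return len(self._messages)  # offset the next message will get

    def fetch(self, start, end):
        return self._messages[start:end]  # messages with start <= offset < end


log = ToyLog()
for v in ["a", "b", "c"]:
    log.append(v)

# Record the offset range at some wall-clock moment.
snapshot = (0, log.end_offset())

log.append("d")  # later writes do not affect the recorded range

print(log.fetch(*snapshot))  # ['a', 'b', 'c']
```

Because the range is fixed once recorded, it can be fetched at any later time and still yields the same messages in this model.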
For seeking the log based on a confidence interval (the only chance you get when reprocessing non-keyed logs), one can also bisect the log from the client. The case is rare; it is intensive and causes at least a few hundred seeks for bigger topics, but I guess the broker now does these extra for the new index file.

This index, I feel, is just not following the whole "Kafka way". Can you comment on the proposed refactoring? What are the chances of getting it upstream if I could pull it off (unlikely)?

Thanks for all the effort you put into listening to my concerns. Highly appreciated!

Best
Jan

On 25.08.2016 23:36, Jun Rao wrote: Jan, Thanks a lot for the feedback. Now I understood your concern better. The following are my comments. The first odd thing that you pointed out could be a real concern. Basically, if a producer publishes messages with really old timestamp, our default log.roll.hours (7 days) will indeed cause the broker to roll a log on every message, which would be bad. Time-based rolling is actually used infrequently. The only use case that I am aware of is that for compacted topics, rolling logs based on time could allow the compaction to happen sooner (since the active segment is never cl
Re: KIP-33 Opt out from Time Based indexing
Jun,

Good point about the new log rolling behavior issue when moving replicas. Keeping the old behavior sounds reasonable to me.

Currently the time index entry points to the exact shallow message with the indexed timestamp. Are you suggesting we change it to point to the starting offset of the appended batch (message set)? That doesn't seem to work for truncation. For example, imagine an uncompressed message set [(m1, offset=100, timestamp=100, position=1000), (m2, offset=101, timestamp=105, position=1100)]. If we build the time index based on the starting offset of this message set, the index entry would be (105, 1000). Later on, when the log is truncated and m2 is truncated but m1 is not (this is possible for an uncompressed message set), we will not delete the time index entry because it is technically pointing to m1. Pointing to the end of the batch will not work either, because then a search by timestamp would miss m2.

I am not sure if it is worth doing, but if we are willing to change the semantics so that the time index entry does not point to the exact shallow message, maybe we should just switch back to the very original semantics, i.e. a time index entry only means "max timestamp up until this offset", which is also aligned with the offset index entry.

Thanks,
Jiangjie (Becket) Qin

On Fri, Aug 26, 2016 at 10:29 AM, Jun Rao wrote:
> Jiangjie,
>
> I am not sure about changing the default to LogAppendTime since CreateTime is probably what most people want. It also doesn't solve the problem completely. For example, if you do partition reassignment and need to copy a bunch of old log segments to a new broker, this may cause log rolling on every message.
>
> Another alternative is to just keep the old time rolling behavior, which is rolling based on the create time of the log segment. I had two use cases of time-based rolling in mind. The first one is for users who don't want to retain a message (say sensitive data) in the log for too long.
For this, > one can set a time-based retention. If the log can roll periodically based > on create time, it will freeze the largest timestamp in the rolled segment > and cause it to be deleted when the time limit has been reached. Rolling > based on the timestamp of the first message doesn't help much here since > the retention is always based on the largest timestamp. The second one is > for log cleaner to happen quicker. Rolling logs periodically based on > create time will also work. So, it seems that if we preserve the old time > rolling behavior, we won't lose much functionality, but will avoid the > corner case where the logs could be rolled on every message. What do you > think? > > About storing file position in the time index, I don't think it needs to > incur overhead during append. At the beginning of append, we are already > getting the end position of the log (for maintaining the offset index). We > can just keep track of that together with the last seen offset. Storing the > position has the slight benefit that it avoids another indirection and > seems more consistent with the offset index. It's worth thinking through > whether this is better. If we want to change it, it's better to change it > now than later. > > Thanks, > > Jun > > On Thu, Aug 25, 2016 at 6:30 PM, Becket Qin wrote: > > > Hi Jan, > > > > It seems your main concern is for the changed behavior of time based log > > rolling and time based retention. That is actually why we have two > > timestamp types. If user set the log.message.timestamp.type to > > LogAppendTime, the broker will behave exactly the same as they were, > except > > the rolling and retention would be more accurate and independent to the > > replica movements. > > > > The log.message.timestam.max.difference.ms is only useful when users are > > using CreateTime. 
It is kind of a protection on the broker because an > > insanely large timestamp could ruin the time index due to the way we > handle > > out-of-order timestamps when using CreateTime. But the users who are > using > > LogAppendTime do not need to worry about this at all. > > > > The first odd thing is a valid concern. In your case, because you have > the > > timestamp in the message value, it is probably fine to just use > > LogAppendTime on the broker, so the timestamp will only be used to > provide > > accurate log retention and log rolling based on when the message was > > produced to the broker regardless when the message was created. This > should > > provide the exact same behavior on the broker side as before. (Apologies > > for the stale WIKI statement on the lines you quoted, as Jun said, the > log > > segment rolling is based on the timestamp of the first message instead of > > the largest timestamp in the log segment. I sent a change notification to > > the mailing list but forgot to update the wiki page. I just updated the > > wiki page.) > > > > The second odd thing, as Jun mentioned, by design we do not keep a global > > timestamp orde
Re: KIP-33 Opt out from Time Based indexing
Jiangjie, I am not sure about changing the default to LogAppendTime since CreateTime is probably what most people want. It also doesn't solve the problem completely. For example, if you do partition reassignment and need to copy a bunch of old log segments to a new broker, this may cause log rolling on every message. Another alternative is to just keep the old time rolling behavior, which is rolling based on the create time of the log segment. I had two use cases of time-based rolling in mind. The first one is for users who don't want to retain a message (say sensitive data) in the log for too long. For this, one can set a time-based retention. If the log can roll periodically based on create time, it will freeze the largest timestamp in the rolled segment and cause it to be deleted when the time limit has been reached. Rolling based on the timestamp of the first message doesn't help much here since the retention is always based on the largest timestamp. The second one is for log cleaner to happen quicker. Rolling logs periodically based on create time will also work. So, it seems that if we preserve the old time rolling behavior, we won't lose much functionality, but will avoid the corner case where the logs could be rolled on every message. What do you think? About storing file position in the time index, I don't think it needs to incur overhead during append. At the beginning of append, we are already getting the end position of the log (for maintaining the offset index). We can just keep track of that together with the last seen offset. Storing the position has the slight benefit that it avoids another indirection and seems more consistent with the offset index. It's worth thinking through whether this is better. If we want to change it, it's better to change it now than later. Thanks, Jun On Thu, Aug 25, 2016 at 6:30 PM, Becket Qin wrote: > Hi Jan, > > It seems your main concern is for the changed behavior of time based log > rolling and time based retention. 
That is actually why we have two > timestamp types. If user set the log.message.timestamp.type to > LogAppendTime, the broker will behave exactly the same as they were, except > the rolling and retention would be more accurate and independent to the > replica movements. > > The log.message.timestam.max.difference.ms is only useful when users are > using CreateTime. It is kind of a protection on the broker because an > insanely large timestamp could ruin the time index due to the way we handle > out-of-order timestamps when using CreateTime. But the users who are using > LogAppendTime do not need to worry about this at all. > > The first odd thing is a valid concern. In your case, because you have the > timestamp in the message value, it is probably fine to just use > LogAppendTime on the broker, so the timestamp will only be used to provide > accurate log retention and log rolling based on when the message was > produced to the broker regardless when the message was created. This should > provide the exact same behavior on the broker side as before. (Apologies > for the stale WIKI statement on the lines you quoted, as Jun said, the log > segment rolling is based on the timestamp of the first message instead of > the largest timestamp in the log segment. I sent a change notification to > the mailing list but forgot to update the wiki page. I just updated the > wiki page.) > > The second odd thing, as Jun mentioned, by design we do not keep a global > timestamp order. During the search, we start from the oldest segment and > scan over the segment until we find the first segment that contains a > timestamp which is larger than the target timestamp. This should guarantee > no message with larger timestamp will be missed. 
For example if we have 3 > segments whose largest timestamps are 100 300 200, and we are looking for > timestamp 250, we will start to scan at first segment and stop at the > second segment and search inside that segment for the first timestmap > greater or equals to 250. So reordered largest timestamp across segments > should not be an issue. > > The third odd thing is a good point. There are a few reasons we chose to > store the offsets instead physical position in the time index. Easier > truncation is one of the reasons but this may not be a big issue. Another > reason is that in the early implementation, the time index and offset index > are actually aligned, i.e. each offset in the time index as a corresponding > entry in the offset index ( the reverse is not true). So the physical > position is already stored in the offset index. Later on we switched to the > current implementation, which has the time index pointing to the exact > shallow message in the log segment. With this implementation, if the > message with the largest timestamp appears in the middle of an uncompressed > message set, we may need to calculate the physical position for that > message. This is doable but could potentially be an overhead for each > a
Re: KIP-33 Opt out from Time Based indexing
Hi Jan,

It seems your main concern is the changed behavior of time-based log rolling and time-based retention. That is actually why we have two timestamp types. If users set log.message.timestamp.type to LogAppendTime, the broker will behave exactly the same as before, except that rolling and retention will be more accurate and independent of replica movements.

log.message.timestamp.difference.max.ms is only useful when users are using CreateTime. It is a kind of protection for the broker, because an insanely large timestamp could ruin the time index due to the way we handle out-of-order timestamps when using CreateTime. Users who are using LogAppendTime do not need to worry about this at all.

The first odd thing is a valid concern. In your case, because you have the timestamp in the message value, it is probably fine to just use LogAppendTime on the broker, so the timestamp will only be used to provide accurate log retention and log rolling based on when the message was produced to the broker, regardless of when the message was created. This should provide exactly the same behavior on the broker side as before. (Apologies for the stale wiki statement in the lines you quoted. As Jun said, log segment rolling is based on the timestamp of the first message instead of the largest timestamp in the log segment. I sent a change notification to the mailing list but forgot to update the wiki page. I have just updated it.)

The second odd thing: as Jun mentioned, by design we do not keep a global timestamp order. During the search, we start from the oldest segment and scan over the segments until we find the first segment that contains a timestamp larger than the target timestamp. This should guarantee that no message with a larger timestamp will be missed.
For example, if we have 3 segments whose largest timestamps are 100, 300 and 200, and we are looking for timestamp 250, we will start scanning at the first segment, stop at the second segment, and search inside that segment for the first timestamp greater than or equal to 250. So reordered largest timestamps across segments should not be an issue.

The third odd thing is a good point. There are a few reasons we chose to store offsets instead of physical positions in the time index. Easier truncation is one of them, but this may not be a big issue. Another reason is that in the early implementation, the time index and offset index were actually aligned, i.e. each offset in the time index had a corresponding entry in the offset index (the reverse was not true). So the physical position was already stored in the offset index. Later on we switched to the current implementation, which has the time index pointing to the exact shallow message in the log segment. With this implementation, if the message with the largest timestamp appears in the middle of an uncompressed message set, we may need to calculate the physical position for that message. This is doable but could add overhead to each append and some complexity. Given that OffsetRequest is supposed to be a pretty infrequent request, it is probably OK to do the secondary lookup and save the work on each append.

Jun has already mentioned a few use cases for searching by timestamp. At LinkedIn we also have several such use cases where people want to rewind the offsets to a certain time and reprocess the streams.

@Jun, currently we are using CreateTime as the default value for log.message.timestamp.type. I am wondering whether it would be less surprising to change the default value to LogAppendTime so that the previous behavior is maintained, because it would be bad for users if upgrading caused their messages to be deleted due to the change in behavior. What do you think?
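The segment scan described in this message can be sketched roughly as follows. This is a minimal model and not the broker's code: each segment is represented only by its largest timestamp, and `first_candidate_segment` is a hypothetical helper name.

```python
from typing import Optional, Sequence


def first_candidate_segment(max_timestamps: Sequence[int], target: int) -> Optional[int]:
    """Return the index of the first segment (in offset order) whose largest
    timestamp is >= target, or None if no segment qualifies.

    Hypothetical helper: each segment is modelled only by its largest timestamp.
    """
    for index, max_ts in enumerate(max_timestamps):
        if max_ts >= target:
            return index
    return None


# Segments in offset order with largest timestamps 100, 300, 200.
print(first_candidate_segment([100, 300, 200], 250))  # 1 (the second segment)
```

Every segment before the returned one has a largest timestamp below the target, so starting the in-segment search there cannot skip a message whose timestamp is at or above the target, even when the largest timestamps are not in increasing order.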
Thanks, Jiangjie (Becket) Qin On Thu, Aug 25, 2016 at 2:36 PM, Jun Rao wrote: > Jan, > > Thanks a lot for the feedback. Now I understood your concern better. The > following are my comments. > > The first odd thing that you pointed out could be a real concern. > Basically, if a producer publishes messages with really old timestamp, our > default log.roll.hours (7 days) will indeed cause the broker to roll a log > on ever message, which would be bad. Time-based rolling is actually used > infrequently. The only use case that I am aware of is that for compacted > topics, rolling logs based on time could allow the compaction to happen > sooner (since the active segment is never cleaned). One option is to change > the default log.roll.hours to infinite and also document the impact on > changing log.roll.hours. Jiangjie, what do you think? > > For the second odd thing, the OffsetRequest is a legacy request. It's > awkward to use and we plan to deprecate it over time. That's why we haven't > change the logic in serving OffsetRequest after KIP-33. The plan is to > introduce a new OffsetRequest that will be exploiting the time based index. > It's possible to have log segments with non-increasing largest timestamp
Re: KIP-33 Opt out from Time Based indexing
Jan,

Thanks a lot for the feedback. Now I understand your concern better. The following are my comments.

The first odd thing that you pointed out could be a real concern. Basically, if a producer publishes messages with really old timestamps, our default log.roll.hours (7 days) will indeed cause the broker to roll a log on every message, which would be bad. Time-based rolling is actually used infrequently. The only use case that I am aware of is for compacted topics, where rolling logs based on time could allow compaction to happen sooner (since the active segment is never cleaned). One option is to change the default log.roll.hours to infinite and also document the impact of changing log.roll.hours. Jiangjie, what do you think?

For the second odd thing, the OffsetRequest is a legacy request. It's awkward to use and we plan to deprecate it over time. That's why we haven't changed the logic in serving OffsetRequest after KIP-33. The plan is to introduce a new OffsetRequest that will exploit the time-based index. It's possible to have log segments with non-increasing largest timestamps. As you can see in Log.fetchOffsetsByTimestamp(), we simply iterate the segments in offset order and stop when we see the target timestamp.

For the third odd thing, one of the original reasons why the time-based index points to an offset instead of the file position is that it makes truncating the time index to an offset easier, since the offset is in the index. Looking at the code, we could also store the file position in the time index and do truncation based on position instead of offset. It probably has a slight advantage of consistency between the two indexes and of avoiding another level of indirection when looking up the time index. Jiangjie, have we ever considered that?

The idea of log.message.timestamp.difference.max.ms is to prevent the timestamps in published messages from drifting too far away from the current timestamp. The default value is infinite, though.
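To illustrate the first odd thing, here is a simplified model of the post-KIP-33 rolling rule as it is described in this thread: the active segment rolls once the broker's current time is more than the roll interval ahead of the timestamp of the segment's first message. The function names are hypothetical, the current time is held fixed for simplicity, and the real broker logic has more conditions.

```python
from typing import Optional, Sequence

ROLL_MS = 7 * 24 * 60 * 60 * 1000  # 7 days, matching the default log.roll.hours


def should_roll(now_ms: int, first_timestamp_ms: int, roll_ms: int = ROLL_MS) -> bool:
    """Simplified model: roll once the segment's first message timestamp
    is more than roll_ms behind the broker's current time."""
    return now_ms - first_timestamp_ms > roll_ms


def count_rolls(now_ms: int, message_timestamps: Sequence[int]) -> int:
    """Count how many segments get rolled while appending the messages in order.

    Hypothetical model: before each append, the active segment is rolled
    if it is non-empty and should_roll() holds for its first message.
    """
    rolls = 0
    first_ts: Optional[int] = None  # first message timestamp in the active segment
    for ts in message_timestamps:
        if first_ts is not None and should_roll(now_ms, first_ts):
            rolls += 1
            first_ts = None
        if first_ts is None:
            first_ts = ts
    return rolls


now = 1_472_000_000_000  # arbitrary "current" broker time in ms
day = 24 * 60 * 60 * 1000
old = [now - 8 * day] * 5      # five messages stamped 8 days ago
recent = [now - 1 * day] * 5   # five messages stamped yesterday
print(count_rolls(now, old))     # 4: every append after the first rolls the segment
print(count_rolls(now, recent))  # 0
```

In this model a producer that only sends timestamps older than the roll interval leaves one message per segment, which is the "roll on every message" behavior discussed here.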
Lastly, for the usefulness of time-based index, it's actually a feature that the community wanted and voted for, not just for Confluent customers. For example, being able to seek to an offset based on timestamp has been a frequently asked feature. This can be useful for at least the following scenarios: (1) If there is a bug in a consumer application, the user will want to rewind the consumption after fixing the logic. In this case, it's more convenient to rewind the consumption based on a timestamp. (2) In a multi data center setup, it's common for people to mirror the data from one Kafka cluster in one data center to another cluster in a different data center. If one data center fails, people want to be able to resume the consumption in the other data center. Since the offsets are not preserving between the two clusters through mirroring, being able to find a starting offset based on timestamp will allow the consumer to resume the consumption without missing any messages and also not replaying too many messages. Thanks, Jun On Wed, Aug 24, 2016 at 5:05 PM, Jan Filipiak wrote: > Hey Jun, > > I go and try again :), wrote the first one in quite a stressful > environment. The bottom line is that I, for our use cases, see a to small > use/effort ratio in this time index. > We do not bootstrap new consumers for key-less logs so frequently and when > we do it, they usually want everything (prod deployment) or just start at > the end ( during development). > That caused quite some frustration. Would be better if I could just have > turned it off and don't bother any more. Anyhow in the meantime I had to > dig deeper into the inner workings > and the impacts are not as dramatic as I initially assumed. But it still > carries along some oddities I want to list here. > > first odd thing: > Quote > --- > Enforce time based log rolling > > Currently time based log rolling is based on the creating time of the log > segment. 
With this KIP, the time based rolling would be changed to based on > the largest timestamp ever seen in a log segment. A new log segment will be > rolled out if current time is greater than largest timestamp ever seen in > the log segment + log.roll.ms. When message.timestamp.type=CreateTime, > user should set max.message.time.difference.ms appropriately together > with log.roll.ms to avoid frequent log segment roll out. > --- > imagine a Mirrormaker falls behind and the Mirrormaker has a delay of some > time > log.roll.ms. > From my understanding, when noone else is producing to this partition > except the mirror maker, the broker will start rolling on every append? > Just because you maybe under DOS-attack and your application only works in > the remote location. (also a good occasion for MM to fall behind) > But checking the default values indicates that it should indeed not become > a problem as log.roll.ms defaults to ~>7 days. > > > second odd thing: > Quote > --- > A time index entry (*T*, *offset*) means that in this segment any mess
Re: KIP-33 Opt out from Time Based indexing
Hey Jun,

I'll go and try again :). I wrote the first one in quite a stressful environment. The bottom line is that I, for our use cases, see too small a use/effort ratio in this time index. We do not bootstrap new consumers for key-less logs that frequently, and when we do, they usually want everything (prod deployment) or just start at the end (during development). That caused quite some frustration. It would have been better if I could just have turned it off and not bothered any more. Anyhow, in the meantime I had to dig deeper into the inner workings, and the impacts are not as dramatic as I initially assumed. But it still carries along some oddities I want to list here.

First odd thing:

Quote
---
Enforce time based log rolling

Currently time based log rolling is based on the creating time of the log segment. With this KIP, the time based rolling would be changed to based on the largest timestamp ever seen in a log segment. A new log segment will be rolled out if current time is greater than largest timestamp ever seen in the log segment + log.roll.ms. When message.timestamp.type=CreateTime, user should set max.message.time.difference.ms appropriately together with log.roll.ms to avoid frequent log segment roll out.
---

Imagine a MirrorMaker falls behind and has a delay of some time > log.roll.ms. From my understanding, when no one else is producing to this partition except the MirrorMaker, the broker will start rolling on every append? Just because you may be under a DoS attack and your application only works in the remote location (also a good occasion for MM to fall behind). But checking the default values indicates that it should indeed not become a problem, as log.roll.ms defaults to ~>7 days.

Second odd thing:

Quote
---
A time index entry (/T/, /offset/) means that in this segment any message whose timestamp is greater than /T/ come after /offset./

The OffsetRequest behaves almost the same as before.
If timestamp *T* is set in the OffsetRequest, the first offset in the returned offset sequence means that if user want to consume from *T*, that is the offset to start with. The guarantee is that any message whose timestamp is greater than T has a bigger offset. i.e. Any message before this offset has a timestamp < *T*.
---

Given how the index is maintained, with a little bit of bad luck (rolling upgrade/config change of MirrorMakers for different colocations) one ends up with segmentN.timeindex.maxtimestamp > segmentN+1.timeindex.maxtimestamp. If I am not overlooking something here, the fetch code does not seem to take that into account.
https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/Log.scala#L604
In this case the listed goal number 1, not to lose any messages, is not achieved. An easy fix seems to be to sort the segsArray by maxtimestamp, but I can't wrap my head around it just now.

Third odd thing:

Regarding the worry of increasing complexity.
Looking at the code https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L193 -196 https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L227 & 230 https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L265 -266 https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L305 -307 https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L408 - 410 https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L432 - 435 https://github.com/apache/kafka/blob/05d00b5aca2e1e59ad685a3f051d2ab022f75acc/core/src/main/scala/kafka/log/LogSegment.scala#L104 -108 and especially https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/Log.scala#L717 it feels like the Log & Log segment having a detailed knowledge about the maintained indexes is not the ideal way to model the problem. Having the Server maintian a Set of Indexes could reduce the code complexity, while also allowing an easy switch to turn it off. I think both indexes could point to the physical position, a client would do fetch(timestamp), and the continue with the offsets as usual. Is there any specific reason the timestamp index points into the offset index? For reading one would need to branch earlier, maybe already in ApiHandler and decide what indexes to query, but this branching logic is there now anyhow. Further I also can't think of a situation where one wants to have this log.message.timestamp.difference.max.ms take effect. I think this defeats goal 1 again. ITE having this index in the brokers now feels wired to me. Gives me a fe
Re: Re: KIP-33 Opt out from Time Based indexing
Jan,

Thanks for the reply. I actually wasn't sure what your main concern on time-based rolling is. Just a couple of clarifications.

(1) Time-based rolling doesn't control how long a segment will be retained for. For retention, if you use time-based, it will now be based on the timestamp in the message. If you use size-based, it works the same as before. Is your concern on time-based retention? If so, you can always configure the timestamp in all topics to be log append time, which will give you the same behavior as before.

(2) The creation time of the segment is never exposed to the consumer and therefore is never preserved in MirrorMaker. In contrast, the timestamp in the message will be preserved in MirrorMaker. So, not sure what your concern on MirrorMaker is.

Jun

On Wed, Aug 24, 2016 at 5:03 AM, Jan Filipiak wrote:
> Hi Jun,
>
> I copy-pasted this mail from the archive, as I somehow didn't receive it per mail. I will still make some comments inline; hopefully you can find them quickly enough, my apologies.
>
> To make things clearer, you should also know that all messages in our Kafka setup already have a common way to access their timestamp (it's always encoded in the value the same way). Sometimes this is a logical time (e.g. the same timestamp across many different topics / partitions), say PHP request start time or the like. So Kafka's internal timestamps are not really attractive for us anyway currently.
>
> I hope I can make a point and not waste your time.
>
> Best Jan,
>
> hopefully everything makes sense
>
> Jan,
>
> Currently, there is no switch to disable the time based index.
>
> There are quite a few use cases of time based index.
>
> 1. From KIP-33's wiki, it allows us to do time-based retention accurately. Before KIP-33, the time-based retention is based on the last modified time of each log segment. The main issue is that last modified time can change over time.
> For example, if a broker loses storage and has to re-replicate all data, those re-replicated segments will be retained much longer since their last modified time is more recent. Having a time-based index allows us to retain segments based on the message time, not the last modified time. This can also benefit KIP-71, where we want to combine time-based retention and compaction.
>
> /If you're short on disk space, one could try to get by with retention.bytes/ or, as we did, ssh into the box and rm it, which worked quite well when no one reads it. It chuckles a little when it's being read, but readers usually do an auto.offset.reset (they are too slow anyway if they are reading the last segments, hrhr).
>
> 2. In KIP-58, we want to delay log compaction based on a configurable amount of time. Time-based index allows us to do this more accurately.
>
> /good point, seems reasonable/
>
> 3. We plan to add an API in the consumer to allow seeking to an offset based on a timestamp. The time based index allows us to do this more accurately and fast.
>
> /Sure, I personally feel that you rarely want to do this. For Camus, we used max.pull.historic.days (or similar) successfully quite often. We just gave it an extra day and got what we wanted, and for debugging my bisect tool works well enough. So these are the 2 use cases we have experienced already, and we found a decent way around them./
>
> Now for the impact.
>
> a. There is a slight change on how time-based rolling works. Before KIP-33, rolling was based on the time when a segment was loaded in the broker. After KIP-33, rolling is based on the time of the first message of a segment. Not sure if this is your concern. In the common case, the two behave more or less the same. The latter is actually more deterministic since it's not sensitive to broker restarts.
>
> /This is part of my main concern indeed.
> This is what scares me, and I preferred to just opt out instead of reviewing all our pipelines to check what's going to happen when we put it live. For example the MirrorMakers: if they want to preserve the create time from the source cluster and publish the same create time (which they should do, if you don't encode your own timestamps and want to have proper kafka-streams windowing), then I am quite concerned about what happens when we have problems with our cross-ocean links and fall behind, say a day or two. Then I can think of a very up-to-date MirrorMaker from one colocation and a very laggy MirrorMaker from another colocation. For me it's not 100% clear what's going to happen. But I can't think of sane defaults there, which is what I love Kafka for. It is just tricky to be convinced that an upgrade is safe, which was usually easy./
>
> b. Time-based index potentially adds overhead to producing messages and loading segments. Our experiments show that the impact to producing is insignificant. The time to load segments when restarting a broker can be doubled. However, the absolute time is still reasonable. For example, loading 10K log segments wit
Re: Re: KIP-33 Opt out from Time Based indexing
Hi Jun,

I copy-pasted this mail from the archive, as I somehow didn't receive it per mail. I will still make some comments inline; hopefully you can find them quickly enough, my apologies.

To make things clearer, you should also know that all messages in our Kafka setup already have a common way to access their timestamp (it's always encoded in the value the same way). Sometimes this is a logical time (e.g. the same timestamp across many different topics / partitions), say PHP request start time or the like. So Kafka's internal timestamps are not really attractive for us anyway currently.

I hope I can make a point and not waste your time.

Best Jan,

hopefully everything makes sense

Jan,

Currently, there is no switch to disable the time based index.

There are quite a few use cases of time based index.

1. From KIP-33's wiki, it allows us to do time-based retention accurately. Before KIP-33, the time-based retention is based on the last modified time of each log segment. The main issue is that last modified time can change over time. For example, if a broker loses storage and has to re-replicate all data, those re-replicated segments will be retained much longer since their last modified time is more recent. Having a time-based index allows us to retain segments based on the message time, not the last modified time. This can also benefit KIP-71, where we want to combine time-based retention and compaction.

/If you're short on disk space, one could try to get by with retention.bytes/ or, as we did, ssh into the box and rm it, which worked quite well when no one reads it. It chuckles a little when it's being read, but readers usually do an auto.offset.reset (they are too slow anyway if they are reading the last segments, hrhr).

2. In KIP-58, we want to delay log compaction based on a configurable amount of time. Time-based index allows us to do this more accurately.

/good point, seems reasonable/

3. We plan to add an API in the consumer to allow seeking to an offset based on a timestamp.
The time based index allows us to do this more accurately and fast.

/Sure, I personally feel that you rarely want to do this. For Camus, we used max.pull.historic.days (or similar) successfully quite often. We just gave it an extra day and got what we wanted, and for debugging my bisect tool works well enough. So these are the 2 use cases we have experienced already, and we found a decent way around them./

Now for the impact.

a. There is a slight change on how time-based rolling works. Before KIP-33, rolling was based on the time when a segment was loaded in the broker. After KIP-33, rolling is based on the time of the first message of a segment. Not sure if this is your concern. In the common case, the two behave more or less the same. The latter is actually more deterministic since it's not sensitive to broker restarts.

/This is part of my main concern indeed. This is what scares me, and I preferred to just opt out instead of reviewing all our pipelines to check what's going to happen when we put it live. For example the MirrorMakers: if they want to preserve the create time from the source cluster and publish the same create time (which they should do, if you don't encode your own timestamps and want to have proper kafka-streams windowing), then I am quite concerned about what happens when we have problems with our cross-ocean links and fall behind, say a day or two. Then I can think of a very up-to-date MirrorMaker from one colocation and a very laggy MirrorMaker from another colocation. For me it's not 100% clear what's going to happen. But I can't think of sane defaults there, which is what I love Kafka for. It is just tricky to be convinced that an upgrade is safe, which was usually easy./

b. Time-based index potentially adds overhead to producing messages and loading segments. Our experiments show that the impact to producing is insignificant. The time to load segments when restarting a broker can be doubled. However, the absolute time is still reasonable.
For example, loading 10K log segments with time-based index takes about 5 seconds.

/Loading should be fine, totally agree/

Because time-based index is useful in several cases and the impact seems small, we didn't consider making time based index optional. Finally, although it's possible to make the time based index optional, it will add more complexity to the code base. So, we probably should only consider it if it's truly needed.

Thanks,

/I think one can get away with an easier codebase here. The trick is not to have the Log implement all the logic, but to have the broker maintain a Set of Indexes that gets initialized at startup and passed to the Log. One could ask each individual index whether that log segment should be rolled, compacted, truncated, whatever. One could also give that LogSegment to each index and make it rebuild the index, for example. I didn't figure out the details. But this https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/
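As a rough sketch of the "Set of Indexes" idea described above, a segment could hold a list of pluggable indexes and ask each one whether it wants a roll. Everything here is hypothetical and written in Python for brevity: `SegmentIndex`, `OffsetIndex`, `TimeIndex`, and `Segment` are illustrative names, not Kafka classes, and the roll rules are simplified assumptions.

```python
from typing import List, Optional, Protocol, Tuple


class SegmentIndex(Protocol):
    """Hypothetical interface each pluggable index would implement."""

    def on_append(self, offset: int, timestamp_ms: int, position: int) -> None: ...

    def should_roll(self, now_ms: int) -> bool: ...


class OffsetIndex:
    """Maps offsets to physical positions; asks for a roll when full."""

    def __init__(self, max_entries: int) -> None:
        self.max_entries = max_entries
        self.entries: List[Tuple[int, int]] = []

    def on_append(self, offset: int, timestamp_ms: int, position: int) -> None:
        self.entries.append((offset, position))

    def should_roll(self, now_ms: int) -> bool:
        return len(self.entries) >= self.max_entries


class TimeIndex:
    """Maps timestamps to physical positions; asks for a roll by age."""

    def __init__(self, roll_ms: int) -> None:
        self.roll_ms = roll_ms
        self.first_timestamp_ms: Optional[int] = None
        self.entries: List[Tuple[int, int]] = []

    def on_append(self, offset: int, timestamp_ms: int, position: int) -> None:
        if self.first_timestamp_ms is None:
            self.first_timestamp_ms = timestamp_ms
        self.entries.append((timestamp_ms, position))

    def should_roll(self, now_ms: int) -> bool:
        if self.first_timestamp_ms is None:
            return False
        return now_ms - self.first_timestamp_ms > self.roll_ms


class Segment:
    """Knows nothing about index internals; it only forwards events."""

    def __init__(self, indexes: List[SegmentIndex]) -> None:
        self.indexes = indexes
        self.position = 0  # next physical write position in bytes

    def append(self, offset: int, timestamp_ms: int, size: int) -> None:
        for index in self.indexes:
            index.on_append(offset, timestamp_ms, self.position)
        self.position += size

    def should_roll(self, now_ms: int) -> bool:
        return any(index.should_roll(now_ms) for index in self.indexes)
```

Under these assumptions, opting out of the time index amounts to constructing the segment without a `TimeIndex`, so message timestamps can no longer influence rolling.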
Re: KIP-33 Opt out from Time Based indexing
Jan, Currently, there is no switch to disable the time based index. There are quite a few use cases of time based index. 1. From KIP-33's wiki, it allows us to do time-based retention accurately. Before KIP-33, the time-based retention is based on the last modified time of each log segment. The main issue is that last modified time can change over time. For example, if a broker loses storage and has to re-replicate all data, those re-replicated segments will be retained much longer since their last modified time is more recent. Having a time-based index allows us to retain segments based on the message time, not the last modified time. This can also benefit KIP-71, where we want to combine time-based retention and compaction. 2. In KIP-58, we want to delay log compaction based on a configurable amount of time. Time-based index allows us to do this more accurately. 3. We plan to add an api in the consumer to allow seeking to an offset based on a timestamp. The time based index allows us to do this more accurately and fast. Now for the impact. a. There is a slight change on how time-based rolling works. Before KIP-33, rolling was based on the time when a segment was loaded in the broker. After KIP-33, rolling is based on the time of the first message of a segment. Not sure if this is your concern. In the common case, the two behave more or less the same. The latter is actually more deterministic since it's not sensitive to broker restarts. b. Time-based index potentially adds overhead to producing messages and loading segments. Our experiments show that the impact to producing is insignificant. The time to load segments when restarting a broker can be doubled. However, the absolute time is still reasonable. For example, loading 10K log segments with time-based index takes about 5 seconds. Because time-based index is useful in several cases and the impact seems small, we didn't consider making time based index optional. 
Finally, although it's possible to make the time based index optional, it will add more complexity to the code base. So, we probably should only consider it if it's truly needed.

Thanks,

Jun

On Mon, Aug 22, 2016 at 12:24 AM, Jan Filipiak wrote:
> Hello everyone,
>
> I stumbled across KIP-33 and the time based index. While briefly checking the wiki and commits, I failed to find a way to opt out. I saw it having quite some impact on when logs are rolled and was hoping not to have to deal with all of that. Is there a disable switch I overlooked?
>
> Does anybody have a good use case where the time-based index comes in handy? I made a custom console consumer for myself that can bisect a log based on time. It's just a quick probabilistic shot into the log, but it is sometimes quite useful for debugging.
>
> Best Jan
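The change in rolling behavior described in item (a) can be sketched as two predicates. This is a simplified illustration in Python, not the broker's code: the function names and the plain "age greater than roll interval" comparison are assumptions made for the example.

```python
DAY_MS = 24 * 60 * 60 * 1000


def should_roll_before_kip33(segment_loaded_ms: int, now_ms: int, roll_ms: int) -> bool:
    """Roll based on when the broker loaded (or created) the segment."""
    return now_ms - segment_loaded_ms > roll_ms


def should_roll_after_kip33(first_message_ts_ms: int, now_ms: int, roll_ms: int) -> bool:
    """Roll based on the timestamp of the first message in the segment."""
    return now_ms - first_message_ts_ms > roll_ms
```

With a seven-day roll interval, a first message whose timestamp is two days old does not trigger a roll under the second rule. A first message whose timestamp was written in seconds instead of milliseconds looks decades old and triggers one immediately, which is the kind of producer mistake raised elsewhere in this thread.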
Re: KIP-33 Opt out from Time Based indexing
Can you describe the behavior you saw that you didn't like?

-Jay

On Mon, Aug 22, 2016 at 12:24 AM, Jan Filipiak wrote:
> Hello everyone,
>
> I stumbled across KIP-33 and the time based index. While briefly checking the wiki and commits, I failed to find a way to opt out. I saw it having quite some impact on when logs are rolled and was hoping not to have to deal with all of that. Is there a disable switch I overlooked?
>
> Does anybody have a good use case where the time-based index comes in handy? I made a custom console consumer for myself that can bisect a log based on time. It's just a quick probabilistic shot into the log, but it is sometimes quite useful for debugging.
>
> Best Jan
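A time-based bisect of the kind mentioned in the quoted mail could look roughly like the following. This is only a guess at the approach, not Jan's tool: `bisect_offset` and `timestamp_at` are hypothetical names, and the sketch assumes timestamps are roughly non-decreasing with offset, which is why the result is best treated as an approximate starting point.

```python
from typing import Callable


def bisect_offset(timestamp_at: Callable[[int], int], lo: int, hi: int, target: int) -> int:
    """Return the smallest offset in [lo, hi) with timestamp >= target, else hi.

    timestamp_at(offset) is assumed to fetch one message and return its
    timestamp; the search is exact only if timestamps never decrease.
    """
    while lo < hi:
        mid = (lo + hi) // 2
        if timestamp_at(mid) < target:
            lo = mid + 1  # target lies strictly after mid
        else:
            hi = mid  # mid is a candidate; keep searching to the left
    return lo
```

Each probe costs one fetch, so a partition with a billion messages needs about thirty lookups.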