Jan,

For the time rolling issue, Jiangjie has committed a fix (https://issues.apache.org/jira/browse/KAFKA-4099) to trunk. Perhaps you can help test out trunk and see if there are any other issues related to the time-based index?

Thanks,

Jun

On Mon, Sep 5, 2016 at 11:52 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:

> Hi Jun,
>
> sorry for the late reply. Regarding (b), my main concern was just the complexity of understanding what's going on. As you can see, it took me some two days to fully grasp all the details of the implementation and what the impacts are. Usually I prefer to turn off things I don't use, so I don't have to bother. Log append time will work for me.
>
> Rolling logs was my main concern. The producer can specify the timestamp, and we use an epoch inside the message; I'd bet money that people in the company would have put this epoch into the produce record as well => rolling logs, as the broker thinks it's millis. So that would probably have caused us at least one outage if a big producer had upgraded and done this; IMO a likely mistake.
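A minimal sketch of that mistake, assuming a shop whose in-house convention stores the epoch in seconds (topic name, key, and value are illustrative). Kafka reads the record timestamp as epoch milliseconds, so a seconds-based value makes every message look decades old; with message.timestamp.type=CreateTime and a finite log.roll.ms, that keeps the roll condition permanently true:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    final class SecondsInsteadOfMillis {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                long requestEpochSeconds = System.currentTimeMillis() / 1000L; // the in-house "epoch"
                // WRONG: the timestamp argument is epoch milliseconds, not seconds
                producer.send(new ProducerRecord<>("page-views", null, requestEpochSeconds, "user-42", "..."));
            }
        }
    }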
> I'd just hoped for a more obvious kill switch, so I didn't need to bother that much.
>
> Best Jan
>
> On 29.08.2016 19:36, Jun Rao wrote:
>
>> Jan,
>>
>> For the usefulness of the time index, it's ok if you don't plan to use it. However, I do think there are other people who will want to use it. Fixing an application bug always requires some additional work. Intuitively, being able to seek back to a particular point in time for replay is going to be much more efficient than always replaying from the very beginning, especially when the log is retained for a long period of time. Sure, if you want to have more confidence, you want to rewind a bit conservatively. But being able to rewind an extra hour makes a big difference from having to rewind all the way to 7 days or however long the retention time is.
>>
>> For the OffsetRequest, I actually agree with you that it's useful. People can use it to find the first and the last offset, and the offset based on a specific point in time. The part that's a bit awkward with OffsetRequest is that it's based on the last modified time of the log segment, which makes it imprecise (precision is at the segment level, not the message level) and non-deterministic (the last modified time may change). Another awkwardness is that it supports returning a list of offsets after a specified timestamp. We did that simply because timestamps were only at the segment level then. So our plan is to replace OffsetRequest with a new one. It will give you the same functionality: find the first and the last offset, and the offset based on a specific point in time. It will just be better, since it's more precise and more deterministic. For your use case, it seems that you don't care about message creation time. Then it's possible for you to configure the broker with log append time. Whether this should be the default at the Kafka level is debatable, but it won't prevent your use case.
>>
>> For your suggestion on refactoring, I still want to understand how necessary it is. Your main concerns so far seem to be:
>> (a) the impact on rolling log segments;
>> (b) the time-based index is not useful for me.
>>
>> Item (a) is a good point. Thanks for that. We will fix it. On item (b), I have given my view above. Are there any other ways in which you think a time-based index will hurt?
>>
>> Thanks,
>>
>> Jun
>>
>> On Fri, Aug 26, 2016 at 3:41 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:
>>
>>> Hi Jun,
>>>
>>> thanks for taking the time to answer on such a detailed level. You are right, Log.fetchOffsetsByTimestamp works; the comment is just confusing: "// Get all the segments whose largest timestamp is smaller than target timestamp", which is apparently not what takeWhile does (I am more on the Java end of things, so I relied on the comment).
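An illustration of that takeWhile point in plain Java 9+ streams (not the broker's Scala; the numbers are made up): takeWhile keeps only the longest matching prefix, which differs from "all the segments whose largest timestamp is smaller than the target" as soon as per-segment max timestamps are not monotonic.

    import java.util.List;
    import java.util.stream.Collectors;

    final class TakeWhileDemo {
        public static void main(String[] args) {
            List<Long> segmentMaxTs = List.of(80L, 100L, 90L); // largest timestamp per segment
            // "all elements < 95" would be [80, 90]; takeWhile keeps only the prefix [80]
            System.out.println(segmentMaxTs.stream()
                    .takeWhile(t -> t < 95L)
                    .collect(Collectors.toList()));
        }
    }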
>>> Regarding the frequent file rolling, I didn't think of log compaction, but that is indeed a place where **** can hit the fan pretty easily, especially if you don't have many updates in there and you pass the timestamp along in a Kafka Streams application. Bootstrapping a new application could then produce quite a few old messages, kicking this log rolling off until a recent message appears. I guess that makes it a practical issue again, even with the 7 days. Thanks for pointing it out! I'd like to see append time as the default; I am very happy to have it in the back pocket for the purpose of tighter sleep, and to not worry too much about someone accidentally doing something dodgy with our clusters on a weekend.
>>>
>>> Regarding the usefulness, you will not be able to sell it to me. I don't know how people build applications with this ¯\_(ツ)_/¯ but I don't want to see them.
>>>
>>> Look at the error recovery with timestamp seek: to fix a bug, a user needs to stop the SP, truncate all his downstream data perfectly based on their time window, then restart and do the first fetch based again on the perfect window timeout. From then on, he still has NO clue whatsoever whether messages that now come later with an earlier timestamp need to go into the previous window or not. (Note that there is >>>absolutely no<<< way to determine this in aggregated downstream windowed stores.) So the user is in .... even though he can seek, he can't rule out his error. IMO it helps them build the wrong thing, which will just be operational pain *somewhere*.
>>>
>>> Look at the error recovery without timestamp seek: start your application from the beginning with a different output (version, key, partition) and wait for it to fully catch up. Drop the time windows where the error happened, plus a confidence interval (if your data isn't there anymore, no seek will help), from the old version. Stop the stream processor, merge the data it created, switch back to the original (version, key, partition), and start the SP again. Done. The bigger you choose the confidence interval, the more correct the result, and the less the index helps. Usually you want maximum confidence => no index usage; get everything that is still there. (Maybe even redump from Hadoop in extreme cases.) Ironically, this causes the log to roll all the time (as you probably publish to a new topic and have the Streams application use both) :(
>>>
>>> As you can see, even though users can seek, if they want to produce proper numbers (billing information, e.g.), they are in trouble, and giving them this index will just make them implement the wrong solution! It boils down to this: the index is not the Kafka way of doing things. The index can help the second approach, but usually one chooses the confidence interval = as much as one can get.
>>>
>>> Then the last thing: "OffsetRequest is a legacy request. It's awkward to use and we plan to deprecate it over time". You've got to be kidding me. It was weird for getting the byte position back then, but getting the offsets is perfectly reasonable and one of the best things in the world. Want to know how your stream looked at a specific point in wall time? Get the start and end offset; fetch whenever you like, and you get a perfect snapshot. This is useful for compacted topics as well as streaming topics. Offsets are a well-known thing in Kafka and in no way awkward, as their monotonically increasing property is just great.
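A minimal sketch of that snapshot pattern, using only the plain consumer API (the helper class and its wiring are illustrative): capture the start and end offsets now, and any later fetch of [start, end) reproduces the stream exactly as it looked at this wall-clock moment, for compacted and regular topics alike.

    import java.util.Collections;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    final class OffsetSnapshot {
        static long[] snapshot(KafkaConsumer<?, ?> consumer, TopicPartition tp) {
            consumer.assign(Collections.singletonList(tp));
            consumer.seekToBeginning(Collections.singletonList(tp));
            long start = consumer.position(tp);
            consumer.seekToEnd(Collections.singletonList(tp));
            long end = consumer.position(tp);
            return new long[] { start, end }; // replay [start, end) at any later time
        }
    }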
>>> For seeking the log based on a confidence interval (the only chance you get when reprocessing non-keyed logs), one can also bisect the log from the client. As the case is rare, it is intensive and causes at least a few hundred seeks for bigger topics; but I guess the broker now does these extra seeks against the new index file.
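A sketch of that client-side bisection (Jan's console consumer does something similar; names and the poll timeout are illustrative). It binary-searches a partition for the first offset whose record timestamp is at or after the target, using nothing but seek() and poll(), and it assumes per-partition timestamps are roughly monotonic, which is exactly the probabilistic caveat mentioned elsewhere in this thread:

    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    final class TimeBisect {
        static long bisect(KafkaConsumer<byte[], byte[]> consumer, TopicPartition tp,
                           long targetTs, long start, long end) {
            long lo = start, hi = end;
            while (lo < hi) {
                long mid = lo + (hi - lo) / 2;
                consumer.seek(tp, mid);
                ConsumerRecords<byte[], byte[]> recs = consumer.poll(1000);
                if (recs.isEmpty() || recs.iterator().next().timestamp() >= targetTs) {
                    hi = mid;      // probe is at or past the target: search the lower half
                } else {
                    lo = mid + 1;  // probe is older than the target: search the upper half
                }
            }
            return lo; // first offset with timestamp >= targetTs, if timestamps are monotonic
        }
    }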
>>> This index, I feel, is just not following the whole "Kafka way". Can you advise on the proposed refactoring? What are the chances of getting it upstream if I could pull it off? (unlikely)
>>>
>>> Thanks for all the effort you put into listening to my concerns. Highly appreciated!
>>>
>>> Best Jan
>>>
>>> On 25.08.2016 23:36, Jun Rao wrote:
>>>
>>> Jan,
>>>
>>> Thanks a lot for the feedback. Now I understand your concern better. The following are my comments.
>>>
>>> The first odd thing that you pointed out could be a real concern. Basically, if a producer publishes messages with really old timestamps, our default log.roll.hours (7 days) will indeed cause the broker to roll a log on every message, which would be bad. Time-based rolling is actually used infrequently. The only use case that I am aware of is that for compacted topics, rolling logs based on time could allow the compaction to happen sooner (since the active segment is never cleaned). One option is to change the default log.roll.hours to infinite and also document the impact of changing log.roll.hours. Jiangjie, what do you think?
>>>
>>> For the second odd thing, the OffsetRequest is a legacy request. It's awkward to use and we plan to deprecate it over time. That's why we haven't changed the logic for serving OffsetRequest after KIP-33. The plan is to introduce a new OffsetRequest that will exploit the time-based index. It's possible to have log segments with non-increasing largest timestamps. As you can see in Log.fetchOffsetsByTimestamp(), we simply iterate the segments in offset order and stop when we see the target timestamp.
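A simplified rendering of that lookup in toy Java (the Segment type is a stand-in, not Kafka's Scala code): because segments are walked in offset order and the walk stops at the first segment whose max timestamp reaches the target, a non-monotonic segment in front yields a conservative (smaller) starting offset rather than a skipped message; the consumer may simply replay a bit more.

    import java.util.List;

    final class TimeIndexLookup {
        interface Segment {
            long maxTimestamp();            // largest timestamp seen in this segment
            long searchTimeIndex(long ts);  // per-segment time-index lookup
        }

        static long offsetForTimestamp(List<Segment> segmentsInOffsetOrder, long targetTs) {
            for (Segment s : segmentsInOffsetOrder) {
                if (s.maxTimestamp() >= targetTs) {
                    return s.searchTimeIndex(targetTs);
                }
            }
            return -1L; // no message with timestamp >= targetTs
        }
    }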
>>> For the third odd thing, one of the original reasons why the time-based index points to an offset instead of the file position is that it makes truncating the time index to an offset easier, since the offset is in the index. Looking at the code, we could also store the file position in the time index and do truncation based on position instead of offset. It probably has the slight advantages of consistency between the two indexes and of avoiding another level of indirection when looking up the time index. Jiangjie, have we ever considered that?
>>>
>>> The idea of log.message.timestamp.difference.max.ms is to prevent the timestamps in published messages from drifting too far away from the current timestamp. The default value is infinite, though.
>>>
>>> Lastly, on the usefulness of the time-based index: it's actually a feature that the community wanted and voted for, not just Confluent customers. For example, being able to seek to an offset based on a timestamp has been a frequently requested feature. This can be useful in at least the following scenarios: (1) If there is a bug in a consumer application, the user will want to rewind the consumption after fixing the logic. In this case, it's more convenient to rewind the consumption based on a timestamp. (2) In a multi-data-center setup, it's common for people to mirror the data from a Kafka cluster in one data center to a cluster in a different data center. If one data center fails, people want to be able to resume the consumption in the other data center. Since offsets are not preserved between the two clusters through mirroring, being able to find a starting offset based on a timestamp will allow the consumer to resume the consumption without missing any messages and also without replaying too many messages.
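The consumer seek-by-timestamp API was still only planned when this thread was written; the sketch below assumes it looks like the offsetsForTimes() call that later shipped with KIP-79, and the ten-minute safety margin is illustrative. Failover flow: in the surviving data center, resume a little before the last timestamp the failed pipeline is known to have processed, accepting some replay for at-least-once delivery.

    import java.util.Collections;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
    import org.apache.kafka.common.TopicPartition;

    final class FailoverSeek {
        static void resumeAfterFailover(KafkaConsumer<?, ?> consumer, TopicPartition tp,
                                        long lastProcessedTs) {
            long resumeTs = lastProcessedTs - 10 * 60 * 1000L; // rewind margin
            Map<TopicPartition, OffsetAndTimestamp> found =
                    consumer.offsetsForTimes(Collections.singletonMap(tp, resumeTs));
            OffsetAndTimestamp oat = found.get(tp);
            if (oat != null) {
                consumer.seek(tp, oat.offset());  // may replay the margin: at-least-once
            } else {
                consumer.seekToEnd(Collections.singletonList(tp)); // nothing at/after resumeTs
            }
        }
    }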
>>> Thanks,
>>>
>>> Jun
>>>
>>> On Wed, Aug 24, 2016 at 5:05 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:
>>>
>>>> Hey Jun,
>>>>
>>>> I'll go and try again :) I wrote the first one in quite a stressful environment. The bottom line is that I, for our use cases, see too small a use/effort ratio in this time index. We do not bootstrap new consumers for key-less logs that frequently, and when we do, they usually want everything (prod deployment) or just start at the end (during development). That caused quite some frustration. It would have been better if I could just have turned it off and not bothered any more. Anyhow, in the meantime I had to dig deeper into the inner workings, and the impacts are not as dramatic as I initially assumed. But it still carries along some oddities that I want to list here.
>>>>
>>>> First odd thing:
>>>> Quote:
>>>> ---
>>>> Enforce time based log rolling
>>>>
>>>> Currently time based log rolling is based on the creating time of the log segment. With this KIP, the time based rolling would be changed to based on the largest timestamp ever seen in a log segment. A new log segment will be rolled out if current time is greater than largest timestamp ever seen in the log segment + log.roll.ms. When message.timestamp.type=CreateTime, user should set max.message.time.difference.ms appropriately together with log.roll.ms to avoid frequent log segment roll out.
>>>> ---
>>>> Imagine a MirrorMaker that falls behind with a delay of some time > log.roll.ms. From my understanding, when no one else is producing to this partition except the MirrorMaker, the broker will start rolling on every append? Just because you are maybe under a DoS attack and your application only works from the remote location (also a good occasion for MM to fall behind). But checking the default values indicates that it should indeed not become a problem, as log.roll.ms defaults to ~7 days.
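Toy arithmetic for that MirrorMaker scenario (the one-day log.roll.ms is hypothetical, chosen below the default to make the effect visible). Per the KIP quote above, a segment rolls when current time > largest timestamp in the segment + log.roll.ms; if the only producer lags by more than log.roll.ms, every freshly appended message still carries an old create-time, so the condition holds again immediately and the broker rolls on every append:

    import java.util.concurrent.TimeUnit;

    final class RollArithmetic {
        public static void main(String[] args) {
            long rollMs = TimeUnit.DAYS.toMillis(1);   // hypothetical log.roll.ms
            long lag    = TimeUnit.DAYS.toMillis(2);   // MirrorMaker is two days behind
            long now    = System.currentTimeMillis();
            long largestTimestamp = now - lag;         // newest mirrored message is still two days old
            System.out.println(now > largestTimestamp + rollMs); // true: rolls on every append
        }
    }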
>>>> Second odd thing:
>>>> Quote:
>>>> ---
>>>> A time index entry (*T*, *offset*) means that in this segment any message whose timestamp is greater than *T* come after *offset*.
>>>>
>>>> The OffsetRequest behaves almost the same as before. If timestamp *T* is set in the OffsetRequest, the first offset in the returned offset sequence means that if user want to consume from *T*, that is the offset to start with. The guarantee is that any message whose timestamp is greater than *T* has a bigger offset. i.e. Any message before this offset has a timestamp < *T*.
>>>> ---
>>>> Given how the index is maintained, with a little bit of bad luck (a rolling upgrade/config change of MirrorMakers for different colocations), one ends up with segmentN.timeindex.maxtimestamp > segmentN+1.timeindex.maxtimestamp. If I am not overlooking something here, the fetch code does not seem to take that into account:
>>>> https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/Log.scala#L604
>>>> In this case the goal listed as number 1, not to lose any messages, is not achieved. An easy fix seems to be to sort the segments array by maxtimestamp, but I can't wrap my head around it just now.
>>>>
>>>> Third odd thing:
>>>> Regarding the worry about increasing complexity, looking at the code:
>>>> https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L193-196
>>>> https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L227 & 230
>>>> https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L265-266
>>>> https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L305-307
>>>> https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L408-410
>>>> https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L432-435
>>>> https://github.com/apache/kafka/blob/05d00b5aca2e1e59ad685a3f051d2ab022f75acc/core/src/main/scala/kafka/log/LogSegment.scala#L104-108
>>>> and especially
>>>> https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/Log.scala#L717
>>>> it feels like the Log and LogSegment having such detailed knowledge of the maintained indexes is not the ideal way to model the problem. Having the server maintain a set of indexes could reduce the code complexity while also allowing an easy switch to turn an index off. I think both indexes could point to the physical position; a client would do fetch(timestamp) and then continue with offsets as usual. Is there any specific reason the timestamp index points into the offset index? For reading, one would need to branch earlier, maybe already in the ApiHandler, and decide which indexes to query, but this branching logic is there now anyhow.
>>>>
>>>> Further, I also can't think of a situation where one wants log.message.timestamp.difference.max.ms to take effect. I think this defeats goal 1 again.
>>>>
>>>> In the end, having this index in the brokers now feels weird to me. It gives me a feeling of complexity that I don't need, and I have a hard time figuring out how much other people can benefit from it. I hope this feedback is useful and helps you understand my scepticism about this thing. There were some other oddities that I have a hard time recalling now. So I guess the index was built for a specific Confluent customer; will there be a blog post about their use case, or can you share it?
>>>>
>>>> Best Jan
>>>>
>>>> On 24.08.2016 16:47, Jun Rao wrote:
>>>>
>>>> Jan,
>>>>
>>>> Thanks for the reply. I actually wasn't sure what your main concern about time-based rolling is. Just a couple of clarifications. (1) Time-based rolling doesn't control how long a segment will be retained for. For retention, if you use time-based retention, it will now be based on the timestamp in the message. If you use size-based retention, it works the same as before. Is your concern about time-based retention? If so, you can always configure the timestamp in all topics to be log append time, which will give you the same behavior as before. (2) The creation time of the segment is never exposed to the consumer and therefore is never preserved by MirrorMaker. In contrast, the timestamp in the message will be preserved by MirrorMaker. So I'm not sure what your concern about MirrorMaker is.
>>>>
>>>> Jun
>>>>
>>>> On Wed, Aug 24, 2016 at 5:03 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:
>>>>
>>>>> Hi Jun,
>>>>>
>>>>> I copy-pasted this mail from the archive, as I somehow didn't receive it by mail. I will still make some comments inline; hopefully you can find them quickly enough, my apologies.
>>>>>
>>>>> To make things clearer, you should also know that all messages in our Kafka setup already have a common way to access their timestamp (it is always encoded in the value the same way). Sometimes this is a logical time (e.g. the same timestamp across many different topics/partitions), say a PHP request start time or the like. So Kafka's internal timestamps are not really attractive for us currently anyway.
>>>>>
>>>>> I hope I can make a point and not waste your time.
>>>>>
>>>>> Best Jan, hopefully everything makes sense
>>>>>
>>>>> --------
>>>>>
>>>>> Jan,
>>>>>
>>>>> Currently, there is no switch to disable the time based index.
>>>>>
>>>>> There are quite a few use cases of time based index.
>>>>>
>>>>> 1. From KIP-33's wiki, it allows us to do time-based retention accurately. Before KIP-33, the time-based retention is based on the last modified time of each log segment. The main issue is that the last modified time can change over time. For example, if a broker loses storage and has to re-replicate all data, those re-replicated segments will be retained much longer, since their last modified time is more recent. Having a time-based index allows us to retain segments based on the message time, not the last modified time. This can also benefit KIP-71, where we want to combine time-based retention and compaction.
>>>>>
>>>>> /If you're sparse on disk space, one could try to get by with retention.bytes, or, as we did, ssh into the box and rm it, which worked quite well when no one reads it. It chuckles a little when it is being read, but readers usually do an auto.offset.reset (they are too slow anyway if they are reading the last segments, hrhr)./
>>>>>
>>>>> 2. In KIP-58, we want to delay log compaction based on a configurable amount of time. The time-based index allows us to do this more accurately.
>>>>>
>>>>> /Good point, seems reasonable./
>>>>>
>>>>> 3. We plan to add an api in the consumer to allow seeking to an offset based on a timestamp. The time based index allows us to do this more accurately and fast.
>>>>>
>>>>> /Sure, but I personally feel that you rarely want to do this. For Camus, we used max.pull.historic.days (or similar) successfully quite often; we just gave it an extra day and got what we wanted, and for debugging, my bisect tool works well enough. So these are the two use cases we have experienced already, and we found a decent way around both./
>>>>>
>>>>> Now for the impact.
>>>>>
>>>>> a. There is a slight change in how time-based rolling works. Before KIP-33, rolling was based on the time when a segment was loaded in the broker. After KIP-33, rolling is based on the time of the first message of a segment. Not sure if this is your concern. In the common case, the two behave more or less the same. The latter is actually more deterministic, since it's not sensitive to broker restarts.
>>>>>
>>>>> /This is part of my main concern indeed. This is what scares me, and I preferred to just opt out instead of reviewing all our pipelines to check what is going to happen when we put it live. Take the MirrorMakers, for example: if they want to preserve create time from the source cluster and publish the same create time (which they should do if you don't encode your own timestamps and want proper Kafka Streams windowing), then I am quite concerned about what happens when our cross-ocean links fall behind, say a day or two. I can then imagine a very up-to-date MirrorMaker from one colocation and a very laggy MirrorMaker from another colocation, and for me it is not 100% clear what is going to happen. I can't think of sane defaults there, which is what I love Kafka for. It is just tricky to be convinced that an upgrade is safe, which was usually easy./
>>>>>
>>>>> b. Time-based index potentially adds overhead to producing messages and loading segments. Our experiments show that the impact on producing is insignificant. The time to load segments when restarting a broker can be doubled. However, the absolute time is still reasonable. For example, loading 10K log segments with time-based index takes about 5 seconds.
>>>>>
>>>>> /Loading should be fine, totally agree./
>>>>>
>>>>> c. Because the time-based index is useful in several cases and the impact seems small, we didn't consider making the time based index optional. Finally, although it's possible to make the time based index optional, it will add more complexity to the code base. So, we probably should only consider it if it's truly needed. Thanks,
>>>>>
>>>>> /I think one can get away with an easier codebase here. The trick is not to have the Log implement all the logic, but to have the broker maintain a set of indexes that gets initialized on startup and passed to the Log. One could ask each individual index whether a log segment should be rolled, compacted, truncated, whatever. One could also give the LogSegment to each index and have it rebuild the index, for example. I didn't figure out the details, but this
>>>>> https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/Log.scala#L715
>>>>> might end up as for (Index i : indexes) { i.shouldRoll(segment); }, which should already be easier. If users don't want time-based indexing, they just don't put the time-based index in the set, and everything should work like a charm. RPC calls that work on specific indexes would need to throw an exception of some kind. Just an idea./
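A rough Java rendering of that "set of indexes" idea (all names here are invented for illustration; this is not Kafka's actual code): the broker owns the set, the Log only iterates over it, and leaving the time-based index out of the set would be the kill switch asked for earlier in the thread.

    import java.util.Set;

    final class IndexSetSketch {
        // Toy stand-in for the segment state an index would need to see.
        interface LogSegmentView {
            long baseOffset();
            long sizeInBytes();
            long largestTimestamp();
        }

        // Each maintained index answers the lifecycle questions for itself.
        interface Index {
            void onAppend(long offset, long position, long timestamp);
            boolean shouldRoll(LogSegmentView activeSegment);
            void truncateTo(long offset);
            void rebuild(LogSegmentView segment);
        }

        static final class Log {
            private final Set<Index> indexes; // provided by the broker at startup

            Log(Set<Index> indexes) { this.indexes = indexes; }

            boolean shouldRoll(LogSegmentView active) {
                for (Index i : indexes) {
                    if (i.shouldRoll(active)) return true; // any index may force a roll
                }
                return false;
            }
        }
    }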
>>>>> Jun
>>>>>
>>>>> On 22.08.2016 09:24, Jan Filipiak wrote:
>>>>>
>>>>>> Hello everyone,
>>>>>>
>>>>>> I stumbled across KIP-33 and the time-based index. While briefly checking the wiki and the commits, I failed to find a way to opt out. I saw it having quite some impact on when logs are rolled, and I was hoping not to have to deal with all of that. Is there a disable switch I overlooked?
>>>>>>
>>>>>> Does anybody have a good use case where the time-based index comes in handy? I made myself a custom console consumer that can bisect a log based on time. It is just a quick probabilistic shot into the log, but it is sometimes quite useful for some debugging.
>>>>>>
>>>>>> Best Jan