Hi Jun,

Thanks for taking the time to answer at such a detailed level. You are right, Log.fetchOffsetsByTimestamp works; the comment is just confusing: "// Get all the segments whose largest timestamp is smaller than target timestamp", which is apparently not what takeWhile does (I am more on the Java end of things, so I relied on the comment).
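
To make the difference concrete: takeWhile keeps only the leading run of elements that satisfy the predicate, while the wording of the comment reads like a filter over all segments. A minimal Python sketch, assuming made-up segment tuples (base offset, largest timestamp) and using itertools.takewhile as a stand-in for the Scala method; none of the names or values come from the Kafka code base:

```python
from itertools import takewhile

# Hypothetical segments as (base_offset, largest_timestamp). The values are
# invented so that the largest timestamps are NOT monotonically increasing.
segments = [(0, 100), (50, 300), (100, 200), (150, 400)]
target = 250


def smaller(seg):
    """True if the segment's largest timestamp is below the target."""
    return seg[1] < target


# How the comment reads: every segment whose largest timestamp is smaller.
filtered = [s for s in segments if smaller(s)]

# What takeWhile does: stop at the first segment that fails the predicate.
prefix = list(takewhile(smaller, segments))

print(filtered)  # [(0, 100), (100, 200)]
print(prefix)    # [(0, 100)]
```

The two only agree when the largest timestamps increase from segment to segment.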

Regarding the frequent file rolling, I didn't think of log compaction, but that is indeed a place where **** can hit the fan pretty easily, especially if you don't have many updates in there and you pass the timestamp along in a kafka-streams application. Bootstrapping a new application could then indeed produce quite a few old messages, kicking off this log rolling until a recent message appears. I guess that makes it a practical issue again, even with the 7 days. Thanks for pointing that out! I'd like to see appendTime as the default; I am very happy that I have it in my back pocket so I can sleep tighter and not worry too much about someone accidentally doing something dodgy with our clusters on a weekend.

Regarding the usefulness, you will not be able to sell it to me. I don't know how people build applications with this ¯\_(ツ)_/¯ but I don't want to see them.
Look at the error recovery with timestamp seek:
To fix a bug, a user needs to stop the SP and truncate all their downstream data perfectly based on their time window. Then restart and do the first fetch based again on the perfect window timeout. From then on, they still have NO clue whatsoever whether messages that arrive later with an earlier timestamp need to go into the previous window or not. (Note that there is >>>absolutely no<<< way to determine this in aggregated downstream windowed stores.) So the user is in .... even though they can seek, they can't rule out their error. IMO it helps them build the wrong thing, which will just be operational pain *somewhere*.

Look at the error recovery without timestamp seek:
Start your application from the beginning with a different output (version,key,partition) and wait for it to fully catch up. Drop the time windows in which the error happened + confidence interval (if your data isn't there anymore, no seek will help) from the old version. Stop the stream processor, merge the data it created, switch back to the original (version,key,partition) and start the SP again. Done. The bigger you choose the confidence interval, the more correct the result and the less the index helps. Usually you want maximum confidence => no index usage, get everything that is still there (maybe even redump from Hadoop in extreme cases), ironically causing the log to roll all the time (as you probably publish to a new topic and have the streams application use both) :(

As you can see, even though users can seek, if they want to produce proper numbers (billing information, e.g.), they are in trouble, and giving them this index will just make them implement the wrong solution! It boils down to: this index is not the Kafka way of doing things. The index can help the second approach, but usually one chooses the confidence interval = as much as one can get.

Then the last thing. "OffsetRequest is a legacy request. It's awkward to use and we plan to deprecate it over time". You've got to be kidding me. It was weird to get the byte position back then, but getting the offsets is perfectly reasonable and one of the best things in the world. Want to know how your stream looked at a specific point in time? Get the start and end offsets, fetch whenever you like, and you get a perfect snapshot in wall time. This is useful for compacted topics as well as streaming topics. Offsets are a well-known thing in Kafka and in no way awkward, as their monotonically increasing property is just great.
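
A rough sketch of that snapshot idea in Python, with an in-memory list standing in for a partition. The (offset, value) record layout and the snapshot helper are assumptions made for illustration only, not a Kafka client API:

```python
def snapshot(log, start_offset, end_offset):
    """Return the records with start_offset <= offset < end_offset."""
    return [rec for rec in log if start_offset <= rec[0] < end_offset]


# Hypothetical partition content as (offset, value) pairs.
log = [(0, "a"), (1, "b"), (2, "c")]

# Capture the start and end offsets at the point in time of interest.
start, end = 0, len(log)

# Records appended afterwards get larger offsets ...
log.append((3, "d"))

# ... so fetching the captured range later still yields the same view.
print(snapshot(log, start, end))  # [(0, 'a'), (1, 'b'), (2, 'c')]
```

Because offsets only ever grow, the captured range stays stable no matter when it is fetched, as long as the data is still retained.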

For seeking the log based on a confidence interval (the only chance you get in non-key logs reprocessing) one can also bisect the log from the client. The case is rare; it is intensive and causes at least a few hundred seeks for bigger topics, but I guess the broker now does these extra for the new index file.
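
A minimal sketch of such a client-side bisect in Python. The timestamp_at callback is hypothetical and stands for "seek to this offset, fetch one message, read its timestamp"; the result is exact only if timestamps never decrease with the offset, otherwise it is a rough guess that the confidence margin has to absorb:

```python
def bisect_offset(timestamp_at, start, end, target):
    """Find the smallest offset in [start, end) with timestamp >= target.

    Returns (offset, number_of_probes). Exact only for non-decreasing
    timestamps; with out-of-order timestamps it is just an approximation.
    """
    lo, hi = start, end
    probes = 0
    while lo < hi:
        mid = (lo + hi) // 2
        probes += 1  # each probe corresponds to one seek + fetch
        if timestamp_at(mid) < target:
            lo = mid + 1
        else:
            hi = mid
    return lo, probes


# Invented timestamps, indexed by offset.
timestamps = [10, 20, 30, 40, 50, 60, 70, 80]
offset, probes = bisect_offset(lambda o: timestamps[o], 0, len(timestamps), 45)

# Step back by a margin (in offsets) to cover the confidence interval.
margin = 2
start_from = max(0, offset - margin)
print(offset, probes, start_from)  # 4 3 2
```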

This index, I feel, is just not following the whole "Kafka way". Can you comment on the proposed refactoring? What are the chances of getting it upstream if I could pull it off? (unlikely)

Thanks for all the effort you put into listening to my concerns. Highly appreciated!

Best Jan



On 25.08.2016 23:36, Jun Rao wrote:
Jan,

Thanks a lot for the feedback. Now I understand your concern better. The following are my comments.

The first odd thing that you pointed out could be a real concern. Basically, if a producer publishes messages with really old timestamps, our default log.roll.hours (7 days) will indeed cause the broker to roll a log on every message, which would be bad. Time-based rolling is actually used infrequently. The only use case that I am aware of is that for compacted topics, rolling logs based on time could allow the compaction to happen sooner (since the active segment is never cleaned). One option is to change the default log.roll.hours to infinite and also document the impact of changing log.roll.hours. Jiangjie, what do you think?
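
The effect can be illustrated with a small Python simulation. It assumes the rule quoted from the KIP further down in this thread (roll when the current time is greater than the largest timestamp seen in the segment plus the roll interval) and, as a simplification, checks that rule before each append to a non-empty segment. The function and variable names are invented for the sketch and are not broker code:

```python
DAY_MS = 24 * 60 * 60 * 1000


def count_rolls(message_timestamps, now, roll_ms):
    """Count segment rolls for appends that all happen at wall-clock `now`."""
    rolls = 0
    largest = None  # largest timestamp in the active segment; None = empty
    for ts in message_timestamps:
        # Assumed rule: roll if now > largest timestamp in segment + roll_ms.
        if largest is not None and now > largest + roll_ms:
            rolls += 1
            largest = None  # start a new, empty active segment
        largest = ts if largest is None else max(largest, ts)
    return rolls


now = 100 * DAY_MS
roll_ms = 7 * DAY_MS

old_messages = [now - 10 * DAY_MS] * 5   # timestamps 10 days in the past
fresh_messages = [now - 1 * DAY_MS] * 5  # timestamps 1 day in the past

print(count_rolls(old_messages, now, roll_ms))    # 4
print(count_rolls(fresh_messages, now, roll_ms))  # 0
```

With timestamps older than the roll interval, every append after the first one lands in a freshly rolled segment.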

For the second odd thing, the OffsetRequest is a legacy request. It's awkward to use and we plan to deprecate it over time. That's why we haven't changed the logic in serving OffsetRequest after KIP-33. The plan is to introduce a new OffsetRequest that will exploit the time-based index. It's possible to have log segments with non-increasing largest timestamp. As you can see in Log.fetchOffsetsByTimestamp(), we simply iterate the segments in offset order and stop when we see the target timestamp.

For the third odd thing, one of the original reasons why the time-based index points to an offset instead of the file position is that it makes truncating the time index to an offset easier since the offset is in the index. Looking at the code, we could also store the file position in the time index and do truncation based on position, instead of offset. It probably has a slight advantage of consistency between the two indexes and avoiding another level of indirection when looking up the time index. Jiangjie, have we ever considered that?

The idea of log.message.timestamp.difference.max.ms is to prevent the timestamp in the published messages from drifting too far away from the current timestamp. The default value is infinite though.

Lastly, for the usefulness of time-based index, it's actually a feature that the community wanted and voted for, not just for Confluent customers. For example, being able to seek to an offset based on timestamp has been a frequently requested feature. This can be useful for at least the following scenarios: (1) If there is a bug in a consumer application, the user will want to rewind the consumption after fixing the logic. In this case, it's more convenient to rewind the consumption based on a timestamp. (2) In a multi data center setup, it's common for people to mirror the data from one Kafka cluster in one data center to another cluster in a different data center. If one data center fails, people want to be able to resume the consumption in the other data center. Since the offsets are not preserved between the two clusters through mirroring, being able to find a starting offset based on timestamp will allow the consumer to resume the consumption without missing any messages and also without replaying too many messages.

Thanks,

Jun


On Wed, Aug 24, 2016 at 5:05 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:

    Hey Jun,

    I'll go and try again :), I wrote the first one in quite a stressful
    environment. The bottom line is that I, for our use cases, see a
    too small use/effort ratio in this time index.
    We do not bootstrap new consumers for key-less logs so frequently
    and when we do, they usually want everything (prod deployment)
    or just start at the end (during development).
    That caused quite some frustration. It would be better if I could
    have just turned it off and not bothered any more. Anyhow, in the
    meantime I had to dig deeper into the inner workings
    and the impacts are not as dramatic as I initially assumed. But it
    still carries along some oddities I want to list here.

    first odd thing:
    Quote
    ---
    Enforce time based log rolling

    Currently time based log rolling is based on the creating time of
    the log segment. With this KIP, the time based rolling would be
    changed to based on the largest timestamp ever seen in a log
    segment. A new log segment will be rolled out if current time is
    greater than largest timestamp ever seen in the log segment +
    log.roll.ms. When
    message.timestamp.type=CreateTime, user should set
    max.message.time.difference.ms appropriately together
    with log.roll.ms to avoid frequent log
    segment roll out.

    ---
    Imagine a MirrorMaker falls behind and has a delay
    of some time > log.roll.ms.
    From my understanding, when no one else is producing to this
    partition except the MirrorMaker, the broker will start rolling
    on every append?
    Just because you may be under a DoS attack and your application only
    works in the remote location (also a good occasion for MM to fall
    behind).
    But checking the default values indicates that it should indeed
    not become a problem, as log.roll.ms defaults
    to ~>7 days.


    second odd thing:
    Quote
    ---
    A time index entry (/T/, /offset/) means that in this segment any
    message whose timestamp is greater than /T/ come after /offset./

    The OffsetRequest behaves almost the same as before. If timestamp
    *T* is set in the OffsetRequest, the first offset in the returned
    offset sequence means that if user want to consume from *T*, that
    is the offset to start with. The guarantee is that any message
    whose timestamp is greater than T has a bigger offset. i.e. Any
    message before this offset has a timestamp < *T*.
    ---

    Given how the index is maintained, with a little bit of bad luck
    (rolling upgrade/config change of MirrorMakers for different
    colocations) one ends up with segmentN.timeindex.maxtimestamp >
    segmentN+1.timeindex.maxtimestamp. If I am not overlooking something
    here, then the fetch code does not seem to take that into account.
    https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/Log.scala#L604

    In this case goal number 1 as listed, not to lose any messages, is
    not achieved. An easy fix seems to be to sort the segsArray by
    maxtimestamp, but I can't wrap my head around it just now.


    third odd thing:
    Regarding the worry of increasing complexity. Looking at the code
    https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L193 -196
    https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L227 & 230
    https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L265 -266
    https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L305 -307
    https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L408 - 410
    https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L432 - 435
    https://github.com/apache/kafka/blob/05d00b5aca2e1e59ad685a3f051d2ab022f75acc/core/src/main/scala/kafka/log/LogSegment.scala#L104 -108
    and especially
    https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/Log.scala#L717
    it feels like the Log & Log segment having a detailed knowledge
    about the maintained indexes is not the ideal way to model the
    problem.
    Having the Server maintain a Set of Indexes could reduce the code
    complexity, while also allowing an easy switch to turn it off. I
    think both indexes could point to the physical position, a client
    would do fetch(timestamp), and then continue with the offsets as
    usual. Is there any specific reason the timestamp index points
    into the offset index?
    For reading one would need to branch earlier, maybe already in
    ApiHandler and decide what indexes to query, but this branching
    logic is there now anyhow.

    Further, I also can't think of a situation where one wants to have
    this log.message.timestamp.difference.max.ms take effect. I
    think this defeats goal 1 again.

    ITE having this index in the brokers now feels weird to me. It gives
    me a feeling of complexity that I don't need, and I have a hard time
    figuring out how much other people can benefit from it. I hope
    that this feedback is useful and helps to understand my scepticism
    regarding this thing. There were some other oddities that I have a
    hard time recalling now. So I guess the index was built for a
    specific Confluent customer; will there be any blog post about
    their use case? Or can you share it?

    Best Jan


    On 24.08.2016 16:47, Jun Rao wrote:
    Jan,

    Thanks for the reply. I actually wasn't sure what your main
    concern on time-based rolling is. Just a couple of
    clarifications. (1) Time-based rolling doesn't control how long a
    segment will be retained for. For retention, if you use
    time-based, it will now be based on the timestamp in the message.
    If you use size-based, it works the same as before. Is your
    concern on time-based retention? If so, you can always configure
    the timestamp in all topics to be log append time, which will
    give you the same behavior as before. (2) The creation time of
    the segment is never exposed to the consumer and therefore is
    never preserved in MirrorMaker. In contrast, the timestamp in the
    message will be preserved in MirrorMaker. So, not sure what your
    concern on MirrorMaker is.

    Jun

    On Wed, Aug 24, 2016 at 5:03 AM, Jan Filipiak
    <jan.filip...@trivago.com> wrote:

        Hi Jun,

        I copy-pasted this mail from the archive, as I somehow didn't
        receive it per mail. I will still make some comments inline,
        hopefully you can find them quickly enough, my apologies.

        To make things clearer, you should also know that all
        messages in our Kafka setup already have a common way to access their
        timestamp (it's always encoded in the value the same way).
        Sometimes this is a logical time (e.g. the same timestamp across
        many different topics / partitions), say PHP request start
        time or the like. So Kafka's internal timestamps are not
        really attractive
        for us anyway currently.

        I hope I can make a point and not waste your time.

        Best Jan,

        hopefully everything makes sense

        --------

        Jan,

        Currently, there is no switch to disable the time based index.

        There are quite a few use cases of time based index.

        1. From KIP-33's wiki, it allows us to do time-based
        retention accurately.
        Before KIP-33, the time-based retention is based on the last
        modified time
        of each log segment. The main issue is that last modified
        time can change
        over time. For example, if a broker loses storage and has to
        re-replicate
        all data, those re-replicated segments will be retained much
        longer since
        their last modified time is more recent. Having a time-based
        index allows
        us to retain segments based on the message time, not the last
        modified
        time. This can also benefit KIP-71, where we want to combine
        time-based
        retention and compaction.

        /If you're short on disk space, one could try to get by
        with retention.bytes/
        or, as we did, ssh into the box and rm it, which worked quite
        well when no one reads it.
        Chuckles a little when it's read, but readers usually do an
        auto.offset.reset
        (they are too slow anyway if they are reading the last segments
        hrhr).

        2. In KIP-58, we want to delay log compaction based on a
        configurable
        amount of time. Time-based index allows us to do this more
        accurately.

        /good point, seems reasonable/

        3. We plan to add an api in the consumer to allow seeking to
        an offset
        based on a timestamp. The time based index allows us to do
        this more
        accurately and fast.

        /Sure, I personally feel that you rarely want to do this. For
        Camus, we used max.pull.historic.days (or similar)
        successfully quite often. We just gave it an extra day and
        got what we wanted,
        and for debugging my bisect tool works well enough. So these
        are the 2 use cases we have experienced already and found a decent
        way around./

        Now for the impact.

        a. There is a slight change on how time-based rolling works.
        Before KIP-33,
        rolling was based on the time when a segment was loaded in
        the broker.
        After KIP-33, rolling is based on the time of the first
        message of a
        segment. Not sure if this is your concern. In the common
        case, the two
        behave more or less the same. The latter is actually more
        deterministic
        since it's not sensitive to broker restarts.

        /This is part of my main concern indeed. This is what scares
        me, and I preferred to just opt out instead of reviewing all
        our pipelines to check what's going to happen when we put it live.
        For example the MirrorMakers: if they want to preserve the create
        time from the source cluster and publish the same create time
        (which they should do, if you don't encode your own timestamps
        and want
        to have proper kafka-streams windowing), then I am quite
        concerned about what happens when we have problems with our
        cross-ocean links and fall behind, say a day or two. Then I can
        think of a very up-to-date MirrorMaker from
        one colocation and a very laggy MirrorMaker from another
        colocation. For me it's not 100% clear what's going to happen. But
        I can't think of sane defaults there, which is what I love Kafka for.
        It is just tricky to be convinced that an upgrade is safe, which was
        usually easy.
        /
        b. Time-based index potentially adds overhead to producing
        messages and
        loading segments. Our experiments show that the impact to
        producing is
        insignificant. The time to load segments when restarting a
        broker can be
        doubled. However, the absolute time is still reasonable. For
        example,
        loading 10K log segments with time-based index takes about 5
        seconds.
        /Loading should be fine, totally agree/

        c. Because time-based index is useful in several cases and the
        impact seems
        small, we didn't consider making time based index optional.
        Finally,
        although it's possible to make the time based index optional,
        it will add
        more complexity to the code base. So, we probably should only
        consider it
        if it's truly needed. Thanks,

        /I think one can get away with an easier codebase here. The
        trick is not to have the LOG implement all the logic,
        but just have the broker maintain a Set of Indexes that gets
        initialized at startup and passed to the LOG. One could ask
        each individual
        index whether that LogSegment should be rolled, compacted,
        truncated, whatever. One could also give that LogSegment to
        each index and make it rebuild
        the index, for example. I didn't figure out the details. But this
        https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/Log.scala#L715
        might end up with for(Index i : indexes)
        {i.shouldRoll(segment)}? which should already be easier.
        If users don't want time-based indexing, just don't put the
        time-based index in the Set and everything should work
        like a charm.
        RPC calls that work on the specific indexes would need to
        throw an exception of some kind.
        Just an idea.
        /
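
A minimal Python sketch of how such a set of indexes could look. Every class, field and method name here is hypothetical and only illustrates the idea from the comment above; it is not how the Kafka Log or LogSegment classes are structured, and the roll rules are simplified assumptions:

```python
from dataclasses import dataclass


@dataclass
class Segment:
    """Hypothetical, stripped-down view of a log segment."""
    offset_index_entries: int
    largest_timestamp: int


class OffsetIndex:
    """Assumed rule: ask for a roll once the index has no room left."""

    def __init__(self, max_entries):
        self.max_entries = max_entries

    def should_roll(self, segment, now):
        return segment.offset_index_entries >= self.max_entries


class TimeIndex:
    """Assumed rule: roll if now > largest timestamp + roll interval."""

    def __init__(self, roll_ms):
        self.roll_ms = roll_ms

    def should_roll(self, segment, now):
        return now > segment.largest_timestamp + self.roll_ms


def should_roll(indexes, segment, now):
    """The log only asks each configured index; it knows no details."""
    return any(index.should_roll(segment, now) for index in indexes)


segment = Segment(offset_index_entries=10, largest_timestamp=1_000)
with_time_index = [OffsetIndex(max_entries=100), TimeIndex(roll_ms=500)]
without_time_index = [OffsetIndex(max_entries=100)]

print(should_roll(with_time_index, segment, now=2_000))     # True
print(should_roll(without_time_index, segment, now=2_000))  # False
```

Leaving the time index out of the collection is then the whole "off switch"; the log itself does not change.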
        Jun





        On 22.08.2016 09:24, Jan Filipiak wrote:

            Hello everyone,

            I stumbled across KIP-33 and the time based index, while
            briefly checking the wiki and commits, I fail to find a
            way to opt out.
            I saw it having quite some impact on when logs are rolled
            and was hoping not to have to deal with all of that. Is
            there a disable switch I overlooked?

            Does anybody have a good use case where the time-based
            index comes in handy? I made a custom console consumer
            for myself
            that can bisect a log based on time. It's just a quick
            probabilistic shot into the log but is sometimes quite
            useful for some debugging.

            Best Jan





