Jiangjie,

I am not sure about changing the default to LogAppendTime since CreateTime
is probably what most people want. It also doesn't solve the problem
completely. For example, if you do partition reassignment and need to copy
a bunch of old log segments to a new broker, this may cause log rolling on
every message.

Another alternative is to just keep the old time rolling behavior, which is
rolling based on the create time of the log segment. I had two use cases of
time-based rolling in mind. The first one is for users who don't want to
retain a message (say sensitive data) in the log for too long. For this,
one can set a time-based retention. If the log can roll periodically based
on create time, it will freeze the largest timestamp in the rolled segment
and cause it to be deleted when the time limit has been reached. Rolling
based on the timestamp of the first message doesn't help much here since
the retention is always based on the largest timestamp. The second one is
to let the log cleaner run sooner. Rolling logs periodically based on
create time will also work for that. So, it seems that if we preserve the old time
rolling behavior, we won't lose much functionality, but will avoid the
corner case where the logs could be rolled on every message. What do you
think?
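
To make the corner case concrete, here is a minimal sketch, not the broker
code. It assumes a single partition, a fixed broker clock, and hypothetical
names (Segment, count_rolls, and the two roll predicates). It compares rolling
on the first message timestamp with rolling on segment create time when every
incoming message is 8 days old and the roll interval is 7 days.

```python
from dataclasses import dataclass
from typing import Callable, Iterable, Optional

DAY_MS = 24 * 60 * 60 * 1000
ROLL_MS = 7 * DAY_MS  # stands in for the 7-day log.roll.hours default


@dataclass
class Segment:
    created_at_ms: int                        # broker clock when the segment was created
    first_timestamp_ms: Optional[int] = None  # timestamp of the first appended message


def roll_on_first_message_time(seg: Segment, now_ms: int) -> bool:
    # Roll once the first message in the segment is older than ROLL_MS.
    return seg.first_timestamp_ms is not None and now_ms - seg.first_timestamp_ms > ROLL_MS


def roll_on_create_time(seg: Segment, now_ms: int) -> bool:
    # Old behavior: roll once the (non-empty) segment itself is older than ROLL_MS.
    return seg.first_timestamp_ms is not None and now_ms - seg.created_at_ms > ROLL_MS


def count_rolls(timestamps: Iterable[int], now_ms: int,
                should_roll: Callable[[Segment, int], bool]) -> int:
    """Append all messages at the same instant and count how often the log rolls."""
    seg = Segment(created_at_ms=now_ms)
    rolls = 0
    for ts in timestamps:
        if should_roll(seg, now_ms):
            seg = Segment(created_at_ms=now_ms)
            rolls += 1
        if seg.first_timestamp_ms is None:
            seg.first_timestamp_ms = ts
    return rolls


now = 100 * DAY_MS
old_messages = [now - 8 * DAY_MS] * 5  # e.g. old data copied during reassignment

print(count_rolls(old_messages, now, roll_on_first_message_time))  # 4: a roll before every message after the first
print(count_rolls(old_messages, now, roll_on_create_time))         # 0: the segment was only just created
```

Under these assumptions, the create-time policy never rolls while old data is
being copied, whereas the first-message policy ends up with one message per
segment.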

About storing file position in the time index, I don't think it needs to
incur overhead during append. At the beginning of append, we are already
getting the end position of the log (for maintaining the offset index). We
can just keep track of that together with the last seen offset. Storing the
position has the slight benefit that it avoids another indirection and
seems more consistent with the offset index. It's worth thinking through
whether this is better. If we want to change it, it's better to change it
now than later.
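
As a rough illustration of the indirection, the sketch below models both
layouts for one segment. The index contents and all function names are made up
for the example and are not the actual index classes. It only shows that a
time index holding offsets needs a second lookup in the offset index, while
one holding positions does not.

```python
import bisect

# Hypothetical sparse indexes for a single segment, each sorted by its first field.
OFFSET_INDEX = [(0, 0), (40, 4096), (80, 8192)]        # (offset, file position)
TIME_INDEX_BY_OFFSET = [(1000, 35), (2000, 85)]        # (largest timestamp so far, offset)
TIME_INDEX_BY_POSITION = [(1000, 3600), (2000, 8700)]  # (largest timestamp so far, position)


def last_entry_below(index, key):
    """Value of the last entry whose key is strictly less than `key`, or None."""
    keys = [k for k, _ in index]
    i = bisect.bisect_left(keys, key) - 1
    return index[i][1] if i >= 0 else None


def last_entry_at_or_below(index, key):
    """Value of the last entry whose key is less than or equal to `key`, or None."""
    keys = [k for k, _ in index]
    i = bisect.bisect_right(keys, key) - 1
    return index[i][1] if i >= 0 else None


def scan_start_via_offset(target_ts):
    # Two lookups: timestamp -> offset, then offset -> file position.
    offset = last_entry_below(TIME_INDEX_BY_OFFSET, target_ts)
    if offset is None:
        return 0  # no earlier entry: scan from the start of the segment
    position = last_entry_at_or_below(OFFSET_INDEX, offset)
    return position if position is not None else 0


def scan_start_via_position(target_ts):
    # One lookup: timestamp -> file position.
    position = last_entry_below(TIME_INDEX_BY_POSITION, target_ts)
    return position if position is not None else 0


print(scan_start_via_offset(1500), scan_start_via_position(1500))
```

In this toy data the direct layout also lands closer to the target (position
3600 instead of 0 for timestamp 1500), because the offset index is sparse and
the second lookup rounds down to its nearest entry.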

Thanks,

Jun

On Thu, Aug 25, 2016 at 6:30 PM, Becket Qin <becket....@gmail.com> wrote:

> Hi Jan,
>
> It seems your main concern is the changed behavior of time-based log
> rolling and time-based retention. That is actually why we have two
> timestamp types. If users set log.message.timestamp.type to
> LogAppendTime, the broker will behave exactly the same as before, except
> that rolling and retention will be more accurate and independent of
> replica movements.
>
> The log.message.timestamp.difference.max.ms setting is only useful when
> users are using CreateTime. It is a kind of protection on the broker,
> because an insanely large timestamp could ruin the time index due to the
> way we handle out-of-order timestamps when using CreateTime. Users who
> are using LogAppendTime do not need to worry about this at all.
>
> The first odd thing is a valid concern. In your case, because you have the
> timestamp in the message value, it is probably fine to just use
> LogAppendTime on the broker, so the timestamp will only be used to provide
> accurate log retention and log rolling based on when the message was
> produced to the broker, regardless of when the message was created. This
> should provide the exact same behavior on the broker side as before.
> (Apologies for the stale wiki statement on the lines you quoted. As Jun
> said, the log segment rolling is based on the timestamp of the first
> message instead of the largest timestamp in the log segment. I sent a
> change notification to the mailing list but forgot to update the wiki
> page. I have just updated it.)
>
> For the second odd thing, as Jun mentioned, by design we do not keep a
> global timestamp order. During the search, we start from the oldest
> segment and scan over the segments until we find the first segment that
> contains a timestamp larger than the target timestamp. This should
> guarantee that no message with a larger timestamp will be missed. For
> example, if we have 3 segments whose largest timestamps are 100, 300, and
> 200, and we are looking for timestamp 250, we will start scanning at the
> first segment, stop at the second segment, and search inside that segment
> for the first timestamp greater than or equal to 250. So reordered largest
> timestamps across segments should not be an issue.
>
> The third odd thing is a good point. There are a few reasons we chose to
> store the offset instead of the physical position in the time index. Easier
> truncation is one of the reasons, but this may not be a big issue. Another
> reason is that in the early implementation, the time index and offset index
> were actually aligned, i.e. each offset in the time index had a
> corresponding entry in the offset index (the reverse is not true). So the
> physical position was already stored in the offset index. Later on we
> switched to the current implementation, which has the time index pointing
> to the exact shallow message in the log segment. With this implementation,
> if the message with the largest timestamp appears in the middle of an
> uncompressed message set, we may need to calculate the physical position
> for that message. This is doable but could add overhead to each append
> and some complexity. Given that OffsetRequest is supposed to be a pretty
> infrequent request, it is probably OK to do the secondary lookup and save
> the work on each append.
>
> Jun has already mentioned a few use cases for searching by timestamp. At
> LinkedIn we also have several such use cases where people want to rewind
> the offsets to a certain time and reprocess the streams.
>
> @Jun, currently we are using CreateTime as the default value for
> log.message.timestamp.type. I am wondering whether it would be less
> surprising if we changed the default value to LogAppendTime so that the
> previous behavior is maintained, because it would be bad for users if
> upgrading caused their messages to be deleted due to the change in
> behavior. What do you think?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
>
>
> On Thu, Aug 25, 2016 at 2:36 PM, Jun Rao <j...@confluent.io> wrote:
>
> > Jan,
> >
> > Thanks a lot for the feedback. Now I understand your concern better. The
> > following are my comments.
> >
> > The first odd thing that you pointed out could be a real concern.
> > Basically, if a producer publishes messages with really old timestamps,
> > our default log.roll.hours (7 days) will indeed cause the broker to roll
> > a log on every message, which would be bad. Time-based rolling is
> > actually used infrequently. The only use case that I am aware of is that
> > for compacted topics, rolling logs based on time could allow the
> > compaction to happen sooner (since the active segment is never cleaned).
> > One option is to change the default log.roll.hours to infinite and also
> > document the impact of changing log.roll.hours. Jiangjie, what do you
> > think?
> >
> > For the second odd thing, the OffsetRequest is a legacy request. It's
> > awkward to use and we plan to deprecate it over time. That's why we
> > haven't changed the logic in serving OffsetRequest after KIP-33. The plan
> > is to introduce a new OffsetRequest that will exploit the time-based
> > index. It's possible to have log segments with non-increasing largest
> > timestamps. As you can see in Log.fetchOffsetsByTimestamp(), we simply
> > iterate the segments in offset order and stop when we see the target
> > timestamp.
> >
> > For the third odd thing, one of the original reasons why the time-based
> > index points to an offset instead of the file position is that it makes
> > truncating the time index to an offset easier since the offset is in the
> > index. Looking at the code, we could also store the file position in the
> > time index and do truncation based on position, instead of offset. It
> > probably has a slight advantage of consistency between the two indexes
> > and avoiding another level of indirection when looking up the time index.
> > Jiangjie, have we ever considered that?
> >
> > The idea of log.message.timestamp.difference.max.ms is to prevent the
> > timestamp in the published messages from drifting too far away from the
> > current timestamp. The default value is infinite though.
> >
> > Lastly, for the usefulness of the time-based index, it's actually a
> > feature that the community wanted and voted for, not just for Confluent
> > customers. For example, being able to seek to an offset based on
> > timestamp has been a frequently requested feature. This can be useful for
> > at least the following scenarios: (1) If there is a bug in a consumer
> > application, the user will want to rewind the consumption after fixing
> > the logic. In this case, it's more convenient to rewind the consumption
> > based on a timestamp. (2) In a multi data center setup, it's common for
> > people to mirror the data from one Kafka cluster in one data center to
> > another cluster in a different data center. If one data center fails,
> > people want to be able to resume the consumption in the other data
> > center. Since the offsets are not preserved between the two clusters
> > through mirroring, being able to find a starting offset based on
> > timestamp will allow the consumer to resume the consumption without
> > missing any messages and also without replaying too many messages.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Wed, Aug 24, 2016 at 5:05 PM, Jan Filipiak <jan.filip...@trivago.com>
> > wrote:
> >
> > > Hey Jun,
> > >
> > > I go and try again :), wrote the first one in quite a stressful
> > > environment. The bottom line is that I, for our use cases, see a too
> > > small use/effort ratio in this time index.
> > > We do not bootstrap new consumers for key-less logs so frequently, and
> > > when we do, they usually want everything (prod deployment) or just
> > > start at the end (during development).
> > > That caused quite some frustration. It would have been better if I could
> > > just have turned it off and not bothered any more. Anyhow, in the
> > > meantime I had to dig deeper into the inner workings,
> > > and the impacts are not as dramatic as I initially assumed. But it still
> > > carries along some oddities I want to list here.
> > >
> > > first odd thing:
> > > Quote
> > > ---
> > > Enforce time based log rolling
> > >
> > > Currently time based log rolling is based on the creating time of the
> > > log segment. With this KIP, the time based rolling would be changed to
> > > based on the largest timestamp ever seen in a log segment. A new log
> > > segment will be rolled out if current time is greater than largest
> > > timestamp ever seen in the log segment + log.roll.ms. When
> > > message.timestamp.type=CreateTime, user should set
> > > max.message.time.difference.ms appropriately together with log.roll.ms
> > > to avoid frequent log segment roll out.
> > > ---
> > > Imagine a MirrorMaker falls behind and has a delay of some
> > > time > log.roll.ms.
> > > From my understanding, when no one else is producing to this partition
> > > except the MirrorMaker, the broker will start rolling on every append?
> > > Just because you may be under a DoS attack and your application only
> > > works in the remote location (also a good occasion for MM to fall
> > > behind).
> > > But checking the default values indicates that it should indeed not
> > > become a problem, as log.roll.ms defaults to about 7 days.
> > >
> > >
> > > second odd thing:
> > > Quote
> > > ---
> > > A time index entry (*T*, *offset*) means that in this segment any
> > > message whose timestamp is greater than *T* come after *offset.*
> > >
> > > The OffsetRequest behaves almost the same as before. If timestamp *T*
> > > is set in the OffsetRequest, the first offset in the returned offset
> > > sequence means that if user want to consume from *T*, that is the
> > > offset to start with. The guarantee is that any message whose timestamp
> > > is greater than T has a bigger offset. i.e. Any message before this
> > > offset has a timestamp < *T*.
> > > ---
> > >
> > > Given how the index is maintained, with a little bit of bad luck
> > > (rolling upgrade/config change of MirrorMakers for different
> > > colocations) one ends up with
> > > segmentN.timeindex.maxtimestamp > segmentN+1.timeindex.maxtimestamp.
> > > If I am not overlooking something here, then the fetch code does not
> > > seem to take that into account.
> > > https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/Log.scala#L604
> > > In this case Goal number 1 as listed, not losing any messages, is not
> > > achieved. An easy fix seems to be to sort the segsArray by maxtimestamp,
> > > but I can't wrap my head around it just now.
> > >
> > >
> > > third odd thing:
> > > Regarding the worry of increasing complexity. Looking at the code
> > > https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L193 -196
> > > https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L227 & 230
> > > https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L265 -266
> > > https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L305 -307
> > > https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L408 - 410
> > > https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L432 - 435
> > > https://github.com/apache/kafka/blob/05d00b5aca2e1e59ad685a3f051d2ab022f75acc/core/src/main/scala/kafka/log/LogSegment.scala#L104 -108
> > > and especially
> > > https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/Log.scala#L717
> > > it feels like the Log & LogSegment having detailed knowledge about the
> > > maintained indexes is not the ideal way to model the problem.
> > > Having the server maintain a Set of Indexes could reduce the code
> > > complexity, while also allowing an easy switch to turn it off. I think
> > > both indexes could point to the physical position; a client would do
> > > fetch(timestamp), and then continue with the offsets as usual. Is there
> > > any specific reason the timestamp index points into the offset index?
> > > For reading one would need to branch earlier, maybe already in
> > > ApiHandler, and decide what indexes to query, but this branching logic
> > > is there now anyhow.
> > >
> > > Further I also can't think of a situation where one wants to have this
> > > log.message.timestamp.difference.max.ms take effect. I think this
> > > defeats goal 1 again.
> > >
> > > ITE having this index in the brokers now feels weird to me. It gives me
> > > a feeling of complexity that I don't need, and I have a hard time
> > > figuring out how much other people can benefit from it. I hope that
> > > this feedback is useful and helps to understand my scepticism regarding
> > > this thing. There were some other oddities that I have a hard time
> > > recalling now. So I guess the index was built for a specific Confluent
> > > customer. Will there be any blog post about their use case? Or can you
> > > share it?
> > >
> > > Best Jan
> > >
> > >
> > > On 24.08.2016 16:47, Jun Rao wrote:
> > >
> > > Jan,
> > >
> > > Thanks for the reply. I actually wasn't sure what your main concern on
> > > time-based rolling is. Just a couple of clarifications. (1) Time-based
> > > rolling doesn't control how long a segment will be retained for. For
> > > retention, if you use time-based, it will now be based on the timestamp
> > > in the message. If you use size-based, it works the same as before. Is
> > > your concern on time-based retention? If so, you can always configure
> > > the timestamp in all topics to be log append time, which will give you
> > > the same behavior as before. (2) The creation time of the segment is
> > > never exposed to the consumer and therefore is never preserved in
> > > MirrorMaker. In contrast, the timestamp in the message will be preserved
> > > in MirrorMaker. So, not sure what your concern on MirrorMaker is.
> > >
> > > Jun
> > >
> > > On Wed, Aug 24, 2016 at 5:03 AM, Jan Filipiak <jan.filip...@trivago.com>
> > > wrote:
> > >
> > >> Hi Jun,
> > >>
> > >> I copy-pasted this mail from the archive, as I somehow didn't receive
> > >> it per mail. I will still make some comments inline;
> > >> hopefully you can find them quickly enough, my apologies.
> > >>
> > >> To make things more clear, you should also know that all messages in
> > >> our Kafka setup already have a common way to access their timestamp
> > >> (it's always encoded in the value the same way).
> > >> Sometimes this is a logical time (e.g. the same timestamp across many
> > >> different topics / partitions), say PHP request start time or the
> > >> like. So Kafka's internal timestamps are not really attractive
> > >> for us anyway currently.
> > >>
> > >> I hope I can make a point and not waste your time.
> > >>
> > >> Best Jan,
> > >>
> > >> hopefully everything makes sense
> > >>
> > >> --------
> > >>
> > >> Jan,
> > >>
> > >> Currently, there is no switch to disable the time based index.
> > >>
> > >> There are quite a few use cases of time based index.
> > >>
> > >> 1. From KIP-33's wiki, it allows us to do time-based retention
> > >> accurately. Before KIP-33, the time-based retention is based on the
> > >> last modified time of each log segment. The main issue is that last
> > >> modified time can change over time. For example, if a broker loses
> > >> storage and has to re-replicate all data, those re-replicated segments
> > >> will be retained much longer since their last modified time is more
> > >> recent. Having a time-based index allows us to retain segments based
> > >> on the message time, not the last modified time. This can also benefit
> > >> KIP-71, where we want to combine time-based retention and compaction.
> > >>
> > >> /If you're short on disk space, one could try to get by with
> > >> retention.bytes/
> > >> or, as we did, ssh into the box and rm it, which worked quite well when
> > >> no one reads it.
> > >> Chuckles a little when it's read, but readers usually do an
> > >> auto.offset.reset
> > >> (they are too slow anyway if they are reading the last segments, hrhr).
> > >>
> > >> 2. In KIP-58, we want to delay log compaction based on a configurable
> > >> amount of time. Time-based index allows us to do this more accurately.
> > >>
> > >> /good point, seems reasonable/
> > >>
> > >> 3. We plan to add an api in the consumer to allow seeking to an offset
> > >> based on a timestamp. The time based index allows us to do this more
> > >> accurately and fast.
> > >>
> > >> /Sure, I personally feel that you rarely want to do this. For Camus,
> > >> we used max.pull.historic.days (or similar) successfully quite often.
> > >> We just gave it an extra day and got what we wanted,
> > >> and for debugging my bisect tool works well enough. So these are the 2
> > >> use cases we experienced already and found a decent way around./
> > >>
> > >> Now for the impact.
> > >>
> > >> a. There is a slight change on how time-based rolling works. Before
> > >> KIP-33, rolling was based on the time when a segment was loaded in the
> > >> broker. After KIP-33, rolling is based on the time of the first message
> > >> of a segment. Not sure if this is your concern. In the common case, the
> > >> two behave more or less the same. The latter is actually more
> > >> deterministic since it's not sensitive to broker restarts.
> > >>
> > >> /This is part of my main concern indeed. This is what scares me, and I
> > >> preferred to just opt out, instead of reviewing all our pipelines to
> > >> check what's going to happen when we put it live.
> > >> For example the MirrorMakers: if they want to preserve the create time
> > >> from the source cluster and publish the same create time (which they
> > >> should do, if you don't encode your own timestamps and want
> > >> to have proper kafka-streams windowing), then I am quite concerned
> > >> about when we have problems with our cross-ocean links and fall behind,
> > >> say a day or two.
> > >> Then I can think of a very up-to-date MirrorMaker from
> > >> one colocation and a very laggy MirrorMaker from another colocation.
> > >> For me it's not 100% clear what's going to happen. But I can't think of
> > >> sane defaults there. That is what I love Kafka for.
> > >> It's just tricky to be convinced that an upgrade is safe, which was
> > >> usually easy.
> > >> /
> > >> b. Time-based index potentially adds overhead to producing messages
> > >> and loading segments. Our experiments show that the impact to
> > >> producing is insignificant. The time to load segments when restarting
> > >> a broker can be doubled. However, the absolute time is still
> > >> reasonable. For example, loading 10K log segments with time-based
> > >> index takes about 5 seconds.
> > >> /
> > >> //Loading should be fine/, totally agree
> > >>
> > >> c. Because time-based index is useful in several cases and the impact
> > >> seems small, we didn't consider making time based index optional.
> > >> Finally, although it's possible to make the time based index optional,
> > >> it will add more complexity to the code base. So, we probably should
> > >> only consider it if it's truly needed. Thanks,
> > >>
> > >> /I think one can get away with an easier codebase here. The trick is
> > >> not to have the Log implement all the logic,
> > >> but just have the broker maintain a Set of Indexes that gets
> > >> initialized at startup and passed to the Log. One could ask each
> > >> individual index whether that log segment should be rolled, compacted,
> > >> truncated, whatever. One could also give that LogSegment to each index
> > >> and make it rebuild
> > >> the index, for example. I didn't figure out the details. But this
> > >> https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/Log.scala#L715
> > >> might end up as for (Index i : indexes) { i.shouldRoll(segment) }, which
> > >> should already be easier.
> > >> If users don't want time-based indexing, just don't put the time-based
> > >> index in the Set, and everything should work like a charm.
> > >> RPC calls that work on the specific indexes would need to throw an
> > >> exception of some kind.
> > >> Just an idea.
> > >> /
> > >> Jun
> > >>
> > >>
> > >>
> > >>
> > >>
> > >> On 22.08.2016 09:24, Jan Filipiak wrote:
> > >>
> > >>> Hello everyone,
> > >>>
> > >>> I stumbled across KIP-33 and the time-based index. While briefly
> > >>> checking the wiki and commits, I failed to find a way to opt out.
> > >>> I saw it having quite some impact on when logs are rolled and was
> > >>> hoping not to have to deal with all of that. Is there a disable
> > >>> switch I overlooked?
> > >>>
> > >>> Does anybody have a good use case where the time-based index comes in
> > >>> handy? I made a custom console consumer for myself
> > >>> that can bisect a log based on time. It's just a quick probabilistic
> > >>> shot into the log but is sometimes quite useful for some debugging.
> > >>>
> > >>> Best Jan
> > >>>
> > >>
> > >>
> > >
> > >
> >
>
