Hey Jun,
I'll try again :). I wrote the first one in quite a stressful
environment. The bottom line is that, for our use cases, I see too
small a use/effort ratio in this time index.
We do not bootstrap new consumers for key-less logs very frequently, and
when we do, they usually want everything (prod deployment) or just
start at the end (during development).
That caused quite some frustration. It would have been better if I could
just have turned it off and not bothered any more. Anyhow, in the meantime
I had to dig deeper into the inner workings, and the impacts are not as
dramatic as I initially assumed. But it still carries along some oddities
that I want to list here.
first odd thing:
Quote
---
Enforce time based log rolling
Currently time based log rolling is based on the creating time of the
log segment. With this KIP, the time based rolling would be changed to
based on the largest timestamp ever seen in a log segment. A new log
segment will be rolled out if current time is greater than largest
timestamp ever seen in the log segment + log.roll.ms. When
message.timestamp.type=CreateTime, user should set
max.message.time.difference.ms appropriately together with log.roll.ms
to avoid frequent log segment roll out.
---
Imagine a MirrorMaker falls behind and has a delay of some time >
log.roll.ms.
From my understanding, when no one else is producing to this partition
except the MirrorMaker, the broker will start rolling on every append?
Just because you may be under a DoS attack and your application only works
in the remote location (also a good occasion for MM to fall behind).
But checking the default values indicates that it should indeed not
become a problem, as log.roll.ms defaults to roughly 7 days.
second odd thing:
Quote
---
A time index entry (/T/, /offset/) means that in this segment any
message whose timestamp is greater than /T/ come after /offset./
The OffsetRequest behaves almost the same as before. If timestamp *T* is
set in the OffsetRequest, the first offset in the returned offset
sequence means that if user want to consume from *T*, that is the offset
to start with. The guarantee is that any message whose timestamp is
greater than T has a bigger offset. i.e. Any message before this offset
has a timestamp < *T*.
---
Given how the index is maintained, with a little bit of bad luck
(rolling upgrade/config change of MirrorMakers for different
colocations) one ends up with segmentN.timeindex.maxtimestamp >
segmentN+1.timeindex.maxtimestamp. Unless I am overlooking something here,
the fetch code does not seem to take that into account.
https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/Log.scala#L604
In this case goal number 1, not losing any messages, is not
achieved. An easy fix seems to be to sort the segsArray by maxtimestamp, but
I can't wrap my head around it just now.
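The second oddity can be illustrated with a small Python sketch. It does not reproduce the linked Log.scala code; it only contrasts a hypothetical lookup that assumes per-segment max timestamps increase from segment to segment (here a binary search) with a plain scan that makes no such assumption. The tuple layout and function names are invented for the example.

```python
from bisect import bisect_left
from typing import List, Optional, Tuple

# One entry per segment: (base_offset, max_timestamp), in offset order.
Segment = Tuple[int, int]


def start_offset_assuming_sorted(segments: List[Segment], target_ts: int) -> Optional[int]:
    """Binary search over max timestamps. Only correct if the max
    timestamps are non-decreasing from segment to segment."""
    max_timestamps = [max_ts for _, max_ts in segments]
    i = bisect_left(max_timestamps, target_ts)
    return segments[i][0] if i < len(segments) else None


def start_offset_linear_scan(segments: List[Segment], target_ts: int) -> Optional[int]:
    """Base offset of the earliest segment that may still contain a
    message with timestamp >= target_ts. No ordering assumption."""
    for base_offset, max_ts in segments:
        if max_ts >= target_ts:
            return base_offset
    return None


# Segment N (base offset 0) has a larger max timestamp than segment N+1.
segments = [(0, 200), (100, 150), (200, 300)]

print(start_offset_assuming_sorted(segments, 180))  # starts at offset 200
print(start_offset_linear_scan(segments, 180))      # starts at offset 0
```

With these numbers the sorted-assuming lookup starts at offset 200 and therefore skips the message with timestamp 200 in the first segment, which is the kind of loss described above.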
third odd thing:
Regarding the worry of increasing complexity. Looking at the code
https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L193
-196
https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L227
& 230
https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L265
-266
https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L305
-307
https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L408
- 410
https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L432
- 435
https://github.com/apache/kafka/blob/05d00b5aca2e1e59ad685a3f051d2ab022f75acc/core/src/main/scala/kafka/log/LogSegment.scala#L104
-108
and especially
https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/Log.scala#L717
It feels like having the Log and LogSegment know the maintained indexes
in detail is not the ideal way to model the problem.
Having the server maintain a Set of indexes could reduce the code
complexity, while also allowing an easy switch to turn it off. I think
both indexes could point to the physical position; a client would do
fetch(timestamp), and then continue with the offsets as usual. Is there
any specific reason the timestamp index points into the offset index?
For reading one would need to branch earlier, maybe already in
ApiHandler, and decide which indexes to query, but this branching logic is
there now anyhow.
Further I also can't think of a situation where one wants to have this
log.message.timestamp.difference.max.ms take effect. I think this
defeats goal 1 again.
ITE having this index in the brokers now feels weird to me. It gives me a
feeling of complexity that I don't need, and I have a hard time figuring
out how much other people can benefit from it. I hope that this feedback
is useful and helps to understand my scepticism regarding this thing.
There were some other oddities that I have a hard time recalling now. So
I guess the index was built for a specific Confluent customer; will
there be any blog post about their use case, or can you share it?
Best Jan
On 24.08.2016 16:47, Jun Rao wrote:
Jan,
Thanks for the reply. I actually wasn't sure what your main concern on
time-based rolling is. Just a couple of clarifications. (1) Time-based
rolling doesn't control how long a segment will be retained for. For
retention, if you use time-based, it will now be based on the
timestamp in the message. If you use size-based, it works the same as
before. Is your concern on time-based retention? If so, you can always
configure the timestamp in all topics to be log append time, which
will give you the same behavior as before. (2) The creation time of
the segment is never exposed to the consumer and therefore is never
preserved in MirrorMaker. In contrast, the timestamp in the message
will be preserved in MirrorMaker. So, not sure what your concern on
MirrorMaker is.
Jun
On Wed, Aug 24, 2016 at 5:03 AM, Jan Filipiak
<jan.filip...@trivago.com> wrote:
Hi Jun,
I copy-pasted this mail from the archive, as I somehow didn't
receive it by mail. I will still make some comments inline;
hopefully you can find them quickly enough, my apologies.
To make things clearer, you should also know that all messages
in our Kafka setup already have a common way to access their timestamp
(it's always encoded in the value the same way).
Sometimes this is a logical time (e.g. the same timestamp across many
different topics / partitions), say PHP request start time or the
like. So Kafka's internal timestamps are not really attractive
for us anyway at the moment.
I hope I can make a point and not waste your time.
Best Jan,
hopefully everything makes sense
--------
Jan,
Currently, there is no switch to disable the time based index.
There are quite a few use cases of time based index.
1. From KIP-33's wiki, it allows us to do time-based retention accurately.
Before KIP-33, the time-based retention is based on the last modified time
of each log segment. The main issue is that last modified time can change
over time. For example, if a broker loses storage and has to re-replicate
all data, those re-replicated segments will be retained much longer since
their last modified time is more recent. Having a time-based index allows
us to retain segments based on the message time, not the last modified
time. This can also benefit KIP-71, where we want to combine time-based
retention and compaction.
/If you're short on disk space, one could try to get by with
retention.bytes/
or, as we did, ssh into the box and rm it, which worked quite well
when no one reads it.
Chuckles a little when it's read, but readers usually do an
auto.offset.reset
(they are too slow anyway if they are reading the last segments, hrhr).
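Point 1 can be made concrete with a short Python sketch comparing the two retention rules for a re-replicated segment. It is only an illustration: `SegmentInfo` and both functions are hypothetical, and it assumes the message-time rule uses the largest message timestamp in the segment.

```python
from dataclasses import dataclass

DAY_MS = 24 * 60 * 60 * 1000


@dataclass
class SegmentInfo:
    last_modified_ms: int       # file mtime; reset when the data is re-replicated
    largest_message_ts_ms: int  # largest timestamp of any message in the segment


def expired_by_mtime(seg: SegmentInfo, now_ms: int, retention_ms: int) -> bool:
    # Pre-KIP-33 style: age is measured from the file's last modified time.
    return now_ms - seg.last_modified_ms > retention_ms


def expired_by_message_time(seg: SegmentInfo, now_ms: int, retention_ms: int) -> bool:
    # Message-time style: age is measured from the message timestamps.
    return now_ms - seg.largest_message_ts_ms > retention_ms


now = 10 * DAY_MS
# Messages were produced on day 1, but the broker lost its disk and
# re-replicated the segment on day 9.
rereplicated = SegmentInfo(last_modified_ms=9 * DAY_MS, largest_message_ts_ms=1 * DAY_MS)

print(expired_by_mtime(rereplicated, now, retention_ms=7 * DAY_MS))         # kept
print(expired_by_message_time(rereplicated, now, retention_ms=7 * DAY_MS))  # deleted
```

Under the mtime rule the nine-day-old data looks one day old and survives a 7-day retention; under the message-time rule it is removed.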
2. In KIP-58, we want to delay log compaction based on a configurable
amount of time. Time-based index allows us to do this more accurately.
/good point, seems reasonable/
3. We plan to add an api in the consumer to allow seeking to an offset
based on a timestamp. The time based index allows us to do this more
accurately and fast.
/Sure, I personally feel that you rarely want to do this. For
Camus, we used max.pull.historic.days (or similar) successfully
quite often. We just gave it an extra day and got what we wanted,
and for debugging my bisect tool works well enough. So these are
the 2 use cases we have experienced already, and we found a decent way
around them./
Now for the impact.
a. There is a slight change on how time-based rolling works. Before KIP-33,
rolling was based on the time when a segment was loaded in the broker.
After KIP-33, rolling is based on the time of the first message of a
segment. Not sure if this is your concern. In the common case, the two
behave more or less the same. The latter is actually more deterministic
since it's not sensitive to broker restarts.
/This is part of my main concern indeed. This is what scares me,
and I preferred to just opt out, instead of reviewing all our
pipelines to check what's going to happen when we put it live.
For example the MirrorMakers: they may want to preserve the create time
from the source cluster and publish the same create time (which
they should do, if you don't encode your own timestamps and want
to have proper kafka-streams windowing). Then I am quite concerned
about what happens when we have problems with our cross-ocean links and
fall behind, say a day or two. Then I can think of a very up-to-date
MirrorMaker from one colocation and a very laggy MirrorMaker from another
colocation. For me it's not 100% clear what's going to happen. But I
can't think of sane defaults there, which is what I love Kafka for.
It is just tricky to be convinced that an upgrade is safe, which was
usually easy.
/
b. Time-based index potentially adds overhead to producing messages and
loading segments. Our experiments show that the impact to producing is
insignificant. The time to load segments when restarting a broker can be
doubled. However, the absolute time is still reasonable. For example,
loading 10K log segments with time-based index takes about 5 seconds.
/Loading should be fine, totally agree/
c. Because time-based index is useful in several cases and the impact seems
small, we didn't consider making time based index optional. Finally,
although it's possible to make the time based index optional, it will add
more complexity to the code base. So, we probably should only consider it
if it's truly needed. Thanks,
/I think one can get away with an easier codebase here. The trick
is not to have the Log implement all the logic,
but just have the broker maintain a Set of indexes that gets
initialized at startup and passed to the Log. One could ask each
individual index whether that log segment should be rolled, compacted,
truncated, whatever. One could also give that LogSegment to each index and
make it rebuild the index, for example. I didn't figure out the details. But this
https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/Log.scala#L715
might end up as for(Index i : indexes) { i.shouldRoll(segment) }?
which should already be easier.
If users don't want time-based indexing, just don't put the
time-based index in the Set, and everything should work like a
charm.
RPC calls that work on the specific indexes would need to throw an
exception of some kind.
Just an idea.
/
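The "Set of indexes" idea above could look roughly like the following Python sketch. Nothing here is Kafka's actual class structure: the `Index` protocol, `OffsetIndex`, `TimeIndex`, the `Segment` fields and the thresholds are all hypothetical, chosen only to show the log asking each configured index whether to roll.

```python
from dataclasses import dataclass
from typing import Iterable, Protocol


@dataclass
class Segment:
    offset_index_entries: int   # hypothetical: entries written to the offset index
    largest_timestamp_ms: int   # hypothetical: largest message timestamp seen


class Index(Protocol):
    def should_roll(self, segment: Segment, now_ms: int) -> bool: ...


class OffsetIndex:
    def __init__(self, max_entries: int) -> None:
        self.max_entries = max_entries

    def should_roll(self, segment: Segment, now_ms: int) -> bool:
        # Roll when the offset index has no room left.
        return segment.offset_index_entries >= self.max_entries


class TimeIndex:
    def __init__(self, roll_ms: int) -> None:
        self.roll_ms = roll_ms

    def should_roll(self, segment: Segment, now_ms: int) -> bool:
        # Time-based roll rule as quoted from the KIP earlier in the thread.
        return now_ms > segment.largest_timestamp_ms + self.roll_ms


def should_roll(indexes: Iterable[Index], segment: Segment, now_ms: int) -> bool:
    # The log does not know which indexes exist; it just asks each one.
    return any(index.should_roll(segment, now_ms) for index in indexes)


segment = Segment(offset_index_entries=10, largest_timestamp_ms=0)

# Time index configured: the old largest timestamp triggers a roll.
print(should_roll([OffsetIndex(100), TimeIndex(1_000)], segment, now_ms=5_000))
# Time index left out of the set: no time-based rolling at all.
print(should_roll([OffsetIndex(100)], segment, now_ms=5_000))
```

Opting out then amounts to leaving `TimeIndex` out of the collection handed to the log, which is the switch asked for in this thread; rebuild, truncate and lookup hooks would need the same treatment and are not shown.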
Jun
On 22.08.2016 09:24, Jan Filipiak wrote:
Hello everyone,
I stumbled across KIP-33 and the time-based index. While
briefly checking the wiki and commits, I failed to find a way to
opt out.
I saw it having quite some impact on when logs are rolled and
was hoping not to have to deal with all of that. Is there a
disable switch I overlooked?
Does anybody have a good use case where the time-based index
comes in handy? I made a custom console consumer for myself
that can bisect a log based on time. It's just a quick
probabilistic shot into the log, but it is sometimes quite useful
for some debugging.
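The bisect tool itself is not shown in this thread, so the following is only a guess at the general technique: a binary search over offsets, using a hypothetical `read_timestamp(offset)` callback that fetches one message and decodes the timestamp from its value. It assumes timestamps are non-decreasing with offset; where they are not, the result is only approximate, which matches the "probabilistic shot" description.

```python
from typing import Callable


def bisect_offset_by_time(
    read_timestamp: Callable[[int], int],
    first_offset: int,
    end_offset: int,
    target_ts: int,
) -> int:
    """Return the smallest offset in [first_offset, end_offset) whose
    timestamp is >= target_ts, or end_offset if there is none.
    Exact only if timestamps never decrease as offsets grow."""
    lo, hi = first_offset, end_offset
    while lo < hi:
        mid = (lo + hi) // 2
        if read_timestamp(mid) < target_ts:
            lo = mid + 1   # everything up to mid is too old
        else:
            hi = mid       # mid is a candidate; look for an earlier one
    return lo


# Stand-in for a partition: the list index is the offset, the value the timestamp.
timestamps = [10, 20, 20, 35, 50, 80]
print(bisect_offset_by_time(timestamps.__getitem__, 0, len(timestamps), 30))
```

Each probe costs one fetch, so a partition with a billion messages needs about 30 reads to find a starting point without any broker-side index.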
Best Jan