Hi Jun,

Thanks a lot for the hint, I'll check it out when I get a free minute!

Best Jan

On 07.09.2016 00:35, Jun Rao wrote:
Jan,

For the time rolling issue, Jiangjie has committed a fix (
https://issues.apache.org/jira/browse/KAFKA-4099) to trunk. Perhaps you can
help test out trunk and see if there are any other issues related to
time-based index?

Thanks,

Jun

On Mon, Sep 5, 2016 at 11:52 PM, Jan Filipiak <jan.filip...@trivago.com>
wrote:

Hi Jun,

Sorry for the late reply. Regarding B, my main concern was just the complexity
of understanding what's going on.
As you can see, it took me probably some 2 days or so to fully grasp all
the details of the implementation and what
the impacts are. Usually I prefer to turn off things I don't use, so I
don't have to bother. Log append time will work for me.

Rolling logs was my main concern. The producer can specify the timestamp,
and we use epoch inside the message. I'd bet money that
people in the company would have put this epoch in the producer
record as well. => rolling logs, as the broker thinks it's millis.
So that would probably have caused us at least one outage if a big
producer had upgraded and done this; IMO a likely mistake.
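
The unit mix-up described above can be sketched in a few lines. This is only an illustration: it assumes the "epoch" value in the message is in seconds (the mail does not say so explicitly) and that the broker reads the record timestamp as milliseconds. The class and method names are made up and are not part of Kafka.

```java
import java.time.Instant;
import java.time.ZoneOffset;

// Hypothetical helper for illustration only; not part of Kafka.
class EpochUnitMixup {
    // How a reader that expects epoch milliseconds interprets the value.
    static Instant asBrokerSeesIt(long recordTimestamp) {
        return Instant.ofEpochMilli(recordTimestamp);
    }

    // Calendar year (UTC) of the timestamp as interpreted above.
    static int yearAsBrokerSeesIt(long recordTimestamp) {
        return asBrokerSeesIt(recordTimestamp).atZone(ZoneOffset.UTC).getYear();
    }
}
```

A value such as 1473119520 (September 2016 in seconds) is read as a point in January 1970 when treated as milliseconds, so the record looks decades old to the broker.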

I'd just hoped for a more obvious kill switch, so I didn't need to bother
that much.

Best Jan





On 29.08.2016 19:36, Jun Rao wrote:

Jan,

For the usefulness of time index, it's ok if you don't plan to use it.
However, I do think there are other people who will want to use it. Fixing
an application bug always requires some additional work. Intuitively, being
able to seek back to a particular point in time for replay is going to be
much more efficient than always replaying from the very beginning,
especially when the log is retained for a long period of time. Sure, if you
want to have more confidence, you want to rewind a bit conservatively. But
being able to rewind an extra hour makes a big difference from having to
rewind all the way to 7 days or however long the retention time is.

For the OffsetRequest, I actually agree with you that it's useful. People
can use that to find the first and the last offset and the offset based on
a specific point in time. The part that's a bit awkward with OffsetRequest
is that it's based on the last modified time of the log segment, which
makes it imprecise (precision is at the segment level, not message level)
and non-deterministic (last modified time may change). Another awkwardness
is that it supports returning a list of offsets after a specified
timestamp. We did that simply because timestamp was only at the segment
level then. So, our plan is to replace OffsetRequest with a new one. It
will give you the same functionality: find the first and the last offset
and the offset based on a specific point in time. It will just be better
since it's more precise and more deterministic. For your use case, it seems
that you don't care about message creation time. Then, it's possible for
you to configure the broker with the log append time. Whether this should
be default at the Kafka level is debatable, but it won't prevent your use
case.

For your suggestion on refactoring, I still want to understand how
necessary it is. Your main concerns so far seem to be:
(a) Impact on rolling log segments.
(b) Time-based index is not useful for me.

Item (a) is a good point. Thanks for that. We will fix it. Item (b), I have
given my view on this above. Are there any other things that you think
having a time-based index will hurt?

Thanks,

Jun

On Fri, Aug 26, 2016 at 3:41 PM, Jan Filipiak <jan.filip...@trivago.com>
wrote:

Hi Jun,
Thanks for taking the time to answer on such a detailed level. You are
right, Log.fetchOffsetByTimestamp works; the comment is just confusing:
"// Get all the segments whose largest timestamp is smaller than target
timestamp", which apparently is not what takeWhile does (I am more on
the Java end of things, so I relied on the comment).

Regarding the frequent file rolling, I didn't think of log compaction, but
that indeed is a place where **** can hit the fan pretty easily, especially
if you don't have many updates in there and you pass the timestamp along in
a kafka-streams application. Bootstrapping a new application then indeed
could produce quite a few old messages, kicking off this log rolling until a
recent message appears. I guess that makes it a practical issue again, even
with the 7 days. Thanks for pointing that out! I'd like to see the appendTime
as the default. I am very happy that I have it in my back pocket so that I can
sleep tighter and not worry too much about someone accidentally doing
something dodgy on a weekend with our clusters.

Regarding the usefulness, you will not be able to sell it to me. I don't
know how people build applications with this ¯\_(ツ)_/¯ but I don't want to
see them.
Look at the error recovery with timestamp seek:
For fixing a bug, a user needs to stop the SP and truncate all his downstream
data perfectly based on their time window. Then restart and do the first
fetch, again based on the perfect window timeout. From then on, he still has
NO clue whatsoever whether messages that come later with an earlier timestamp
need to go into the previous window or not. (Note that there is
>>>absolutely no<<< way to determine this in aggregated downstream windowed
stores.) So the user is in
.... even though he can seek, he
can't rule out his error. IMO it helps them build the wrong thing, which
will just be operational pain *somewhere*.

Look at the error recovery without timestamp seek:
Start your application from the beginning with a different output
(version,key,partition) and wait for it to fully catch up. Drop the time
windows in which the error happened + confidence interval (if your data isn't
there anymore, no seek will help) from the old version. Stop the stream
processor, merge the data it created, switch back to the original
(version,key,partition) and start the SP again.
Done. The bigger you choose the confidence interval, the more correct the
result and the less the index helps. Usually you want maximum confidence =>
no index usage, get everything that is still there (maybe even redump from
Hadoop in extreme cases), ironically causing the log to roll all the time (as
you probably publish to a new topic and have the streams application use both)
:(

As you can see, even though the users can seek, if they want to create
proper numbers (billing information, e.g.), they are in trouble, and giving
them this index will just make them implement the wrong solution! It boils
down to: this index is not the Kafka way of doing things. The index can
help the second approach, but usually one chooses the confidence interval =
as much as one can get.

Then the last thing. "OffsetRequest is a legacy request. It's awkward to
use and we plan to deprecate it over time". You got to be kidding me. It
was weird to get the byte position back then, but getting the offsets is
perfectly reasonable and one of the best things in the world. Want to know
how your stream looked at a specific point in time? Get the start and end
offset, fetch whenever you like, and you get a perfect snapshot in wall time.
This is useful for compacted topics as well as streaming topics. Offsets are
a well-known thing in Kafka and in no way awkward, as their monotonically
increasing property is just great.

For seeking the log based on a confidence interval (the only chance you
get when reprocessing key-less logs) one can also bisect the log from the
client. The case is rare; it is intensive and causes at least a few
hundred seeks for bigger topics. But I guess the broker does these extra
seeks for the new index file now.
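
A client-side bisect of the kind mentioned above might look roughly like the following sketch. It is not Jan's tool: `timestampAt` is a hypothetical stand-in for "fetch the message at this offset and read its timestamp", and the search assumes timestamps do not decrease with the offset, which real logs do not guarantee.

```java
import java.util.function.LongUnaryOperator;

// Hypothetical client-side bisect; timestampAt is a placeholder for fetching
// one message at the given offset and reading its timestamp.
class LogBisect {
    // Returns the smallest offset in [startOffset, endOffset) whose timestamp
    // is >= target, or endOffset if there is none. If timestamps are not
    // monotonic in the offset, the result is only an approximation.
    static long firstOffsetAtOrAfter(long startOffset, long endOffset,
                                     long target, LongUnaryOperator timestampAt) {
        long lo = startOffset;
        long hi = endOffset;
        while (lo < hi) {
            long mid = lo + (hi - lo) / 2;   // avoids overflow of lo + hi
            if (timestampAt.applyAsLong(mid) < target) {
                lo = mid + 1;                // target lies strictly after mid
            } else {
                hi = mid;                    // mid is a candidate, keep it
            }
        }
        return lo;
    }
}
```

Each probe halves the remaining offset range, so the number of fetches per partition grows with the logarithm of the range. To rewind conservatively, one would subtract the confidence interval from the target before searching.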

This index, I feel, is just not following the whole "Kafka way". Can you
comment on the proposed refactoring? What are the chances of getting it
upstream if I could pull it off? (unlikely)

Thanks for all the effort you put into listening to my concerns. Highly
appreciated!

Best Jan




On 25.08.2016 23:36, Jun Rao wrote:

Jan,

Thanks a lot for the feedback. Now I understand your concern better. The
following are my comments.

The first odd thing that you pointed out could be a real concern.
Basically, if a producer publishes messages with a really old timestamp, our
default log.roll.hours (7 days) will indeed cause the broker to roll a log
on every message, which would be bad. Time-based rolling is actually used
infrequently. The only use case that I am aware of is that for compacted
topics, rolling logs based on time could allow the compaction to happen
sooner (since the active segment is never cleaned). One option is to change
the default log.roll.hours to infinite and also document the impact of
changing log.roll.hours. Jiangjie, what do you think?

For the second odd thing, the OffsetRequest is a legacy request. It's
awkward to use and we plan to deprecate it over time. That's why we haven't
changed the logic in serving OffsetRequest after KIP-33. The plan is to
introduce a new OffsetRequest that will exploit the time-based index.
It's possible to have log segments with non-increasing largest timestamp.
As you can see in Log.fetchOffsetsByTimestamp(), we simply iterate the
segments in offset order and stop when we see the target timestamp.
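
The scan described above can be modelled as follows. This is a simplified sketch, not the Kafka source: it reads "stop when we see the target timestamp" as "stop at the first segment whose largest timestamp is at least the target", and the class and method names are invented.

```java
import java.util.List;

// Simplified model of the segment scan; not Kafka's actual code.
class SegmentScan {
    // largestTimestamps holds the largest timestamp of each segment, in
    // offset order. Returns the index of the first segment whose largest
    // timestamp is >= target, or -1 if no segment qualifies.
    static int firstCandidateSegment(List<Long> largestTimestamps, long target) {
        for (int i = 0; i < largestTimestamps.size(); i++) {
            if (largestTimestamps.get(i) >= target) {
                return i;
            }
        }
        return -1;
    }
}
```

With largest timestamps 100, 300, 200, 400 and target 250, the scan stops at the second segment. Every earlier segment contains only timestamps below the target, so consuming from that segment onward misses nothing, even though the third segment has a smaller largest timestamp than the second.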

For the third odd thing, one of the original reasons why the time-based
index points to an offset instead of the file position is that it makes
truncating the time index to an offset easier since the offset is in the
index. Looking at the code, we could also store the file position in the
time index and do truncation based on position, instead of offset. It
probably has a slight advantage of consistency between the two indexes and
avoiding another level of indirection when looking up the time index.
Jiangjie, have we ever considered that?

The idea of log.message.timestamp.difference.max.ms is to prevent the
timestamp in the published messages from drifting too far away from the
current timestamp. The default value is infinite though.
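
As a rough model of that bound, the comparison could be written as below. The names are hypothetical and the sketch only covers the comparison itself, not what the broker does with a message that fails it.

```java
// Simplified model of the drift bound; class and method names are hypothetical.
class TimestampDriftCheck {
    // True if the message timestamp is within maxDifferenceMs of the broker
    // clock, in either direction.
    static boolean withinAllowedDrift(long nowMs, long messageTimestampMs,
                                      long maxDifferenceMs) {
        return Math.abs(nowMs - messageTimestampMs) <= maxDifferenceMs;
    }
}
```

With the bound set to Long.MAX_VALUE, which is one way to express "infinite", every timestamp passes, so the check has no effect until the setting is lowered.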

Lastly, for the usefulness of time-based index, it's actually a feature
that the community wanted and voted for, not just for Confluent customers.
For example, being able to seek to an offset based on timestamp has been a
frequently requested feature. This can be useful for at least the following
scenarios: (1) If there is a bug in a consumer application, the user will
want to rewind the consumption after fixing the logic. In this case, it's
more convenient to rewind the consumption based on a timestamp. (2) In a
multi data center setup, it's common for people to mirror the data from one
Kafka cluster in one data center to another cluster in a different data
center. If one data center fails, people want to be able to resume the
consumption in the other data center. Since the offsets are not preserved
between the two clusters through mirroring, being able to find a starting
offset based on timestamp will allow the consumer to resume the consumption
without missing any messages and also not replaying too many messages.

Thanks,

Jun


On Wed, Aug 24, 2016 at 5:05 PM, Jan Filipiak <jan.filip...@trivago.com>
wrote:

Hey Jun,
I go and try again :), wrote the first one in quite a stressful
environment. The bottom line is that I, for our use cases, see a too small
use/effort ratio in this time index.
We do not bootstrap new consumers for key-less logs so frequently, and
when we do, they usually want everything (prod deployment) or just start
at the end (during development).
That caused quite some frustration. It would be better if I could just have
turned it off and not bothered any more. Anyhow, in the meantime I had to
dig deeper into the inner workings,
and the impacts are not as dramatic as I initially assumed. But it still
carries along some oddities I want to list here.

first odd thing:
Quote
---
Enforce time based log rolling

Currently time based log rolling is based on the creating time of the log
segment. With this KIP, the time based rolling would be changed to based on
the largest timestamp ever seen in a log segment. A new log segment will be
rolled out if current time is greater than largest timestamp ever seen in
the log segment + log.roll.ms. When message.timestamp.type=CreateTime,
user should set max.message.time.difference.ms appropriately together
with log.roll.ms to avoid frequent log segment roll out.
---
Imagine a MirrorMaker falls behind and has a delay of
some time > log.roll.ms.
From my understanding, when no one else is producing to this partition
except the MirrorMaker, the broker will start rolling on every append?
Just because you may be under a DoS attack and your application only works
in the remote location (also a good occasion for MM to fall behind).
But checking the default values indicates that it should indeed not
become a problem, as log.roll.ms defaults to ~7 days.
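
The rule in the quote can be written as a one-line predicate. This is a simplified model of the quoted KIP text only, not the broker's code, and the names are invented for illustration.

```java
// Simplified model of the rolling rule quoted from the KIP; not broker code.
class TimeBasedRoll {
    // A segment is rolled once the broker clock is more than rollMs past the
    // largest timestamp seen in it. Written as a difference so that a very
    // large rollMs cannot overflow the addition.
    static boolean shouldRoll(long nowMs, long largestTimestampMs, long rollMs) {
        return nowMs - largestTimestampMs > rollMs;
    }
}
```

If the only producer lags by more than rollMs, every message it appends makes the predicate true for the active segment, which is the "rolling on every append" case. With a roll interval of 7 days, a lag of a day or two stays below the threshold.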


second odd thing:
Quote
---
A time index entry (*T*, *offset*) means that in this segment any
message whose timestamp is greater than *T* come after *offset.*

The OffsetRequest behaves almost the same as before. If timestamp *T* is
set in the OffsetRequest, the first offset in the returned offset sequence
means that if user want to consume from *T*, that is the offset to start
with. The guarantee is that any message whose timestamp is greater than T
has a bigger offset. i.e. Any message before this offset has a
timestamp < *T*.
---

Given how the index is maintained, with a little bit of bad luck (rolling
upgrade/config change of MirrorMakers for different colocations) one ends up
with segmentN.timeindex.maxtimestamp > segmentN+1.timeindex.maxtimestamp.
If I do not overlook something here, then the fetch code does not seem to
take that into account.
https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/Log.scala#L604
In this case, goal number 1 listed, not to lose any messages, is not
achieved. An easy fix seems to be to sort the segsArray by maxtimestamp, but
I can't wrap my head around it just now.


third odd thing:
Regarding the worry of increasing complexity. Looking at the code
https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L193-196
https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L227 & 230
https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L265-266
https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L305-307
https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L408-410
https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L432-435
https://github.com/apache/kafka/blob/05d00b5aca2e1e59ad685a3f051d2ab022f75acc/core/src/main/scala/kafka/log/LogSegment.scala#L104-108
and especially
https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/Log.scala#L717
it feels like the Log & LogSegment having a detailed knowledge about the
maintained indexes is not the ideal way to model the problem.
Having the server maintain a Set of Indexes could reduce the code
complexity, while also allowing an easy switch to turn it off. I think both
indexes could point to the physical position; a client would do
fetch(timestamp), and then continue with the offsets as usual. Is there any
specific reason the timestamp index points into the offset index?
For reading, one would need to branch earlier, maybe already in ApiHandler,
and decide what indexes to query, but this branching logic is there now
anyhow.

Further I also can't think of a situation where one wants to have this
log.message.timestamp.difference.max.ms take effect. I think this
defeats goal 1 again.

ITE having this index in the brokers now feels weird to me. It gives me a
feeling of complexity that I don't need, and I have a hard time figuring out
how much other people can benefit from it. I hope that this feedback is
useful and helps to understand my scepticism regarding this thing. There
were some other oddities that I have a hard time recalling now. So I guess
the index was built for a specific Confluent customer; will there be any
blog post about their use case? Or can you share it?

Best Jan


On 24.08.2016 16:47, Jun Rao wrote:

Jan,

Thanks for the reply. I actually wasn't sure what your main concern on
time-based rolling is. Just a couple of clarifications. (1) Time-based
rolling doesn't control how long a segment will be retained for. For
retention, if you use time-based, it will now be based on the timestamp in
the message. If you use size-based, it works the same as before. Is your
concern on time-based retention? If so, you can always configure the
timestamp in all topics to be log append time, which will give you the same
behavior as before. (2) The creation time of the segment is never exposed
to the consumer and therefore is never preserved in MirrorMaker. In
contrast, the timestamp in the message will be preserved in MirrorMaker.
So, not sure what your concern on MirrorMaker is.

Jun

On Wed, Aug 24, 2016 at 5:03 AM, Jan Filipiak <jan.filip...@trivago.com>
wrote:

Hi Jun,
I copy-pasted this mail from the archive, as I somehow didn't receive it
per mail. I will still make some comments inline;
hopefully you can find them quickly enough, my apologies.

To make things more clear, you should also know that all messages in
our Kafka setup have a common way to access their timestamp already (it's
encoded in the value the same way always).
Sometimes this is a logical time (e.g. same timestamp across many
different topics / partitions), say PHP request start time or the like. So
Kafka's internal timestamps are not really attractive
for us anyway currently.

I hope I can make a point and not waste your time.

Best Jan,

hopefully everything makes sense

--------

Jan,

Currently, there is no switch to disable the time based index.

There are quite a few use cases of time based index.

1. From KIP-33's wiki, it allows us to do time-based retention accurately.
Before KIP-33, the time-based retention is based on the last modified time
of each log segment. The main issue is that last modified time can change
over time. For example, if a broker loses storage and has to re-replicate
all data, those re-replicated segments will be retained much longer since
their last modified time is more recent. Having a time-based index allows
us to retain segments based on the message time, not the last modified
time. This can also benefit KIP-71, where we want to combine time-based
retention and compaction.

/If you're short on disk space, one could try to get by with
retention.bytes/
or, as we did, ssh into the box and rm it, which worked quite well when
no one reads it.
Chuckles a little when it's read, but readers usually do an
auto.offset.reset
(they are too slow anyway if they are reading the last segments, hrhr).

2. In KIP-58, we want to delay log compaction based on a configurable
amount of time. Time-based index allows us to do this more accurately.

/good point, seems reasonable/

3. We plan to add an api in the consumer to allow seeking to an offset
based on a timestamp. The time based index allows us to do this more
accurately and fast.

/Sure, I personally feel that you rarely want to do this. For Camus, we
used max.pull.historic.days (or similar) successfully quite often. We
just gave it an extra day and got what we wanted,
and for debugging my bisect tool works well enough. So these are the 2
use cases we experienced already and found a decent way around./

Now for the impact.

a. There is a slight change on how time-based rolling works. Before KIP-33,
rolling was based on the time when a segment was loaded in the broker.
After KIP-33, rolling is based on the time of the first message of a
segment. Not sure if this is your concern. In the common case, the two
behave more or less the same. The latter is actually more deterministic
since it's not sensitive to broker restarts.

/This is part of my main concern indeed. This is what scares me, and I
preferred to just opt out, instead of reviewing all our pipelines to check
what's going to happen when we put it live.
For example the MirrorMakers: if they want to preserve the create time from
the source cluster and publish the same create time (which they should do,
if you don't encode your own timestamps and want
to have proper kafka-streams windowing), then I am quite concerned about what
happens when we have problems with our cross-ocean links and fall behind, say
a day or two.
Then I can think of a very up-to-date MirrorMaker from
one colocation and a very laggy MirrorMaker from another colocation. For
me it's not 100% clear what's going to happen. But I can't think of sane
defaults there, which is what I love Kafka for.
It's just tricky to be convinced that an upgrade is safe, which was usually
easy.
/
b. Time-based index potentially adds overhead to producing messages and
loading segments. Our experiments show that the impact to producing is
insignificant. The time to load segments when restarting a broker can be
doubled. However, the absolute time is still reasonable. For example,
loading 10K log segments with time-based index takes about 5 seconds.

/Loading should be fine, totally agree/

c. Because time-based index is useful in several cases and the impact seems
small, we didn't consider making time based index optional. Finally,
although it's possible to make the time based index optional, it will add
more complexity to the code base. So, we probably should only consider it
if it's truly needed. Thanks,

/I think one can get away with an easier codebase here. The trick is not
to have the Log implement all the logic,
but just have the broker maintain a Set of Indexes that gets
initialized on startup and passed to the Log. One could ask each individual
index whether that log segment should be rolled, compacted, truncated,
whatever. One could also give that LogSegment to each index and make it
rebuild the index, for example. I didn't figure out the details. But this
https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/Log.scala#L715
might end up with for (Index i : indexes) { i.shouldRoll(segment) }? which
should already be easier.
If users don't want time-based indexing, just don't put the time-based
index in the Set, and everything should work like a charm.
RPC calls that work on the specific indexes would need to throw an
exception of some kind.
Just an idea.
/
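
The "Set of Indexes" idea above could be sketched along these lines. Everything here is hypothetical: the interface, the two index models and the log model are invented to illustrate the proposal and do not reflect how Kafka's Log is structured.

```java
import java.util.ArrayList;
import java.util.List;

// All names are hypothetical; this only illustrates the proposal.
class Segment {
    final long sizeBytes;
    final long largestTimestampMs;

    Segment(long sizeBytes, long largestTimestampMs) {
        this.sizeBytes = sizeBytes;
        this.largestTimestampMs = largestTimestampMs;
    }
}

// Each index decides for itself whether the active segment should be rolled.
interface SegmentIndex {
    boolean shouldRoll(Segment segment, long nowMs);
}

class OffsetIndexModel implements SegmentIndex {
    private final long maxSegmentBytes;

    OffsetIndexModel(long maxSegmentBytes) {
        this.maxSegmentBytes = maxSegmentBytes;
    }

    public boolean shouldRoll(Segment segment, long nowMs) {
        return segment.sizeBytes >= maxSegmentBytes;
    }
}

class TimeIndexModel implements SegmentIndex {
    private final long rollMs;

    TimeIndexModel(long rollMs) {
        this.rollMs = rollMs;
    }

    public boolean shouldRoll(Segment segment, long nowMs) {
        return nowMs - segment.largestTimestampMs > rollMs;
    }
}

// The log only knows that it has some indexes, not which ones.
class LogModel {
    private final List<SegmentIndex> indexes;

    LogModel(List<SegmentIndex> indexes) {
        this.indexes = new ArrayList<>(indexes);
    }

    boolean shouldRoll(Segment segment, long nowMs) {
        for (SegmentIndex index : indexes) {
            if (index.shouldRoll(segment, nowMs)) {
                return true;
            }
        }
        return false;
    }
}
```

Leaving the time index model out of the list is then the "off switch": a segment with a very old largest timestamp no longer triggers a roll, while size-based rolling keeps working.
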
Jun





On 22.08.2016 09:24, Jan Filipiak wrote:

Hello everyone,
I stumbled across KIP-33 and the time-based index. While briefly
checking the wiki and commits, I failed to find a way to opt out.
I saw it having quite some impact on when logs are rolled and was
hoping not to have to deal with all of that. Is there a disable switch I
overlooked?

Does anybody have a good use case where the time-based index comes in
handy? I made a custom console consumer for myself
that can bisect a log based on time. It's just a quick probabilistic
shot into the log but is sometimes quite useful for debugging.

Best Jan


