Jay,

Thanks for the comments. Yes, there are actually three proposals as you
pointed out.

We will have a separate proposal for (1) - version control mechanism. We
actually thought about whether we want to separate 2 and 3 internally
before creating the KIP. The reason we put 2 and 3 together is it will
saves us another cross board wire protocol change. Like you said, we have
to migrate all the clients in all languages. To some extent, the effort to
spend on upgrading the clients can be even bigger than implementing the new
feature itself. So there are some attractions if we can do 2 and 3 together
instead of separately. Maybe after (1) is done it will be easier to do
protocol migration. But if we are able to come to an agreement on the
timestamp solution, I would prefer to have it together with relative offset
in the interest of avoiding another wire protocol change (the process to
migrate to relative offset is exactly the same as migrate to message with
timestamp).

In terms of timestamp. I completely agree that having client timestamp is
more useful if we can make sure the timestamp is good. But in reality that
can be a really big *IF*. I think the problem is exactly as Ewen mentioned,
if we let the client to set the timestamp, it would be very hard for the
broker to utilize it. If broker apply retention policy based on the client
timestamp. One misbehave producer can potentially completely mess up the
retention policy on the broker. Although people don't care about server
side timestamp. People do care a lot when timestamp breaks. Searching by
timestamp is a really important use case even though it is not used as
often as searching by offset. It has significant direct impact on RTO when
there is a cross cluster failover as Todd mentioned.

The trick using max(lastAppendedTimestamp, currentTimeMillis) is to
guarantee monotonic increase of the timestamp. Many commercial system
actually do something similar to this to solve the time skew. About
changing the time, I am not sure if people use NTP like using a watch to
just set it forward/backward by an hour or so. The time adjustment I used
to do is typically to adjust something like a minute  / week. So for each
second, there might be a few mircoseconds slower/faster but should not
break the clock completely to make sure all the time-based transactions are
not affected. The one minute change will be done within a week but not
instantly.

Personally, I think having client side timestamp will be useful if we don't
need to put the broker and data integrity under risk. If we have to choose
from one of them but not both. I would prefer server side timestamp because
for client side timestamp there is always a plan B which is putting the
timestamp into payload.

Another reason I am reluctant to use the client side timestamp is that it
is always dangerous to mix the control plane with data plane. IP did this
and it has caused so many different breaches so people are migrating to
something like MPLS. An example in Kafka is that any client can construct a
LeaderAndIsrRequest/UpdateMetadataRequest/ContorlledShutdownRequest (you
name it) and send it to the broker to mess up the entire cluster, also as
we already noticed a busy cluster can respond quite slow to controller
messages. So it would really be nice if we can avoid giving the power to
clients to control the log retention.

Thanks,

Jiangjie (Becket) Qin


On Sun, Sep 6, 2015 at 9:54 PM, Todd Palino <tpal...@gmail.com> wrote:

> So, with regards to why you want to search by timestamp, the biggest
> problem I've seen is with consumers who want to reset their timestamps to a
> specific point, whether it is to replay a certain amount of messages, or to
> rewind to before some problem state existed. This happens more often than
> anyone would like.
>
> To handle this now we need to constantly export the broker's offset for
> every partition to a time-series database and then use external processes
> to query this. I know we're not the only ones doing this. The way the
> broker handles requests for offsets by timestamp is a little obtuse
> (explain it to anyone without intimate knowledge of the internal workings
> of the broker - every time I do I see this). In addition, as Becket pointed
> out, it causes problems specifically with retention of messages by time
> when you move partitions around.
>
> I'm deliberately avoiding the discussion of what timestamp to use. I can
> see the argument either way, though I tend to lean towards the idea that
> the broker timestamp is the only viable source of truth in this situation.
>
> -Todd
>
>
> On Sun, Sep 6, 2015 at 7:08 PM, Ewen Cheslack-Postava <e...@confluent.io>
> wrote:
>
> > On Sun, Sep 6, 2015 at 4:57 PM, Jay Kreps <j...@confluent.io> wrote:
> >
> > >
> > > 2. Nobody cares what time it is on the server.
> > >
> >
> > This is a good way of summarizing the issue I was trying to get at, from
> an
> > app's perspective. Of the 3 stated goals of the KIP, #2 (lot retention)
> is
> > reasonably handled by a server-side timestamp. I really just care that a
> > message is there long enough that I have a chance to process it. #3
> > (searching by timestamp) only seems useful if we can guarantee the
> > server-side timestamp is close enough to the original client-side
> > timestamp, and any mirror maker step seems to break that (even ignoring
> any
> > issues with broker availability).
> >
> > I'm also wondering whether optimizing for search-by-timestamp on the
> broker
> > is really something we want to do given that messages aren't really
> > guaranteed to be ordered by application-level timestamps on the broker.
> Is
> > part of the need for this just due to the current consumer APIs being
> > difficult to work with? For example, could you implement this pretty
> easily
> > client side just the way you would broker-side? I'd imagine a couple of
> > random seeks + reads during very rare occasions (i.e. when the app starts
> > up) wouldn't be a problem performance-wise. Or is it also that you need
> the
> > broker to enforce things like monotonically increasing timestamps since
> you
> > can't do the query properly and efficiently without that guarantee, and
> > therefore what applications are actually looking for *is* broker-side
> > timestamps?
> >
> > -Ewen
> >
> >
> >
> > > Consider cases where data is being copied from a database or from log
> > > files. In steady-state the server time is very close to the client time
> > if
> > > their clocks are sync'd (see 1) but there will be times of large
> > divergence
> > > when the copying process is stopped or falls behind. When this occurs
> it
> > is
> > > clear that the time the data arrived on the server is irrelevant, it is
> > the
> > > source timestamp that matters. This is the problem you are trying to
> fix
> > by
> > > retaining the mm timestamp but really the client should always set the
> > time
> > > with the use of server-side time as a fallback. It would be worth
> talking
> > > to the Samza folks and reading through this blog post (
> > >
> >
> http://radar.oreilly.com/2015/08/the-world-beyond-batch-streaming-101.html
> > > )
> > > on this subject since we went through similar learnings on the stream
> > > processing side.
> > >
> > > I think the implication of these two is that we need a proposal that
> > > handles potentially very out-of-order timestamps in some kind of sanish
> > way
> > > (buggy clients will set something totally wrong as the time).
> > >
> > > -Jay
> > >
> > > On Sun, Sep 6, 2015 at 4:22 PM, Jay Kreps <j...@confluent.io> wrote:
> > >
> > > > The magic byte is used to version message format so we'll need to
> make
> > > > sure that check is in place--I actually don't see it in the current
> > > > consumer code which I think is a bug we should fix for the next
> release
> > > > (filed KAFKA-2523). The purpose of that field is so there is a clear
> > > check
> > > > on the format rather than the scrambled scenarios Becket describes.
> > > >
> > > > Also, Becket, I don't think just fixing the java client is sufficient
> > as
> > > > that would break other clients--i.e. if anyone writes a v1 messages,
> > even
> > > > by accident, any non-v1-capable consumer will break. I think we
> > probably
> > > > need a way to have the server ensure a particular message format
> either
> > > at
> > > > read or write time.
> > > >
> > > > -Jay
> > > >
> > > > On Thu, Sep 3, 2015 at 3:47 PM, Jiangjie Qin
> <j...@linkedin.com.invalid
> > >
> > > > wrote:
> > > >
> > > >> Hi Guozhang,
> > > >>
> > > >> I checked the code again. Actually CRC check probably won't fail.
> The
> > > >> newly
> > > >> added timestamp field might be treated as keyLength instead, so we
> are
> > > >> likely to receive an IllegalArgumentException when try to read the
> > key.
> > > >> I'll update the KIP.
> > > >>
> > > >> Thanks,
> > > >>
> > > >> Jiangjie (Becket) Qin
> > > >>
> > > >> On Thu, Sep 3, 2015 at 12:48 PM, Jiangjie Qin <j...@linkedin.com>
> > > wrote:
> > > >>
> > > >> > Hi, Guozhang,
> > > >> >
> > > >> > Thanks for reading the KIP. By "old consumer", I meant the
> > > >> > ZookeeperConsumerConnector in trunk now, i.e. without this bug
> > fixed.
> > > >> If we
> > > >> > fix the ZookeeperConsumerConnector then it will throw exception
> > > >> complaining
> > > >> > about the unsupported version when it sees message format V1.
> What I
> > > was
> > > >> > trying to say is that if we have some ZookeeperConsumerConnector
> > > running
> > > >> > without the fix, the consumer will complain about CRC mismatch
> > instead
> > > >> of
> > > >> > unsupported version.
> > > >> >
> > > >> > Thanks,
> > > >> >
> > > >> > Jiangjie (Becket) Qin
> > > >> >
> > > >> > On Thu, Sep 3, 2015 at 12:15 PM, Guozhang Wang <
> wangg...@gmail.com>
> > > >> wrote:
> > > >> >
> > > >> >> Thanks for the write-up Jiangjie.
> > > >> >>
> > > >> >> One comment about migration plan: "For old consumers, if they see
> > the
> > > >> new
> > > >> >> protocol the CRC check will fail"..
> > > >> >>
> > > >> >> Do you mean this bug in the old consumer cannot be fixed in a
> > > >> >> backward-compatible way?
> > > >> >>
> > > >> >> Guozhang
> > > >> >>
> > > >> >>
> > > >> >> On Thu, Sep 3, 2015 at 8:35 AM, Jiangjie Qin
> > > <j...@linkedin.com.invalid
> > > >> >
> > > >> >> wrote:
> > > >> >>
> > > >> >> > Hi,
> > > >> >> >
> > > >> >> > We just created KIP-31 to propose a message format change in
> > Kafka.
> > > >> >> >
> > > >> >> >
> > > >> >> >
> > > >> >>
> > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-31+-+Message+format+change+proposal
> > > >> >> >
> > > >> >> > As a summary, the motivations are:
> > > >> >> > 1. Avoid server side message re-compression
> > > >> >> > 2. Honor time-based log roll and retention
> > > >> >> > 3. Enable offset search by timestamp at a finer granularity.
> > > >> >> >
> > > >> >> > Feedback and comments are welcome!
> > > >> >> >
> > > >> >> > Thanks,
> > > >> >> >
> > > >> >> > Jiangjie (Becket) Qin
> > > >> >> >
> > > >> >>
> > > >> >>
> > > >> >>
> > > >> >> --
> > > >> >> -- Guozhang
> > > >> >>
> > > >> >
> > > >> >
> > > >>
> > > >
> > > >
> > >
> >
> >
> >
> > --
> > Thanks,
> > Ewen
> >
>

Reply via email to