On Thu, Jan 25, 2018, at 16:47, Dong Lin wrote:
> Hey Colin,
> 
> Thanks for the comment.
> 
> On Thu, Jan 25, 2018 at 4:15 PM, Colin McCabe <cmcc...@apache.org> wrote:
> 
> > On Wed, Jan 24, 2018, at 21:07, Dong Lin wrote:
> > > Hey Colin,
> > >
> > > Thanks for reviewing the KIP.
> > >
> > > If I understand you right, you may be suggesting that we can use a global
> > > metadataEpoch that is incremented every time controller updates metadata.
> > > The problem with this solution is that, if a topic is deleted and created
> > > again, user will not know that the offset which was stored before
> > > the topic deletion is no longer valid. This motivates the idea to include
> > > per-partition partitionEpoch. Does this sound reasonable?
> >
> > Hi Dong,
> >
> > Perhaps we can store the last valid offset of each deleted topic in
> > ZooKeeper.  Then, when a topic with one of those names gets re-created, we
> > can start the topic at the previous end offset rather than at 0.  This
> > preserves immutability.  It is no more burdensome than having to preserve a
> > "last epoch" for the deleted partition somewhere, right?
> >
> 
> My concern with this solution is that the number of ZooKeeper nodes will keep
> growing over time if some users keep deleting and creating topics. Do
> you think this can be a problem?

Hi Dong,

We could expire the "partition tombstones" after an hour or so.  In practice 
this would solve the issue for clients that like to destroy and re-create 
topics all the time.  In any case, doesn't the current proposal add 
per-partition znodes as well that we have to track even after the partition is 
deleted?  Or did I misunderstand that?  
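
A rough sketch of the tombstone idea, to make the expiry behaviour concrete.
Everything here is hypothetical: TombstoneStore stands in for whatever would
be kept in ZooKeeper, the one-hour TTL is just "an hour or so", and time is
passed in explicitly so the behaviour is easy to follow.

```python
from dataclasses import dataclass, field

TOMBSTONE_TTL_SECONDS = 3600  # "an hour or so"; the exact value is an assumption


@dataclass
class TombstoneStore:
    """Hypothetical stand-in for ZooKeeper-backed partition tombstones."""

    ttl: float = TOMBSTONE_TTL_SECONDS
    # (topic, partition) -> (last end offset, time of deletion)
    _tombstones: dict = field(default_factory=dict)

    def record_deletion(self, topic, partition, end_offset, now):
        """Remember where the deleted partition ended."""
        self._tombstones[(topic, partition)] = (end_offset, now)

    def expire(self, now):
        """Drop tombstones older than the TTL so the store stays bounded."""
        expired = [key for key, (_, deleted_at) in self._tombstones.items()
                   if now - deleted_at >= self.ttl]
        for key in expired:
            del self._tombstones[key]

    def start_offset_for_new_partition(self, topic, partition, now):
        """First offset to use when (topic, partition) is re-created."""
        self.expire(now)
        end_offset, _ = self._tombstones.pop((topic, partition), (0, None))
        return end_offset
```

A partition re-created within the TTL resumes at the old end offset, so old
(topic, partition, offset) tuples are never reused; after the TTL it starts
at 0 again, which is the trade-off for keeping the number of znodes bounded.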

It's not really clear to me what should happen when a topic is destroyed and 
re-created with new data.  Should consumers continue to be able to consume?  We 
don't know where they stopped consuming from the previous incarnation of the 
topic, so messages may have been lost.  Certainly consuming data from offset X 
of the new incarnation of the topic may give something totally different from 
what you would have gotten from offset X of the previous incarnation of the 
topic.

By choosing to reuse the same (topic, partition, offset) 3-tuple, we have 
chosen to give up immutability.  That was a really bad decision.  And now we 
have to worry about time dependencies, stale cached data, and all the rest.  We 
can't completely fix this inside Kafka no matter what we do, because not all 
that cached data is inside Kafka itself.  Some of it may be in systems that 
Kafka has sent data to, such as other daemons, SQL databases, streams, and so 
forth.

I guess the idea here is that mirror maker should work as expected when users 
destroy a topic and re-create it with the same name.  That's kind of tough, 
though, since in that scenario, mirror maker probably should destroy and 
re-create the topic on the other end, too, right?  Otherwise, what you end up 
with on the other end could be half of one incarnation of the topic, and half 
of another.

What mirror maker really needs is to be able to follow a stream of events about 
the Kafka cluster itself.  We could have some master topic which is always 
present and which contains data about all topic deletions, creations, etc.  
Then MM can simply follow this topic and do what is needed.
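
As a minimal sketch of what "following" such a master topic could look like:
the event shape (type, topic, incarnation id) and the function name are
assumptions made for illustration, not an existing Kafka or MirrorMaker API.

```python
def apply_cluster_events(events, destination):
    """Replay topic lifecycle events, in log order, on a destination cluster.

    `events` is an iterable of (event_type, topic, incarnation) tuples read
    from the hypothetical master topic. `destination` maps topic name to the
    incarnation currently mirrored there. Both shapes are assumptions.
    """
    for event_type, topic, incarnation in events:
        if event_type == "create":
            # A (re-)created topic starts a fresh incarnation on the mirror.
            destination[topic] = incarnation
        elif event_type == "delete":
            # Only drop the topic if the mirror holds the deleted incarnation.
            if destination.get(topic) == incarnation:
                del destination[topic]
        else:
            raise ValueError(f"unknown event type: {event_type}")
    return destination
```

Because the events are applied in order, a delete followed by a re-create
leaves the mirror holding only the new incarnation, rather than half of one
incarnation and half of another.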

> 
> 
> >
> > >
> > > Then the next question may be, should we use a global metadataEpoch +
> > > per-partition partitionEpoch, instead of using per-partition leaderEpoch +
> > > per-partition partitionEpoch. The former solution using metadataEpoch would
> > > not work due to the following scenario (provided by Jun):
> > >
> > > "Consider the following scenario. In metadata v1, the leader for a
> > > partition is at broker 1. In metadata v2, leader is at broker 2. In
> > > metadata v3, leader is at broker 1 again. The last committed offset in
> > > v1, v2 and v3 are 10, 20 and 30, respectively. A consumer is started and
> > > reads metadata v1 and reads messages from offset 0 to 25 from broker 1. My
> > > understanding is that in the current proposal, the metadata version
> > > associated with offset 25 is v1. The consumer is then restarted and
> > > fetches metadata v2. The consumer tries to read from broker 2, which is
> > > the old leader with the last offset at 20. In this case, the consumer will
> > > still get OffsetOutOfRangeException incorrectly."
> > >
> > > Regarding your comment "For the second purpose, this is "soft state"
> > > anyway.  If the client thinks X is the leader but Y is really the leader,
> > > the client will talk to X, and X will point out its mistake by sending
> > > back a NOT_LEADER_FOR_PARTITION.", it is probably not true. The problem
> > > here is that the old leader X may still think it is the leader of the
> > > partition and thus it will not send back NOT_LEADER_FOR_PARTITION. The
> > > reason is provided in KAFKA-6262. Can you check if that makes sense?
> >
> > This is solvable with a timeout, right?  If the leader can't communicate
> > with the controller for a certain period of time, it should stop acting as
> > the leader.  We have to solve this problem, anyway, in order to fix all the
> > corner cases.
> >
> 
> Not sure if I fully understand your proposal. The proposal seems to require
> non-trivial changes to our existing leadership election mechanism. Could
> you provide more detail regarding how it works? For example, how should
> user choose this timeout, how leader determines whether it can still
> communicate with controller, and how this triggers controller to elect new
> leader?

Before I come up with any proposal, let me make sure I understand the problem 
correctly.  My big question was, what prevents split-brain here?

Let's say I have a partition which is on nodes A, B, and C, with min-ISR 2.  
The controller is D.  At some point, a network partition separates A and B 
from the rest of the cluster.  The controller re-assigns the partition to 
nodes C, D, and E.  But A and B keep chugging away, even though they can no 
longer communicate with the controller.

At some point, a client with stale metadata writes to the partition.  It still 
thinks the partition is on nodes A, B, and C, so that's where it sends the 
data.  It's unable to talk to C, but A and B reply back that all is well.

Is this not a case where we could lose data due to split brain?  Or is there a 
mechanism for preventing this that I missed?  If it is such a case, it seems 
like a pretty serious failure that we should be handling with our metadata 
rework.  And I think epoch numbers and timeouts might be part of the solution.
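
To illustrate how epochs and timeouts could combine here, a minimal sketch of
a lease-style check on the leader.  This is not how Kafka works today; the
class, the exception, and lease_seconds are all hypothetical.  It also
assumes the controller waits at least one lease period before a
re-assignment takes effect, and that clocks advance at roughly the same rate
on every node.

```python
class NotLeaderError(Exception):
    """Raised when a replica must refuse to act as leader."""


class LeaderReplica:
    """Hypothetical leader that serves writes only while its lease is fresh."""

    def __init__(self, epoch, lease_seconds, now):
        self.epoch = epoch
        self.lease_seconds = lease_seconds
        self.last_controller_contact = now
        self.log = []

    def heartbeat_from_controller(self, controller_epoch, now):
        """Renew the lease; ignore messages carrying an older epoch."""
        if controller_epoch < self.epoch:
            return
        self.epoch = controller_epoch
        self.last_controller_contact = now

    def append(self, record, client_epoch, now):
        """Append a record, or refuse if the lease or the epoch is stale."""
        if now - self.last_controller_contact > self.lease_seconds:
            # Cut off from the controller for too long: stop acting as leader.
            raise NotLeaderError("lease expired")
        if client_epoch != self.epoch:
            # Client and replica disagree about the metadata version.
            raise NotLeaderError("epoch mismatch")
        self.log.append(record)
        return len(self.log) - 1  # offset of the appended record
```

In the scenario above, A would start rejecting writes once lease_seconds had
passed without hearing from D, so the client with stale metadata gets an
error instead of a false acknowledgement.  The epoch check alone does not
help on A, since A and the stale client agree on the old epoch.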

best,
Colin


> 
> 
> > best,
> > Colin
> >
> > >
> > > Regards,
> > > Dong
> > >
> > >
> > > On Wed, Jan 24, 2018 at 10:39 AM, Colin McCabe <cmcc...@apache.org>
> > wrote:
> > >
> > > > Hi Dong,
> > > >
> > > > Thanks for proposing this KIP.  I think a metadata epoch is a really
> > > > good idea.
> > > >
> > > > I read through the DISCUSS thread, but I still don't have a clear
> > > > picture of why the proposal uses a metadata epoch per partition rather
> > > > than a global metadata epoch.  A metadata epoch per partition is kind of
> > > > unpleasant-- it's at least 4 extra bytes per partition that we have to
> > > > send over the wire in every full metadata request, which could become
> > > > extra kilobytes on the wire when the number of partitions becomes large.
> > > > Plus, we have to update all the auxiliary classes to include an epoch.
> > > >
> > > > We need to have a global metadata epoch anyway to handle partition
> > > > addition and deletion.  For example, if I give you
> > > > MetadataResponse{part1, epoch 1, part2, epoch 1} and {part1, epoch 1},
> > > > which MetadataResponse is newer?  You have no way of knowing.  It could
> > > > be that part2 has just been created, and the response with 2 partitions
> > > > is newer.  Or it could be that part2 has just been deleted, and therefore
> > > > the response with 1 partition is newer.  You must have a global epoch to
> > > > disambiguate these two cases.
> > > >
> > > > Previously, I worked on the Ceph distributed filesystem.  Ceph had the
> > > > concept of a map of the whole cluster, maintained by a few servers
> > > > doing paxos.  This map was versioned by a single 64-bit epoch number
> > > > which increased on every change.  It was propagated to clients through
> > > > gossip.  I wonder if something similar could work here?
> > > >
> > > > It seems like the Kafka MetadataResponse serves two somewhat unrelated
> > > > purposes.  Firstly, it lets clients know what partitions exist in the
> > > > system and where they live.  Secondly, it lets clients know which nodes
> > > > within the partition are in-sync (in the ISR) and which node is the
> > > > leader.
> > > >
> > > > The first purpose is what you really need a metadata epoch for, I
> > > > think.  You want to know whether a partition exists or not, or you want
> > > > to know which nodes you should talk to in order to write to a given
> > > > partition.  A single metadata epoch for the whole response should be
> > > > adequate here.  We should not change the partition assignment without
> > > > going through zookeeper (or a similar system), and this inherently
> > > > serializes updates into a numbered stream.  Brokers should also stop
> > > > responding to requests when they are unable to contact ZK for a certain
> > > > time period.  This prevents the case where a given partition has been
> > > > moved off some set of nodes, but a client still ends up talking to those
> > > > nodes and writing data there.
> > > >
> > > > For the second purpose, this is "soft state" anyway.  If the client
> > > > thinks X is the leader but Y is really the leader, the client will talk
> > > > to X, and X will point out its mistake by sending back a
> > > > NOT_LEADER_FOR_PARTITION.  Then the client can update its metadata again
> > > > and find the new leader, if there is one.  There is no need for an epoch
> > > > to handle this.  Similarly, I can't think of a reason why changing the
> > > > in-sync replica set needs to bump the epoch.
> > > >
> > > > best,
> > > > Colin
> > > >
> > > >
> > > > On Wed, Jan 24, 2018, at 09:45, Dong Lin wrote:
> > > > > Thanks much for reviewing the KIP!
> > > > >
> > > > > Dong
> > > > >
> > > > > On Wed, Jan 24, 2018 at 7:10 AM, Guozhang Wang <wangg...@gmail.com>
> > > > wrote:
> > > > >
> > > > > > Yeah that makes sense, again I'm just making sure we understand
> > all the
> > > > > > scenarios and what to expect.
> > > > > >
> > > > > > I agree that if, more generally speaking, say users have only
> > > > > > consumed to offset 8, and then call seek(16) to "jump" to a further
> > > > > > position, then she needs to be aware that OORE may be thrown and she
> > > > > > needs to handle it or rely on reset policy which should not surprise
> > > > > > her.
> > > > > >
> > > > > >
> > > > > > I'm +1 on the KIP.
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > >
> > > > > > On Wed, Jan 24, 2018 at 12:31 AM, Dong Lin <lindon...@gmail.com>
> > > > wrote:
> > > > > >
> > > > > > > Yes, in general we can not prevent OffsetOutOfRangeException if
> > user
> > > > > > seeks
> > > > > > > to a wrong offset. The main goal is to prevent
> > > > OffsetOutOfRangeException
> > > > > > if
> > > > > > > user has done things in the right way, e.g. user should know that
> > > > there
> > > > > > is
> > > > > > > message with this offset.
> > > > > > >
> > > > > > > For example, if user calls seek(..) right after construction, the
> > > > only
> > > > > > > reason I can think of is that user stores offset externally. In
> > this
> > > > > > case,
> > > > > > > user currently needs to use the offset which is obtained using
> > > > > > position(..)
> > > > > > > from the last run. With this KIP, user needs to get the offset
> > and
> > > > the
> > > > > > > offsetEpoch using positionAndOffsetEpoch(...) and stores these
> > > > > > information
> > > > > > > externally. The next time user starts consumer, he/she needs to
> > call
> > > > > > > seek(..., offset, offsetEpoch) right after construction. Then KIP
> > > > should
> > > > > > be
> > > > > > > able to ensure that we don't throw OffsetOutOfRangeException if
> > > > there is
> > > > > > no
> > > > > > > unclean leader election.
> > > > > > >
> > > > > > > Does this sound OK?
> > > > > > >
> > > > > > > Regards,
> > > > > > > Dong
> > > > > > >
> > > > > > >
> > > > > > > On Tue, Jan 23, 2018 at 11:44 PM, Guozhang Wang <
> > wangg...@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > "If consumer wants to consume message with offset 16, then
> > consumer
> > > > > > must
> > > > > > > > have
> > > > > > > > already fetched message with offset 15"
> > > > > > > >
> > > > > > > > --> this may not be always true right? What if consumer just
> > > > > > > > calls seek(16) after construction and then poll without committed
> > > > > > > > offset ever stored before? Admittedly it is rare but we do not
> > > > > > > > programmatically disallow it.
> > > > > > > >
> > > > > > > >
> > > > > > > > Guozhang
> > > > > > > >
> > > > > > > > On Tue, Jan 23, 2018 at 10:42 PM, Dong Lin <
> > lindon...@gmail.com>
> > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hey Guozhang,
> > > > > > > > >
> > > > > > > > > Thanks much for reviewing the KIP!
> > > > > > > > >
> > > > > > > > > In the scenario you described, let's assume that broker A has
> > > > > > messages
> > > > > > > > with
> > > > > > > > > offset up to 10, and broker B has messages with offset up to
> > 20.
> > > > If
> > > > > > > > > consumer wants to consume message with offset 9, it will not
> > > > receive
> > > > > > > > > OffsetOutOfRangeException
> > > > > > > > > from broker A.
> > > > > > > > >
> > > > > > > > > If consumer wants to consume message with offset 16, then
> > > > consumer
> > > > > > must
> > > > > > > > > have already fetched message with offset 15, which can only
> > come
> > > > from
> > > > > > > > > broker B. Because consumer will fetch from broker B only if
> > > > > > leaderEpoch
> > > > > > > > >=
> > > > > > > > > 2, then the current consumer leaderEpoch can not be 1 since
> > this
> > > > KIP
> > > > > > > > > prevents leaderEpoch rewind. Thus we will not have
> > > > > > > > > OffsetOutOfRangeException
> > > > > > > > > in this case.
> > > > > > > > >
> > > > > > > > > Does this address your question, or maybe there is more
> > advanced
> > > > > > > scenario
> > > > > > > > > that the KIP does not handle?
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Dong
> > > > > > > > >
> > > > > > > > > On Tue, Jan 23, 2018 at 9:43 PM, Guozhang Wang <
> > > > wangg...@gmail.com>
> > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Thanks Dong, I made a pass over the wiki and it lgtm.
> > > > > > > > > >
> > > > > > > > > > Just a quick question: can we completely eliminate the
> > > > > > > > > > OffsetOutOfRangeException with this approach? Say if there
> > is
> > > > > > > > consecutive
> > > > > > > > > > leader changes such that the cached metadata's partition
> > epoch
> > > > is
> > > > > > 1,
> > > > > > > > and
> > > > > > > > > > the metadata fetch response returns  with partition epoch 2
> > > > > > pointing
> > > > > > > to
> > > > > > > > > > leader broker A, while the actual up-to-date metadata has
> > > > partition
> > > > > > > > > epoch 3
> > > > > > > > > > whose leader is now broker B, the metadata refresh will
> > still
> > > > > > succeed
> > > > > > > > and
> > > > > > > > > > the follow-up fetch request may still see OORE?
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Guozhang
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Tue, Jan 23, 2018 at 3:47 PM, Dong Lin <
> > lindon...@gmail.com
> > > > >
> > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi all,
> > > > > > > > > > >
> > > > > > > > > > > I would like to start the voting process for KIP-232:
> > > > > > > > > > >
> > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-232%3A+Detect+outdated+metadata+using+leaderEpoch+and+partitionEpoch
> > > > > > > > > > >
> > > > > > > > > > > The KIP will help fix a concurrency issue in Kafka which
> > > > > > currently
> > > > > > > > can
> > > > > > > > > > > cause message loss or message duplication in consumer.
> > > > > > > > > > >
> > > > > > > > > > > Regards,
> > > > > > > > > > > Dong
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > --
> > > > > > > > > > -- Guozhang
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > --
> > > > > > > > -- Guozhang
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > >
> >
