Hi Dong,

Thanks for proposing this KIP.  I think a metadata epoch is a really good idea.

I read through the DISCUSS thread, but I still don't have a clear picture of 
why the proposal uses a metadata epoch per partition rather than a global 
metadata epoch.  A metadata epoch per partition is kind of unpleasant: it's at 
least 4 extra bytes per partition that we have to send over the wire in every 
full metadata request, which could add up to extra kilobytes on the wire when 
the number of partitions becomes large.  Plus, we have to update all the 
auxiliary classes to include an epoch.

We need to have a global metadata epoch anyway to handle partition addition and 
deletion.  For example, if I give you MetadataResponse{part1: epoch 1, part2: 
epoch 1} and MetadataResponse{part1: epoch 1}, which response is newer?  You 
have no way of knowing.  It could be that part2 has just been created, and the 
response with 2 partitions is newer.  Or it could be that part2 has just been deleted, 
and therefore the response with 1 partition is newer.  You must have a global 
epoch to disambiguate these two cases.
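To make the ambiguity concrete, here is a small Python sketch (the function and
the response shapes are invented for illustration, not any Kafka API): with only
per-partition epochs, the two responses above cannot be ordered, while a single
global epoch orders them trivially.

```python
def newer_by_partition_epochs(a, b):
    """a, b: dict mapping partition -> epoch (illustrative, not Kafka's format).
    Return "a", "b", or None when the per-partition epochs cannot disambiguate."""
    common = set(a) & set(b)
    if any(a[p] > b[p] for p in common):
        return "a"
    if any(b[p] > a[p] for p in common):
        return "b"
    # Same epoch on every common partition: part2 may have just been created
    # (a is newer) or just been deleted (b is newer).  We cannot tell.
    return None

# {part1: epoch 1, part2: epoch 1} vs {part1: epoch 1} -- ambiguous:
print(newer_by_partition_epochs({"part1": 1, "part2": 1}, {"part1": 1}))  # None

# A single global epoch resolves the question immediately:
resp_a = {"global_epoch": 7, "partitions": {"part1": 1, "part2": 1}}
resp_b = {"global_epoch": 8, "partitions": {"part1": 1}}
newest = max(resp_a, resp_b, key=lambda r: r["global_epoch"])
print(newest["global_epoch"])  # 8
```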

Previously, I worked on the Ceph distributed filesystem.  Ceph had the concept 
of a map of the whole cluster, maintained by a few servers doing paxos.  This 
map was versioned by a single 64-bit epoch number which increased on every 
change.  It was propagated to clients through gossip.  I wonder if something 
similar could work here?

It seems like the Kafka MetadataResponse serves two somewhat unrelated 
purposes.  Firstly, it lets clients know what partitions exist in the system 
and where they live.  Secondly, it lets clients know which nodes within the 
partition are in-sync (in the ISR) and which node is the leader.

The first purpose is what you really need a metadata epoch for, I think.  You 
want to know whether a partition exists or not, or you want to know which nodes 
you should talk to in order to write to a given partition.  A single metadata 
epoch for the whole response should be adequate here.  We should not change the 
partition assignment without going through zookeeper (or a similar system), and 
this inherently serializes updates into a numbered stream.  Brokers should also 
stop responding to requests when they are unable to contact ZK for a certain 
time period.  This prevents the case where a given partition has been moved off 
some set of nodes, but a client still ends up talking to those nodes and 
writing data there.
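As a rough sketch of how a client could use such a globally serialized epoch
(the class and method names here are invented for illustration, not the Kafka
client API): reject any metadata response whose global epoch is older than the
cached one, so a response from a lagging broker can never roll the client back.

```python
class MetadataCache:
    """Illustrative sketch only: accept a metadata update only if its global
    epoch is at least as new as the cached one."""

    def __init__(self):
        self.epoch = -1   # no metadata seen yet
        self.leaders = {}

    def maybe_update(self, epoch, leaders):
        if epoch < self.epoch:
            return False  # stale response from a lagging broker; ignore it
        self.epoch = epoch
        self.leaders = leaders
        return True

cache = MetadataCache()
cache.maybe_update(5, {"part1": "broker-1"})             # accepted
accepted = cache.maybe_update(4, {"part1": "broker-0"})  # stale, rejected
print(accepted, cache.leaders["part1"])  # False broker-1
```

Because updates are serialized through ZK, a simple "ignore anything older"
rule like this is safe: there is a single total order of epochs to compare
against.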

For the second purpose, this is "soft state" anyway.  If the client thinks X is 
the leader but Y is really the leader, the client will talk to X, and X will 
point out its mistake by sending back a NOT_LEADER_FOR_PARTITION error.  Then the 
client can update its metadata again and find the new leader, if there is one.  
There is no need for an epoch to handle this.  Similarly, I can't think of a 
reason why changing the in-sync replica set needs to bump the epoch.
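The recovery loop for this soft state might look roughly like the following
Python sketch (FakeBroker and the string error code are illustrative stand-ins,
not the actual client API):

```python
class FakeBroker:
    """Stand-in for a broker: answers produce requests, and points out the
    client's mistake when it is not the leader."""
    def __init__(self, name, is_leader):
        self.name = name
        self.is_leader = is_leader

    def produce(self, record):
        return "OK" if self.is_leader else "NOT_LEADER_FOR_PARTITION"

def produce_with_retry(cached_leader, lookup_leader, record, max_retries=3):
    """Talk to the cached leader; on NOT_LEADER_FOR_PARTITION, refresh
    metadata (lookup_leader) and retry.  No epoch needed."""
    broker = cached_leader
    for _ in range(max_retries):
        resp = broker.produce(record)
        if resp != "NOT_LEADER_FOR_PARTITION":
            return broker.name, resp
        broker = lookup_leader()  # refresh metadata, find the new leader
    raise RuntimeError("no leader found after retries")

x = FakeBroker("X", is_leader=False)  # stale cached leader
y = FakeBroker("Y", is_leader=True)   # actual leader
print(produce_with_retry(x, lambda: y, "msg"))  # ('Y', 'OK')
```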

best,
Colin


On Wed, Jan 24, 2018, at 09:45, Dong Lin wrote:
> Thanks much for reviewing the KIP!
> 
> Dong
> 
> On Wed, Jan 24, 2018 at 7:10 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> 
> > Yeah that makes sense, again I'm just making sure we understand all the
> > scenarios and what to expect.
> >
> > I agree that, more generally speaking, if a user has only consumed to
> > offset 8 and then calls seek(16) to "jump" to a further position, then she
> > needs to be aware that OORE may be thrown, and she needs to handle it or
> > rely on the reset policy, which should not surprise her.
> >
> >
> > I'm +1 on the KIP.
> >
> > Guozhang
> >
> >
> > On Wed, Jan 24, 2018 at 12:31 AM, Dong Lin <lindon...@gmail.com> wrote:
> >
> > > Yes, in general we cannot prevent OffsetOutOfRangeException if the user
> > > seeks to a wrong offset. The main goal is to prevent
> > > OffsetOutOfRangeException if the user has done things in the right way,
> > > e.g. the user should know that there is a message with this offset.
> > >
> > > For example, if the user calls seek(..) right after construction, the
> > > only reason I can think of is that the user stores offsets externally.
> > > In this case, the user currently needs to use the offset which was
> > > obtained using position(..) from the last run. With this KIP, the user
> > > needs to get the offset and the offsetEpoch using
> > > positionAndOffsetEpoch(...) and store this information externally. The
> > > next time the user starts the consumer, he/she needs to call
> > > seek(..., offset, offsetEpoch) right after construction. Then the KIP
> > > should be able to ensure that we don't throw OffsetOutOfRangeException
> > > if there is no unclean leader election.
> > > Does this sound OK?
> > >
> > > Regards,
> > > Dong
> > >
> > >
> > > On Tue, Jan 23, 2018 at 11:44 PM, Guozhang Wang <wangg...@gmail.com>
> > > wrote:
> > >
> > > > "If consumer wants to consume message with offset 16, then consumer
> > > > must have already fetched message with offset 15"
> > > >
> > > > --> this may not always be true, right? What if the consumer just calls
> > > > seek(16) after construction and then polls without any committed offset
> > > > ever stored before? Admittedly it is rare, but we do not
> > > > programmatically disallow it.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > > On Tue, Jan 23, 2018 at 10:42 PM, Dong Lin <lindon...@gmail.com>
> > wrote:
> > > >
> > > > > Hey Guozhang,
> > > > >
> > > > > Thanks much for reviewing the KIP!
> > > > >
> > > > > In the scenario you described, let's assume that broker A has
> > > > > messages with offsets up to 10, and broker B has messages with
> > > > > offsets up to 20. If the consumer wants to consume the message with
> > > > > offset 9, it will not receive OffsetOutOfRangeException from
> > > > > broker A.
> > > > >
> > > > > If the consumer wants to consume the message with offset 16, then
> > > > > the consumer must have already fetched the message with offset 15,
> > > > > which can only come from broker B. Because the consumer will fetch
> > > > > from broker B only if leaderEpoch >= 2, the current consumer
> > > > > leaderEpoch cannot be 1, since this KIP prevents leaderEpoch rewind.
> > > > > Thus we will not have OffsetOutOfRangeException in this case.
> > > > >
> > > > > Does this address your question, or is there a more advanced
> > > > > scenario that the KIP does not handle?
> > > > >
> > > > > Thanks,
> > > > > Dong
> > > > >
> > > > > On Tue, Jan 23, 2018 at 9:43 PM, Guozhang Wang <wangg...@gmail.com>
> > > > wrote:
> > > > >
> > > > > > Thanks Dong, I made a pass over the wiki and it lgtm.
> > > > > >
> > > > > > Just a quick question: can we completely eliminate the
> > > > > > OffsetOutOfRangeException with this approach? Say there are
> > > > > > consecutive leader changes such that the cached metadata's
> > > > > > partition epoch is 1, and the metadata fetch response returns with
> > > > > > partition epoch 2 pointing to leader broker A, while the actual
> > > > > > up-to-date metadata has partition epoch 3 whose leader is now
> > > > > > broker B. The metadata refresh will still succeed, and the
> > > > > > follow-up fetch request may still see OORE?
> > > > > >
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > >
> > > > > > On Tue, Jan 23, 2018 at 3:47 PM, Dong Lin <lindon...@gmail.com>
> > > wrote:
> > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > I would like to start the voting process for KIP-232:
> > > > > > >
> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-232%3A+Detect+outdated+metadata+using+leaderEpoch+and+partitionEpoch
> > > > > > >
> > > > > > > The KIP will help fix a concurrency issue in Kafka which can
> > > > > > > currently cause message loss or message duplication in the
> > > > > > > consumer.
> > > > > > >
> > > > > > > Regards,
> > > > > > > Dong
> > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
