Hi Becket,

I would argue that using IDs for partitions is not a performance improvement, 
but actually a completely different way of accomplishing what this KIP is 
trying to solve.  If you give partitions globally unique IDs, and use a 
different ID when re-creating a topic partition, you don't need a partition 
epoch to distinguish between the first incarnation of a topic.  I think there 
are some advantages and disadvantages to both approaches.

One thing that isn't clear to me is whether a 32-bit number would be enough for 
a partition epoch.  If I understand correctly, the partition epoch for a newly 
created partition is taken from a global counter maintained in ZK.  So I would 
expect a 32 or 31 bit counter to potentially wrap around if there are a lot of 
partition creations and deletions.

best,
Colin


On Mon, Feb 5, 2018, at 14:43, Becket Qin wrote:
> +1 on the KIP.
> 
> I think the KIP is mainly about adding the capability of tracking the
> system state change lineage. It does not seem necessary to bundle this KIP
> with replacing the topic partition with partition epoch in produce/fetch.
> Replacing topic-partition string with partition epoch is essentially a
> performance improvement on top of this KIP. That can probably be done
> separately.
> 
> Thanks,
> 
> Jiangjie (Becket) Qin
> 
> On Mon, Jan 29, 2018 at 11:52 AM, Dong Lin <lindon...@gmail.com> wrote:
> 
> > Hey Colin,
> >
> > On Mon, Jan 29, 2018 at 11:23 AM, Colin McCabe <cmcc...@apache.org> wrote:
> >
> > > > On Mon, Jan 29, 2018 at 10:35 AM, Dong Lin <lindon...@gmail.com>
> > wrote:
> > > >
> > > > > Hey Colin,
> > > > >
> > > > > I understand that the KIP will adds overhead by introducing
> > > per-partition
> > > > > partitionEpoch. I am open to alternative solutions that does not
> > incur
> > > > > additional overhead. But I don't see a better way now.
> > > > >
> > > > > IMO the overhead in the FetchResponse may not be that much. We
> > probably
> > > > > should discuss the percentage increase rather than the absolute
> > number
> > > > > increase. Currently after KIP-227, per-partition header has 23 bytes.
> > > This
> > > > > KIP adds another 4 bytes. Assume the records size is 10KB, the
> > > percentage
> > > > > increase is 4 / (23 + 10000) = 0.03%. It seems negligible, right?
> > >
> > > Hi Dong,
> > >
> > > Thanks for the response.  I agree that the FetchRequest / FetchResponse
> > > overhead should be OK, now that we have incremental fetch requests and
> > > responses.  However, there are a lot of cases where the percentage
> > increase
> > > is much greater.  For example, if a client is doing full
> > MetadataRequests /
> > > Responses, we have some math kind of like this per partition:
> > >
> > > > UpdateMetadataRequestPartitionState => topic partition
> > controller_epoch
> > > leader  leader_epoch partition_epoch isr zk_version replicas
> > > offline_replicas
> > > > 14 bytes:  topic => string (assuming about 10 byte topic names)
> > > > 4 bytes:  partition => int32
> > > > 4  bytes: conroller_epoch => int32
> > > > 4  bytes: leader => int32
> > > > 4  bytes: leader_epoch => int32
> > > > +4 EXTRA bytes: partition_epoch => int32        <-- NEW
> > > > 2+4+4+4 bytes: isr => [int32] (assuming 3 in the ISR)
> > > > 4 bytes: zk_version => int32
> > > > 2+4+4+4 bytes: replicas => [int32] (assuming 3 replicas)
> > > > 2  offline_replicas => [int32] (assuming no offline replicas)
> > >
> > > Assuming I added that up correctly, the per-partition overhead goes from
> > > 64 bytes per partition to 68, a 6.2% increase.
> > >
> > > We could do similar math for a lot of the other RPCs.  And you will have
> > a
> > > similar memory and garbage collection impact on the brokers since you
> > have
> > > to store all this extra state as well.
> > >
> >
> > That is correct. IMO the Metadata is only updated periodically and is
> > probably not a big deal if we increase it by 6%. The FetchResponse and
> > ProduceRequest are probably the only requests that are bounded by the
> > bandwidth throughput.
> >
> >
> > >
> > > > >
> > > > > I agree that we can probably save more space by using partition ID so
> > > that
> > > > > we no longer needs the string topic name. The similar idea has also
> > > been
> > > > > put in the Rejected Alternative section in KIP-227. While this idea
> > is
> > > > > promising, it seems orthogonal to the goal of this KIP. Given that
> > > there is
> > > > > already many work to do in this KIP, maybe we can do the partition ID
> > > in a
> > > > > separate KIP?
> > >
> > > I guess my thinking is that the goal here is to replace an identifier
> > > which can be re-used (the tuple of topic name, partition ID) with an
> > > identifier that cannot be re-used (the tuple of topic name, partition ID,
> > > partition epoch) in order to gain better semantics.  As long as we are
> > > replacing the identifier, why not replace it with an identifier that has
> > > important performance advantages?  The KIP freeze for the next release
> > has
> > > already passed, so there is time to do this.
> > >
> >
> > In general it can be easier for discussion and implementation if we can
> > split a larger task into smaller and independent tasks. For example,
> > KIP-112 and KIP-113 both deals with the JBOD support. KIP-31, KIP-32 and
> > KIP-33 are about timestamp support. The option on this can be subject
> > though.
> >
> > IMO the change to switch from (topic, partition ID) to partitionEpch in all
> > request/response requires us to going through all request one by one. It
> > may not be hard but it can be time consuming and tedious. At high level the
> > goal and the change for that will be orthogonal to the changes required in
> > this KIP. That is the main reason I think we can split them into two KIPs.
> >
> >
> > > On Mon, Jan 29, 2018, at 10:54, Dong Lin wrote:
> > > > I think it is possible to move to entirely use partitionEpoch instead
> > of
> > > > (topic, partition) to identify a partition. Client can obtain the
> > > > partitionEpoch -> (topic, partition) mapping from MetadataResponse. We
> > > > probably need to figure out a way to assign partitionEpoch to existing
> > > > partitions in the cluster. But this should be doable.
> > > >
> > > > This is a good idea. I think it will save us some space in the
> > > > request/response. The actual space saving in percentage probably
> > depends
> > > on
> > > > the amount of data and the number of partitions of the same topic. I
> > just
> > > > think we can do it in a separate KIP.
> > >
> > > Hmm.  How much extra work would be required?  It seems like we are
> > already
> > > changing almost every RPC that involves topics and partitions, already
> > > adding new per-partition state to ZooKeeper, already changing how clients
> > > interact with partitions.  Is there some other big piece of work we'd
> > have
> > > to do to move to partition IDs that we wouldn't need for partition
> > epochs?
> > > I guess we'd have to find a way to support regular expression-based topic
> > > subscriptions.  If we split this into multiple KIPs, wouldn't we end up
> > > changing all that RPCs and ZK state a second time?  Also, I'm curious if
> > > anyone has done any proof of concept GC, memory, and network usage
> > > measurements on switching topic names for topic IDs.
> > >
> >
> >
> > We will need to go over all requests/responses to check how to replace
> > (topic, partition ID) with partition epoch. It requires non-trivial work
> > and could take time. As you mentioned, we may want to see how much saving
> > we can get by switching from topic names to partition epoch. That itself
> > requires time and experiment. It seems that the new idea does not rollback
> > any change proposed in this KIP. So I am not sure we can get much by
> > putting them into the same KIP.
> >
> > Anyway, if more people are interested in seeing the new idea in the same
> > KIP, I can try that.
> >
> >
> >
> > >
> > > best,
> > > Colin
> > >
> > > >
> > > >
> > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Mon, Jan 29, 2018 at 10:18 AM, Colin McCabe <cmcc...@apache.org>
> > > wrote:
> > > > >
> > > > >> On Fri, Jan 26, 2018, at 12:17, Dong Lin wrote:
> > > > >> > Hey Colin,
> > > > >> >
> > > > >> >
> > > > >> > On Fri, Jan 26, 2018 at 10:16 AM, Colin McCabe <
> > cmcc...@apache.org>
> > > > >> wrote:
> > > > >> >
> > > > >> > > On Thu, Jan 25, 2018, at 16:47, Dong Lin wrote:
> > > > >> > > > Hey Colin,
> > > > >> > > >
> > > > >> > > > Thanks for the comment.
> > > > >> > > >
> > > > >> > > > On Thu, Jan 25, 2018 at 4:15 PM, Colin McCabe <
> > > cmcc...@apache.org>
> > > > >> > > wrote:
> > > > >> > > >
> > > > >> > > > > On Wed, Jan 24, 2018, at 21:07, Dong Lin wrote:
> > > > >> > > > > > Hey Colin,
> > > > >> > > > > >
> > > > >> > > > > > Thanks for reviewing the KIP.
> > > > >> > > > > >
> > > > >> > > > > > If I understand you right, you maybe suggesting that we
> > can
> > > use
> > > > >> a
> > > > >> > > global
> > > > >> > > > > > metadataEpoch that is incremented every time controller
> > > updates
> > > > >> > > metadata.
> > > > >> > > > > > The problem with this solution is that, if a topic is
> > > deleted
> > > > >> and
> > > > >> > > created
> > > > >> > > > > > again, user will not know whether that the offset which is
> > > > >> stored
> > > > >> > > before
> > > > >> > > > > > the topic deletion is no longer valid. This motivates the
> > > idea
> > > > >> to
> > > > >> > > include
> > > > >> > > > > > per-partition partitionEpoch. Does this sound reasonable?
> > > > >> > > > >
> > > > >> > > > > Hi Dong,
> > > > >> > > > >
> > > > >> > > > > Perhaps we can store the last valid offset of each deleted
> > > topic
> > > > >> in
> > > > >> > > > > ZooKeeper.  Then, when a topic with one of those names gets
> > > > >> > > re-created, we
> > > > >> > > > > can start the topic at the previous end offset rather than
> > at
> > > 0.
> > > > >> This
> > > > >> > > > > preserves immutability.  It is no more burdensome than
> > having
> > > to
> > > > >> > > preserve a
> > > > >> > > > > "last epoch" for the deleted partition somewhere, right?
> > > > >> > > > >
> > > > >> > > >
> > > > >> > > > My concern with this solution is that the number of zookeeper
> > > nodes
> > > > >> get
> > > > >> > > > more and more over time if some users keep deleting and
> > creating
> > > > >> topics.
> > > > >> > > Do
> > > > >> > > > you think this can be a problem?
> > > > >> > >
> > > > >> > > Hi Dong,
> > > > >> > >
> > > > >> > > We could expire the "partition tombstones" after an hour or so.
> > > In
> > > > >> > > practice this would solve the issue for clients that like to
> > > destroy
> > > > >> and
> > > > >> > > re-create topics all the time.  In any case, doesn't the current
> > > > >> proposal
> > > > >> > > add per-partition znodes as well that we have to track even
> > after
> > > the
> > > > >> > > partition is deleted?  Or did I misunderstand that?
> > > > >> > >
> > > > >> >
> > > > >> > Actually the current KIP does not add per-partition znodes. Could
> > > you
> > > > >> > double check? I can fix the KIP wiki if there is anything
> > > misleading.
> > > > >>
> > > > >> Hi Dong,
> > > > >>
> > > > >> I double-checked the KIP, and I can see that you are in fact using a
> > > > >> global counter for initializing partition epochs.  So, you are
> > > correct, it
> > > > >> doesn't add per-partition znodes for partitions that no longer
> > exist.
> > > > >>
> > > > >> >
> > > > >> > If we expire the "partition tomstones" after an hour, and the
> > topic
> > > is
> > > > >> > re-created after more than an hour since the topic deletion, then
> > > we are
> > > > >> > back to the situation where user can not tell whether the topic
> > has
> > > been
> > > > >> > re-created or not, right?
> > > > >>
> > > > >> Yes, with an expiration period, it would not ensure immutability--
> > you
> > > > >> could effectively reuse partition names and they would look the
> > same.
> > > > >>
> > > > >> >
> > > > >> >
> > > > >> > >
> > > > >> > > It's not really clear to me what should happen when a topic is
> > > > >> destroyed
> > > > >> > > and re-created with new data.  Should consumers continue to be
> > > able to
> > > > >> > > consume?  We don't know where they stopped consuming from the
> > > previous
> > > > >> > > incarnation of the topic, so messages may have been lost.
> > > Certainly
> > > > >> > > consuming data from offset X of the new incarnation of the topic
> > > may
> > > > >> give
> > > > >> > > something totally different from what you would have gotten from
> > > > >> offset X
> > > > >> > > of the previous incarnation of the topic.
> > > > >> > >
> > > > >> >
> > > > >> > With the current KIP, if a consumer consumes a topic based on the
> > > last
> > > > >> > remembered (offset, partitionEpoch, leaderEpoch), and if the topic
> > > is
> > > > >> > re-created, consume will throw InvalidPartitionEpochException
> > > because
> > > > >> the
> > > > >> > previous partitionEpoch will be different from the current
> > > > >> partitionEpoch.
> > > > >> > This is described in the Proposed Changes -> Consumption after
> > topic
> > > > >> > deletion in the KIP. I can improve the KIP if there is anything
> > not
> > > > >> clear.
> > > > >>
> > > > >> Thanks for the clarification.  It sounds like what you really want
> > is
> > > > >> immutability-- i.e., to never "really" reuse partition identifiers.
> > > And
> > > > >> you do this by making the partition name no longer the "real"
> > > identifier.
> > > > >>
> > > > >> My big concern about this KIP is that it seems like an
> > > anti-scalability
> > > > >> feature.  Now we are adding 4 extra bytes for every partition in the
> > > > >> FetchResponse and Request, for example.  That could be 40 kb per
> > > request,
> > > > >> if the user has 10,000 partitions.  And of course, the KIP also
> > makes
> > > > >> massive changes to UpdateMetadataRequest, MetadataResponse,
> > > > >> OffsetCommitRequest, OffsetFetchResponse, LeaderAndIsrRequest,
> > > > >> ListOffsetResponse, etc. which will also increase their size on the
> > > wire
> > > > >> and in memory.
> > > > >>
> > > > >> One thing that we talked a lot about in the past is replacing
> > > partition
> > > > >> names with IDs.  IDs have a lot of really nice features.  They take
> > > up much
> > > > >> less space in memory than strings (especially 2-byte Java strings).
> > > They
> > > > >> can often be allocated on the stack rather than the heap (important
> > > when
> > > > >> you are dealing with hundreds of thousands of them).  They can be
> > > > >> efficiently deserialized and serialized.  If we use 64-bit ones, we
> > > will
> > > > >> never run out of IDs, which means that they can always be unique per
> > > > >> partition.
> > > > >>
> > > > >> Given that the partition name is no longer the "real" identifier for
> > > > >> partitions in the current KIP-232 proposal, why not just move to
> > using
> > > > >> partition IDs entirely instead of strings?  You have to change all
> > the
> > > > >> messages anyway.  There isn't much point any more to carrying around
> > > the
> > > > >> partition name in every RPC, since you really need (name, epoch) to
> > > > >> identify the partition.
> > > > >> Probably the metadata response and a few other messages would have
> > to
> > > > >> still carry the partition name, to allow clients to go from name to
> > > id.
> > > > >> But we could mostly forget about the strings.  And then this would
> > be
> > > a
> > > > >> scalability improvement rather than a scalability problem.
> > > > >>
> > > > >> >
> > > > >> >
> > > > >> > > By choosing to reuse the same (topic, partition, offset)
> > 3-tuple,
> > > we
> > > > >> have
> > > > >> >
> > > > >> > chosen to give up immutability.  That was a really bad decision.
> > > And
> > > > >> now
> > > > >> > > we have to worry about time dependencies, stale cached data, and
> > > all
> > > > >> the
> > > > >> > > rest.  We can't completely fix this inside Kafka no matter what
> > > we do,
> > > > >> > > because not all that cached data is inside Kafka itself.  Some
> > of
> > > it
> > > > >> may be
> > > > >> > > in systems that Kafka has sent data to, such as other daemons,
> > SQL
> > > > >> > > databases, streams, and so forth.
> > > > >> > >
> > > > >> >
> > > > >> > The current KIP will uniquely identify a message using (topic,
> > > > >> partition,
> > > > >> > offset, partitionEpoch) 4-tuple. This addresses the message
> > > immutability
> > > > >> > issue that you mentioned. Is there any corner case where the
> > message
> > > > >> > immutability is still not preserved with the current KIP?
> > > > >> >
> > > > >> >
> > > > >> > >
> > > > >> > > I guess the idea here is that mirror maker should work as
> > expected
> > > > >> when
> > > > >> > > users destroy a topic and re-create it with the same name.
> > That's
> > > > >> kind of
> > > > >> > > tough, though, since in that scenario, mirror maker probably
> > > should
> > > > >> destroy
> > > > >> > > and re-create the topic on the other end, too, right?
> > Otherwise,
> > > > >> what you
> > > > >> > > end up with on the other end could be half of one incarnation of
> > > the
> > > > >> topic,
> > > > >> > > and half of another.
> > > > >> > >
> > > > >> > > What mirror maker really needs is to be able to follow a stream
> > of
> > > > >> events
> > > > >> > > about the kafka cluster itself.  We could have some master topic
> > > > >> which is
> > > > >> > > always present and which contains data about all topic
> > deletions,
> > > > >> > > creations, etc.  Then MM can simply follow this topic and do
> > what
> > > is
> > > > >> needed.
> > > > >> > >
> > > > >> > > >
> > > > >> > > >
> > > > >> > > > >
> > > > >> > > > > >
> > > > >> > > > > > Then the next question maybe, should we use a global
> > > > >> metadataEpoch +
> > > > >> > > > > > per-partition partitionEpoch, instead of using
> > per-partition
> > > > >> > > leaderEpoch
> > > > >> > > > > +
> > > > >> > > > > > per-partition leaderEpoch. The former solution using
> > > > >> metadataEpoch
> > > > >> > > would
> > > > >> > > > > > not work due to the following scenario (provided by Jun):
> > > > >> > > > > >
> > > > >> > > > > > "Consider the following scenario. In metadata v1, the
> > leader
> > > > >> for a
> > > > >> > > > > > partition is at broker 1. In metadata v2, leader is at
> > > broker
> > > > >> 2. In
> > > > >> > > > > > metadata v3, leader is at broker 1 again. The last
> > committed
> > > > >> offset
> > > > >> > > in
> > > > >> > > > > v1,
> > > > >> > > > > > v2 and v3 are 10, 20 and 30, respectively. A consumer is
> > > > >> started and
> > > > >> > > > > reads
> > > > >> > > > > > metadata v1 and reads messages from offset 0 to 25 from
> > > broker
> > > > >> 1. My
> > > > >> > > > > > understanding is that in the current proposal, the
> > metadata
> > > > >> version
> > > > >> > > > > > associated with offset 25 is v1. The consumer is then
> > > restarted
> > > > >> and
> > > > >> > > > > fetches
> > > > >> > > > > > metadata v2. The consumer tries to read from broker 2,
> > > which is
> > > > >> the
> > > > >> > > old
> > > > >> > > > > > leader with the last offset at 20. In this case, the
> > > consumer
> > > > >> will
> > > > >> > > still
> > > > >> > > > > > get OffsetOutOfRangeException incorrectly."
> > > > >> > > > > >
> > > > >> > > > > > Regarding your comment "For the second purpose, this is
> > > "soft
> > > > >> state"
> > > > >> > > > > > anyway.  If the client thinks X is the leader but Y is
> > > really
> > > > >> the
> > > > >> > > leader,
> > > > >> > > > > > the client will talk to X, and X will point out its
> > mistake
> > > by
> > > > >> > > sending
> > > > >> > > > > back
> > > > >> > > > > > a NOT_LEADER_FOR_PARTITION.", it is probably no true. The
> > > > >> problem
> > > > >> > > here is
> > > > >> > > > > > that the old leader X may still think it is the leader of
> > > the
> > > > >> > > partition
> > > > >> > > > > and
> > > > >> > > > > > thus it will not send back NOT_LEADER_FOR_PARTITION. The
> > > reason
> > > > >> is
> > > > >> > > > > provided
> > > > >> > > > > > in KAFKA-6262. Can you check if that makes sense?
> > > > >> > > > >
> > > > >> > > > > This is solvable with a timeout, right?  If the leader can't
> > > > >> > > communicate
> > > > >> > > > > with the controller for a certain period of time, it should
> > > stop
> > > > >> > > acting as
> > > > >> > > > > the leader.  We have to solve this problem, anyway, in order
> > > to
> > > > >> fix
> > > > >> > > all the
> > > > >> > > > > corner cases.
> > > > >> > > > >
> > > > >> > > >
> > > > >> > > > Not sure if I fully understand your proposal. The proposal
> > > seems to
> > > > >> > > require
> > > > >> > > > non-trivial changes to our existing leadership election
> > > mechanism.
> > > > >> Could
> > > > >> > > > you provide more detail regarding how it works? For example,
> > how
> > > > >> should
> > > > >> > > > user choose this timeout, how leader determines whether it can
> > > still
> > > > >> > > > communicate with controller, and how this triggers controller
> > to
> > > > >> elect
> > > > >> > > new
> > > > >> > > > leader?
> > > > >> > >
> > > > >> > > Before I come up with any proposal, let me make sure I
> > understand
> > > the
> > > > >> > > problem correctly.  My big question was, what prevents
> > split-brain
> > > > >> here?
> > > > >> > >
> > > > >> > > Let's say I have a partition which is on nodes A, B, and C, with
> > > > >> min-ISR
> > > > >> > > 2.  The controller is D.  At some point, there is a network
> > > partition
> > > > >> > > between A and B and the rest of the cluster.  The Controller
> > > > >> re-assigns the
> > > > >> > > partition to nodes C, D, and E.  But A and B keep chugging away,
> > > even
> > > > >> > > though they can no longer communicate with the controller.
> > > > >> > >
> > > > >> > > At some point, a client with stale metadata writes to the
> > > partition.
> > > > >> It
> > > > >> > > still thinks the partition is on node A, B, and C, so that's
> > > where it
> > > > >> sends
> > > > >> > > the data.  It's unable to talk to C, but A and B reply back that
> > > all
> > > > >> is
> > > > >> > > well.
> > > > >> > >
> > > > >> > > Is this not a case where we could lose data due to split brain?
> > > Or is
> > > > >> > > there a mechanism for preventing this that I missed?  If it is,
> > it
> > > > >> seems
> > > > >> > > like a pretty serious failure case that we should be handling
> > > with our
> > > > >> > > metadata rework.  And I think epoch numbers and timeouts might
> > be
> > > > >> part of
> > > > >> > > the solution.
> > > > >> > >
> > > > >> >
> > > > >> > Right, split brain can happen if RF=4 and minIsr=2. However, I am
> > > not
> > > > >> sure
> > > > >> > it is a pretty serious issue which we need to address today. This
> > > can be
> > > > >> > prevented by configuring the Kafka topic so that minIsr > RF/2.
> > > > >> Actually,
> > > > >> > if user sets minIsr=2, is there anything reason that user wants to
> > > set
> > > > >> RF=4
> > > > >> > instead of 4?
> > > > >> >
> > > > >> > Introducing timeout in leader election mechanism is non-trivial. I
> > > > >> think we
> > > > >> > probably want to do that only if there is good use-case that can
> > not
> > > > >> > otherwise be addressed with the current mechanism.
> > > > >>
> > > > >> I still would like to think about these corner cases more.  But
> > > perhaps
> > > > >> it's not directly related to this KIP.
> > > > >>
> > > > >> regards,
> > > > >> Colin
> > > > >>
> > > > >>
> > > > >> >
> > > > >> >
> > > > >> > > best,
> > > > >> > > Colin
> > > > >> > >
> > > > >> > >
> > > > >> > > >
> > > > >> > > >
> > > > >> > > > > best,
> > > > >> > > > > Colin
> > > > >> > > > >
> > > > >> > > > > >
> > > > >> > > > > > Regards,
> > > > >> > > > > > Dong
> > > > >> > > > > >
> > > > >> > > > > >
> > > > >> > > > > > On Wed, Jan 24, 2018 at 10:39 AM, Colin McCabe <
> > > > >> cmcc...@apache.org>
> > > > >> > > > > wrote:
> > > > >> > > > > >
> > > > >> > > > > > > Hi Dong,
> > > > >> > > > > > >
> > > > >> > > > > > > Thanks for proposing this KIP.  I think a metadata epoch
> > > is a
> > > > >> > > really
> > > > >> > > > > good
> > > > >> > > > > > > idea.
> > > > >> > > > > > >
> > > > >> > > > > > > I read through the DISCUSS thread, but I still don't
> > have
> > > a
> > > > >> clear
> > > > >> > > > > picture
> > > > >> > > > > > > of why the proposal uses a metadata epoch per partition
> > > rather
> > > > >> > > than a
> > > > >> > > > > > > global metadata epoch.  A metadata epoch per partition
> > is
> > > > >> kind of
> > > > >> > > > > > > unpleasant-- it's at least 4 extra bytes per partition
> > > that we
> > > > >> > > have to
> > > > >> > > > > send
> > > > >> > > > > > > over the wire in every full metadata request, which
> > could
> > > > >> become
> > > > >> > > extra
> > > > >> > > > > > > kilobytes on the wire when the number of partitions
> > > becomes
> > > > >> large.
> > > > >> > > > > Plus,
> > > > >> > > > > > > we have to update all the auxillary classes to include
> > an
> > > > >> epoch.
> > > > >> > > > > > >
> > > > >> > > > > > > We need to have a global metadata epoch anyway to handle
> > > > >> partition
> > > > >> > > > > > > addition and deletion.  For example, if I give you
> > > > >> > > > > > > MetadataResponse{part1,epoch 1, part2, epoch 1} and
> > > {part1,
> > > > >> > > epoch1},
> > > > >> > > > > which
> > > > >> > > > > > > MetadataResponse is newer?  You have no way of knowing.
> > > It
> > > > >> could
> > > > >> > > be
> > > > >> > > > > that
> > > > >> > > > > > > part2 has just been created, and the response with 2
> > > > >> partitions is
> > > > >> > > > > newer.
> > > > >> > > > > > > Or it coudl be that part2 has just been deleted, and
> > > > >> therefore the
> > > > >> > > > > response
> > > > >> > > > > > > with 1 partition is newer.  You must have a global epoch
> > > to
> > > > >> > > > > disambiguate
> > > > >> > > > > > > these two cases.
> > > > >> > > > > > >
> > > > >> > > > > > > Previously, I worked on the Ceph distributed filesystem.
> > > > >> Ceph had
> > > > >> > > the
> > > > >> > > > > > > concept of a map of the whole cluster, maintained by a
> > few
> > > > >> servers
> > > > >> > > > > doing
> > > > >> > > > > > > paxos.  This map was versioned by a single 64-bit epoch
> > > number
> > > > >> > > which
> > > > >> > > > > > > increased on every change.  It was propagated to clients
> > > > >> through
> > > > >> > > > > gossip.  I
> > > > >> > > > > > > wonder if something similar could work here?
> > > > >> > > > > > >
> > > > >> > > > > > > It seems like the the Kafka MetadataResponse serves two
> > > > >> somewhat
> > > > >> > > > > unrelated
> > > > >> > > > > > > purposes.  Firstly, it lets clients know what partitions
> > > > >> exist in
> > > > >> > > the
> > > > >> > > > > > > system and where they live.  Secondly, it lets clients
> > > know
> > > > >> which
> > > > >> > > nodes
> > > > >> > > > > > > within the partition are in-sync (in the ISR) and which
> > > node
> > > > >> is the
> > > > >> > > > > leader.
> > > > >> > > > > > >
> > > > >> > > > > > > The first purpose is what you really need a metadata
> > epoch
> > > > >> for, I
> > > > >> > > > > think.
> > > > >> > > > > > > You want to know whether a partition exists or not, or
> > you
> > > > >> want to
> > > > >> > > know
> > > > >> > > > > > > which nodes you should talk to in order to write to a
> > > given
> > > > >> > > > > partition.  A
> > > > >> > > > > > > single metadata epoch for the whole response should be
> > > > >> adequate
> > > > >> > > here.
> > > > >> > > > > We
> > > > >> > > > > > > should not change the partition assignment without going
> > > > >> through
> > > > >> > > > > zookeeper
> > > > >> > > > > > > (or a similar system), and this inherently serializes
> > > updates
> > > > >> into
> > > > >> > > a
> > > > >> > > > > > > numbered stream.  Brokers should also stop responding to
> > > > >> requests
> > > > >> > > when
> > > > >> > > > > they
> > > > >> > > > > > > are unable to contact ZK for a certain time period.
> > This
> > > > >> prevents
> > > > >> > > the
> > > > >> > > > > case
> > > > >> > > > > > > where a given partition has been moved off some set of
> > > nodes,
> > > > >> but a
> > > > >> > > > > client
> > > > >> > > > > > > still ends up talking to those nodes and writing data
> > > there.
> > > > >> > > > > > >
> > > > >> > > > > > > For the second purpose, this is "soft state" anyway.  If
> > > the
> > > > >> client
> > > > >> > > > > thinks
> > > > >> > > > > > > X is the leader but Y is really the leader, the client
> > > will
> > > > >> talk
> > > > >> > > to X,
> > > > >> > > > > and
> > > > >> > > > > > > X will point out its mistake by sending back a
> > > > >> > > > > NOT_LEADER_FOR_PARTITION.
> > > > >> > > > > > > Then the client can update its metadata again and find
> > > the new
> > > > >> > > leader,
> > > > >> > > > > if
> > > > >> > > > > > > there is one.  There is no need for an epoch to handle
> > > this.
> > > > >> > > > > Similarly, I
> > > > >> > > > > > > can't think of a reason why changing the in-sync replica
> > > set
> > > > >> needs
> > > > >> > > to
> > > > >> > > > > bump
> > > > >> > > > > > > the epoch.
> > > > >> > > > > > >
> > > > >> > > > > > > best,
> > > > >> > > > > > > Colin
> > > > >> > > > > > >
> > > > >> > > > > > >
> > > > >> > > > > > > On Wed, Jan 24, 2018, at 09:45, Dong Lin wrote:
> > > > >> > > > > > > > Thanks much for reviewing the KIP!
> > > > >> > > > > > > >
> > > > >> > > > > > > > Dong
> > > > >> > > > > > > >
> > > > >> > > > > > > > On Wed, Jan 24, 2018 at 7:10 AM, Guozhang Wang <
> > > > >> > > wangg...@gmail.com>
> > > > >> > > > > > > wrote:
> > > > >> > > > > > > >
> > > > >> > > > > > > > > Yeah that makes sense, again I'm just making sure we
> > > > >> understand
> > > > >> > > > > all the
> > > > >> > > > > > > > > scenarios and what to expect.
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > I agree that if, more generally speaking, say users
> > > have
> > > > >> only
> > > > >> > > > > consumed
> > > > >> > > > > > > to
> > > > >> > > > > > > > > offset 8, and then call seek(16) to "jump" to a
> > > further
> > > > >> > > position,
> > > > >> > > > > then
> > > > >> > > > > > > she
> > > > >> > > > > > > > > needs to be aware that OORE maybe thrown and she
> > > needs to
> > > > >> > > handle
> > > > >> > > > > it or
> > > > >> > > > > > > rely
> > > > >> > > > > > > > > on reset policy which should not surprise her.
> > > > >> > > > > > > > >
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > I'm +1 on the KIP.
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > Guozhang
> > > > >> > > > > > > > >
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > On Wed, Jan 24, 2018 at 12:31 AM, Dong Lin <
> > > > >> > > lindon...@gmail.com>
> > > > >> > > > > > > wrote:
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > > Yes, in general we can not prevent
> > > > >> OffsetOutOfRangeException
> > > > >> > > if
> > > > >> > > > > user
> > > > >> > > > > > > > > seeks
> > > > >> > > > > > > > > > to a wrong offset. The main goal is to prevent
> > > > >> > > > > > > OffsetOutOfRangeException
> > > > >> > > > > > > > > if
> > > > >> > > > > > > > > > user has done things in the right way, e.g. user
> > > should
> > > > >> know
> > > > >> > > that
> > > > >> > > > > > > there
> > > > >> > > > > > > > > is
> > > > >> > > > > > > > > > message with this offset.
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > For example, if user calls seek(..) right after
> > > > >> > > construction, the
> > > > >> > > > > > > only
> > > > >> > > > > > > > > > reason I can think of is that user stores offset
> > > > >> externally.
> > > > >> > > In
> > > > >> > > > > this
> > > > >> > > > > > > > > case,
> > > > >> > > > > > > > > > user currently needs to use the offset which is
> > > obtained
> > > > >> > > using
> > > > >> > > > > > > > > position(..)
> > > > >> > > > > > > > > > from the last run. With this KIP, user needs to
> > get
> > > the
> > > > >> > > offset
> > > > >> > > > > and
> > > > >> > > > > > > the
> > > > >> > > > > > > > > > offsetEpoch using positionAndOffsetEpoch(...) and
> > > stores
> > > > >> > > these
> > > > >> > > > > > > > > information
> > > > >> > > > > > > > > > externally. The next time user starts consumer,
> > > he/she
> > > > >> needs
> > > > >> > > to
> > > > >> > > > > call
> > > > >> > > > > > > > > > seek(..., offset, offsetEpoch) right after
> > > construction.
> > > > >> > > Then KIP
> > > > >> > > > > > > should
> > > > >> > > > > > > > > be
> > > > >> > > > > > > > > > able to ensure that we don't throw
> > > > >> OffsetOutOfRangeException
> > > > >> > > if
> > > > >> > > > > > > there is
> > > > >> > > > > > > > > no
> > > > >> > > > > > > > > > unclean leader election.
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > Does this sound OK?
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > Regards,
> > > > >> > > > > > > > > > Dong
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > On Tue, Jan 23, 2018 at 11:44 PM, Guozhang Wang <
> > > > >> > > > > wangg...@gmail.com>
> > > > >> > > > > > > > > > wrote:
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > > "If consumer wants to consume message with
> > offset
> > > 16,
> > > > >> then
> > > > >> > > > > consumer
> > > > >> > > > > > > > > must
> > > > >> > > > > > > > > > > have
> > > > >> > > > > > > > > > > already fetched message with offset 15"
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > --> this may not be always true right? What if
> > > > >> consumer
> > > > >> > > just
> > > > >> > > > > call
> > > > >> > > > > > > > > > seek(16)
> > > > >> > > > > > > > > > > after construction and then poll without
> > committed
> > > > >> offset
> > > > >> > > ever
> > > > >> > > > > > > stored
> > > > >> > > > > > > > > > > before? Admittedly it is rare but we do not
> > > > >> programmably
> > > > >> > > > > disallow
> > > > >> > > > > > > it.
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > Guozhang
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > On Tue, Jan 23, 2018 at 10:42 PM, Dong Lin <
> > > > >> > > > > lindon...@gmail.com>
> > > > >> > > > > > > > > wrote:
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > > Hey Guozhang,
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > > Thanks much for reviewing the KIP!
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > > In the scenario you described, let's assume
> > that
> > > > >> broker
> > > > >> > > A has
> > > > >> > > > > > > > > messages
> > > > >> > > > > > > > > > > with
> > > > >> > > > > > > > > > > > offset up to 10, and broker B has messages
> > with
> > > > >> offset
> > > > >> > > up to
> > > > >> > > > > 20.
> > > > >> > > > > > > If
> > > > >> > > > > > > > > > > > consumer wants to consume message with offset
> > > 9, it
> > > > >> will
> > > > >> > > not
> > > > >> > > > > > > receive
> > > > >> > > > > > > > > > > > OffsetOutOfRangeException
> > > > >> > > > > > > > > > > > from broker A.
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > > If consumer wants to consume message with
> > offset
> > > > >> 16, then
> > > > >> > > > > > > consumer
> > > > >> > > > > > > > > must
> > > > >> > > > > > > > > > > > have already fetched message with offset 15,
> > > which
> > > > >> can
> > > > >> > > only
> > > > >> > > > > come
> > > > >> > > > > > > from
> > > > >> > > > > > > > > > > > broker B. Because consumer will fetch from
> > > broker B
> > > > >> only
> > > > >> > > if
> > > > >> > > > > > > > > leaderEpoch
> > > > >> > > > > > > > > > > >=
> > > > >> > > > > > > > > > > > 2, then the current consumer leaderEpoch can
> > > not be
> > > > >> 1
> > > > >> > > since
> > > > >> > > > > this
> > > > >> > > > > > > KIP
> > > > >> > > > > > > > > > > > prevents leaderEpoch rewind. Thus we will not
> > > have
> > > > >> > > > > > > > > > > > OffsetOutOfRangeException
> > > > >> > > > > > > > > > > > in this case.
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > > Does this address your question, or maybe
> > there
> > > is
> > > > >> more
> > > > >> > > > > advanced
> > > > >> > > > > > > > > > scenario
> > > > >> > > > > > > > > > > > that the KIP does not handle?
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > > Thanks,
> > > > >> > > > > > > > > > > > Dong
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > > On Tue, Jan 23, 2018 at 9:43 PM, Guozhang
> > Wang <
> > > > >> > > > > > > wangg...@gmail.com>
> > > > >> > > > > > > > > > > wrote:
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > Thanks Dong, I made a pass over the wiki and
> > > it
> > > > >> lgtm.
> > > > >> > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > Just a quick question: can we completely
> > > > >> eliminate the
> > > > >> > > > > > > > > > > > > OffsetOutOfRangeException with this
> > approach?
> > > Say
> > > > >> if
> > > > >> > > there
> > > > >> > > > > is
> > > > >> > > > > > > > > > > consecutive
> > > > >> > > > > > > > > > > > > leader changes such that the cached
> > metadata's
> > > > >> > > partition
> > > > >> > > > > epoch
> > > > >> > > > > > > is
> > > > >> > > > > > > > > 1,
> > > > >> > > > > > > > > > > and
> > > > >> > > > > > > > > > > > > the metadata fetch response returns  with
> > > > >> partition
> > > > >> > > epoch 2
> > > > >> > > > > > > > > pointing
> > > > >> > > > > > > > > > to
> > > > >> > > > > > > > > > > > > leader broker A, while the actual up-to-date
> > > > >> metadata
> > > > >> > > has
> > > > >> > > > > > > partition
> > > > >> > > > > > > > > > > > epoch 3
> > > > >> > > > > > > > > > > > > whose leader is now broker B, the metadata
> > > > >> refresh will
> > > > >> > > > > still
> > > > >> > > > > > > > > succeed
> > > > >> > > > > > > > > > > and
> > > > >> > > > > > > > > > > > > the follow-up fetch request may still see
> > > OORE?
> > > > >> > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > Guozhang
> > > > >> > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > On Tue, Jan 23, 2018 at 3:47 PM, Dong Lin <
> > > > >> > > > > lindon...@gmail.com
> > > > >> > > > > > > >
> > > > >> > > > > > > > > > wrote:
> > > > >> > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > > Hi all,
> > > > >> > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > > I would like to start the voting process
> > for
> > > > >> KIP-232:
> > > > >> > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > > https://cwiki.apache.org/
> > > > >> > > confluence/display/KAFKA/KIP-
> > > > >> > > > > > > > > > > > > > 232%3A+Detect+outdated+metadat
> > > > >> a+using+leaderEpoch+
> > > > >> > > > > > > > > > and+partitionEpoch
> > > > >> > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > > The KIP will help fix a concurrency issue
> > in
> > > > >> Kafka
> > > > >> > > which
> > > > >> > > > > > > > > currently
> > > > >> > > > > > > > > > > can
> > > > >> > > > > > > > > > > > > > cause message loss or message duplication
> > in
> > > > >> > > consumer.
> > > > >> > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > > Regards,
> > > > >> > > > > > > > > > > > > > Dong
> > > > >> > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > --
> > > > >> > > > > > > > > > > > > -- Guozhang
> > > > >> > > > > > > > > > > > >
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > --
> > > > >> > > > > > > > > > > -- Guozhang
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > >
> > > > >> > > > > > > > >
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > --
> > > > >> > > > > > > > > -- Guozhang
> > > > >> > > > > > > > >
> > > > >> > > > > > >
> > > > >> > > > >
> > > > >> > >
> > > > >>
> > > > >
> > > > >
> > >
> >

Reply via email to