Hi, Dong,

Thanks for the reply. The general idea that you had for adding partitions
is similar to what we had in mind. It would be useful to make this more
general, allowing adding an arbitrary number of partitions (instead of just
doubling) and potentially removing partitions as well. The following is the
high level idea from the discussion with Colin, Jason and Ismael.

* To change the number of partitions from X to Y in a topic, the controller
marks all existing X partitions as read-only and creates Y new partitions.
The new partitions are writable and are tagged with a higher repartition
epoch (RE).

* The controller propagates the new metadata to every broker. Once the
leader of a partition is marked as read-only, it rejects the produce
requests on this partition. The producer will then refresh the metadata and
start publishing to the new writable partitions.

* The consumers will then be consuming messages in RE order. The consumer
coordinator will only assign partitions in the same RE to consumers. Only
after all messages in an RE are consumed, will partitions in a higher RE be
assigned to consumers.
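
To make the last bullet a bit more concrete, here is a rough sketch of how the coordinator could pick the partitions to hand out. It is only an illustration under assumptions: the Partition record, its fields and assignable_partitions are hypothetical names (not part of any KIP or of the Kafka code base), and it assumes the coordinator can see each partition's end offset and the group's committed offset.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Partition:
    # All fields are hypothetical; they only illustrate the idea above.
    index: int
    repartition_epoch: int  # RE the partition was created in
    end_offset: int         # log end offset (fixed once the partition is read-only)
    committed_offset: int   # offset committed by the consumer group

    def fully_consumed(self) -> bool:
        return self.committed_offset >= self.end_offset


def assignable_partitions(partitions):
    """Return the partitions the coordinator may assign right now.

    Only partitions of a single RE are returned: the lowest RE that still
    has unconsumed messages, or the highest RE once every older RE is drained.
    """
    if not partitions:
        return []
    epochs = sorted({p.repartition_epoch for p in partitions})
    # Every RE except the highest one is read-only, so it can be fully drained.
    for epoch in epochs[:-1]:
        in_epoch = [p for p in partitions if p.repartition_epoch == epoch]
        if not all(p.fully_consumed() for p in in_epoch):
            return in_epoch
    return [p for p in partitions if p.repartition_epoch == epochs[-1]]
```

With two old partitions in RE 0 and three new ones in RE 1, the sketch keeps returning the RE 0 partitions until the group has committed up to the end of both, and only then switches to RE 1.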

As Colin mentioned, if we do the above, we could potentially (1) use a
globally unique partition id, or (2) use a globally unique topic id to
distinguish recreated partitions due to topic deletion.

So, perhaps we can sketch out the re-partitioning KIP a bit more and see if
there is any overlap with KIP-232. Would you be interested in doing that?
If not, we can do that next week.

Jun


On Tue, Feb 6, 2018 at 11:30 AM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Jun,
>
> Interestingly I am also planning to sketch a KIP to allow partition
> expansion for keyed topics after this KIP. Since you are already doing
> that, I guess I will just share my high level idea here in case it is
> helpful.
>
> The motivation for the KIP is that we currently lose the ordering guarantee
> for messages with the same key if we expand the partitions of a keyed topic.
>
> The solution can probably be built upon the following ideas:
>
> - The partition number of the keyed topic should always be doubled (or
> multiplied by a power of 2). Given that we select a partition based on
> hash(key) % partitionNum, this should help us ensure that a message
> assigned to an existing partition will not be mapped to another existing
> partition after partition expansion.
>
> - The producer includes in the ProduceRequest some information that helps
> ensure that messages produced to a partition are monotonically increasing
> in the partitionNum of the topic. In other words, if the broker receives a
> ProduceRequest and notices that the producer does not know the partition
> number has increased, the broker should reject the request. That
> "information" may be the leaderEpoch, the max partitionEpoch of the
> partitions of the topic, or simply the partitionNum of the topic. The
> benefit of this property is that we can keep the new logic for in-order
> message consumption entirely in how the consumer leader determines the
> partition -> consumer mapping.
>
> - When the consumer leader determines the partition -> consumer mapping, it
> first reads the start position for each partition using OffsetFetchRequest.
> If the start positions are all non-zero, then assignment can be done in its
> current manner. The assumption is that a message in the new partition
> should only be consumed after all messages with the same key produced
> before it have been consumed. Since some messages in the new partition have
> been consumed, we should not worry about consuming messages out of order.
> The benefit of this approach is that we can avoid unnecessary overhead in
> the common case.
>
> - If the consumer leader finds that the start position for some partition
> is 0, more care is needed. Say the current partition number is 18 and the
> partition index is 12. Then the consumer leader should ensure that the
> messages produced to partition 12 - 18/2 = 3 before the first message of
> partition 12 are consumed before it assigns partition 12 to any consumer in
> the consumer group. Since we have an "information" that is monotonically
> increasing per partition, the consumer leader can read the value of this
> information from the first message in partition 12, get the offset
> corresponding to this value in partition 3, assign all partitions except
> partition 12 (and probably other new partitions) to the existing consumers,
> wait for the committed offset of partition 3 to go beyond this offset, and
> then trigger a rebalance again so that partition 12 can be assigned to some
> consumer.
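
A small sketch of the doubling argument in the list above, in case it helps. It assumes a non-negative integer key hash and the hash(key) % partitionNum rule mentioned there; partition_for and parent_partition are made-up helper names for illustration, not Kafka APIs.

```python
def partition_for(key_hash: int, num_partitions: int) -> int:
    """Partition chosen for a non-negative key hash (assumed rule: hash % N)."""
    return key_hash % num_partitions


def parent_partition(index: int, num_partitions: int):
    """After doubling to num_partitions, return the old partition whose keys
    may now land in partition `index`, or None if `index` already existed."""
    if num_partitions % 2 != 0:
        raise ValueError("partition count must be the result of a doubling")
    old_count = num_partitions // 2
    return index - old_count if index >= old_count else None


# With 18 partitions (doubled from 9), partition 12 inherits keys from 12 - 9.
parent_of_12 = parent_partition(12, 18)
```

The property the first bullet relies on is that a key which mapped to old partition p (out of N) maps to either p or p + N after doubling, so it never moves to a different pre-existing partition. For the example in the last bullet, parent_partition(12, 18) gives 3.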
>
>
> Thanks,
> Dong
>
>
> On Tue, Feb 6, 2018 at 10:10 AM, Jun Rao <j...@confluent.io> wrote:
>
> > Hi, Dong,
> >
> > Thanks for the KIP. It looks good overall. We are working on a separate
> > KIP for adding partitions while preserving the ordering guarantees. That
> > may require another flavor of partition epoch. It's not very clear whether
> > that partition epoch can be merged with the partition epoch in this KIP.
> > So, perhaps you can wait on this a bit until we post the other KIP in the
> > next few days.
> >
> > Jun
> >
> >
> >
> > On Mon, Feb 5, 2018 at 2:43 PM, Becket Qin <becket....@gmail.com> wrote:
> >
> > > +1 on the KIP.
> > >
> > > I think the KIP is mainly about adding the capability of tracking the
> > > system state change lineage. It does not seem necessary to bundle this
> > > KIP with replacing the topic partition with partition epoch in
> > > produce/fetch. Replacing the topic-partition string with partition epoch
> > > is essentially a performance improvement on top of this KIP. That can
> > > probably be done separately.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Mon, Jan 29, 2018 at 11:52 AM, Dong Lin <lindon...@gmail.com> wrote:
> > >
> > > > Hey Colin,
> > > >
> > > > On Mon, Jan 29, 2018 at 11:23 AM, Colin McCabe <cmcc...@apache.org> wrote:
> > > >
> > > > > > > On Mon, Jan 29, 2018 at 10:35 AM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > >
> > > > > > > Hey Colin,
> > > > > > >
> > > > > > > > I understand that the KIP will add overhead by introducing
> > > > > > > > per-partition partitionEpoch. I am open to alternative solutions that
> > > > > > > > do not incur additional overhead. But I don't see a better way now.
> > > > > > > >
> > > > > > > > IMO the overhead in the FetchResponse may not be that much. We probably
> > > > > > > > should discuss the percentage increase rather than the absolute number
> > > > > > > > increase. Currently after KIP-227, the per-partition header has 23
> > > > > > > > bytes. This KIP adds another 4 bytes. Assuming the record size is 10KB,
> > > > > > > > the percentage increase is 4 / (23 + 10000) = 0.04%. It seems
> > > > > > > > negligible, right?
> > > > >
> > > > > Hi Dong,
> > > > >
> > > > > > Thanks for the response.  I agree that the FetchRequest / FetchResponse
> > > > > > overhead should be OK, now that we have incremental fetch requests and
> > > > > > responses.  However, there are a lot of cases where the percentage
> > > > > > increase is much greater.  For example, if a client is doing full
> > > > > > MetadataRequests / Responses, we have some math kind of like this per
> > > > > > partition:
> > > > > >
> > > > > > > UpdateMetadataRequestPartitionState => topic partition controller_epoch
> > > > > > > leader leader_epoch partition_epoch isr zk_version replicas
> > > > > > > offline_replicas
> > > > > > > 14 bytes: topic => string (assuming about 10 byte topic names)
> > > > > > > 4 bytes: partition => int32
> > > > > > > 4 bytes: controller_epoch => int32
> > > > > > > 4 bytes: leader => int32
> > > > > > > 4 bytes: leader_epoch => int32
> > > > > > > +4 EXTRA bytes: partition_epoch => int32        <-- NEW
> > > > > > > 2+4+4+4 bytes: isr => [int32] (assuming 3 in the ISR)
> > > > > > > 4 bytes: zk_version => int32
> > > > > > > 2+4+4+4 bytes: replicas => [int32] (assuming 3 replicas)
> > > > > > > 2 bytes: offline_replicas => [int32] (assuming no offline replicas)
> > > > >
> > > > > > Assuming I added that up correctly, the per-partition overhead goes from
> > > > > > 64 bytes per partition to 68, a 6.2% increase.
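
For anyone who wants to re-check the arithmetic, a quick sketch follows. The field sizes are copied from the estimate quoted above (including its assumptions of roughly 10-byte topic names, 3 ISR members, 3 replicas and no offline replicas); BASE_FIELDS and relative_increase are just illustrative names.

```python
# Per-partition field sizes in bytes, as assumed in the estimate above.
BASE_FIELDS = {
    "topic": 14,
    "partition": 4,
    "controller_epoch": 4,
    "leader": 4,
    "leader_epoch": 4,
    "isr": 2 + 4 + 4 + 4,
    "zk_version": 4,
    "replicas": 2 + 4 + 4 + 4,
    "offline_replicas": 2,
}
PARTITION_EPOCH_BYTES = 4  # the new int32 field


def relative_increase(base_bytes: int, extra_bytes: int) -> float:
    """Fractional size increase when extra_bytes are added to base_bytes."""
    return extra_bytes / base_bytes


base = sum(BASE_FIELDS.values())
metadata_increase = relative_increase(base, PARTITION_EPOCH_BYTES)
# Fetch case from earlier in the thread: 23-byte header plus 10000 bytes of records.
fetch_increase = relative_increase(23 + 10_000, PARTITION_EPOCH_BYTES)
```

Under those assumptions the base comes to 64 bytes and 68 with the epoch, so the metadata increase is 4/64 = 6.25%, while the fetch case is 4/10023, or about 0.04%.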
> > > > >
> > > > > > We could do similar math for a lot of the other RPCs.  And you will
> > > > > > have a similar memory and garbage collection impact on the brokers
> > > > > > since you have to store all this extra state as well.
> > > > >
> > > >
> > > > > That is correct. IMO the Metadata is only updated periodically and is
> > > > > probably not a big deal if we increase it by 6%. The FetchResponse and
> > > > > ProduceRequest are probably the only requests that are bounded by the
> > > > > bandwidth throughput.
> > > >
> > > >
> > > > >
> > > > > > >
> > > > > > > > I agree that we can probably save more space by using partition ID so
> > > > > > > > that we no longer need the string topic name. A similar idea has also
> > > > > > > > been put in the Rejected Alternative section in KIP-227. While this
> > > > > > > > idea is promising, it seems orthogonal to the goal of this KIP. Given
> > > > > > > > that there is already much work to do in this KIP, maybe we can do the
> > > > > > > > partition ID in a separate KIP?
> > > > >
> > > > > > I guess my thinking is that the goal here is to replace an identifier
> > > > > > which can be re-used (the tuple of topic name, partition ID) with an
> > > > > > identifier that cannot be re-used (the tuple of topic name, partition ID,
> > > > > > partition epoch) in order to gain better semantics.  As long as we are
> > > > > > replacing the identifier, why not replace it with an identifier that has
> > > > > > important performance advantages?  The KIP freeze for the next release
> > > > > > has already passed, so there is time to do this.
> > > > >
> > > >
> > > > > In general it can be easier for discussion and implementation if we can
> > > > > split a larger task into smaller and independent tasks. For example,
> > > > > KIP-112 and KIP-113 both deal with JBOD support. KIP-31, KIP-32 and
> > > > > KIP-33 are about timestamp support. The opinion on this can be
> > > > > subjective, though.
> > > > >
> > > > > IMO the change to switch from (topic, partition ID) to partitionEpoch in
> > > > > all requests/responses requires us to go through all requests one by
> > > > > one. It may not be hard but it can be time consuming and tedious. At a
> > > > > high level, the goal and the change for that will be orthogonal to the
> > > > > changes required in this KIP. That is the main reason I think we can
> > > > > split them into two KIPs.
> > > >
> > > >
> > > > > On Mon, Jan 29, 2018, at 10:54, Dong Lin wrote:
> > > > > > > I think it is possible to move to entirely use partitionEpoch instead of
> > > > > > > (topic, partition) to identify a partition. Client can obtain the
> > > > > > > partitionEpoch -> (topic, partition) mapping from MetadataResponse. We
> > > > > > > probably need to figure out a way to assign partitionEpoch to existing
> > > > > > > partitions in the cluster. But this should be doable.
> > > > > > >
> > > > > > > This is a good idea. I think it will save us some space in the
> > > > > > > request/response. The actual space saving in percentage probably depends
> > > > > > > on the amount of data and the number of partitions of the same topic. I
> > > > > > > just think we can do it in a separate KIP.
> > > > >
> > > > > > Hmm.  How much extra work would be required?  It seems like we are already
> > > > > > changing almost every RPC that involves topics and partitions, already
> > > > > > adding new per-partition state to ZooKeeper, already changing how clients
> > > > > > interact with partitions.  Is there some other big piece of work we'd have
> > > > > > to do to move to partition IDs that we wouldn't need for partition epochs?
> > > > > > I guess we'd have to find a way to support regular expression-based topic
> > > > > > subscriptions.  If we split this into multiple KIPs, wouldn't we end up
> > > > > > changing all those RPCs and ZK state a second time?  Also, I'm curious if
> > > > > > anyone has done any proof of concept GC, memory, and network usage
> > > > > > measurements on switching topic names for topic IDs.
> > > > >
> > > >
> > > >
> > > > > We will need to go over all requests/responses to check how to replace
> > > > > (topic, partition ID) with partition epoch. It requires non-trivial work
> > > > > and could take time. As you mentioned, we may want to see how much saving
> > > > > we can get by switching from topic names to partition epoch. That itself
> > > > > requires time and experiment. It seems that the new idea does not roll
> > > > > back any change proposed in this KIP. So I am not sure we can get much by
> > > > > putting them into the same KIP.
> > > > >
> > > > > Anyway, if more people are interested in seeing the new idea in the same
> > > > > KIP, I can try that.
> > > >
> > > >
> > > >
> > > > >
> > > > > best,
> > > > > Colin
> > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > > On Mon, Jan 29, 2018 at 10:18 AM, Colin McCabe <cmcc...@apache.org> wrote:
> > > > > > >
> > > > > > >> On Fri, Jan 26, 2018, at 12:17, Dong Lin wrote:
> > > > > > >> > Hey Colin,
> > > > > > >> >
> > > > > > >> >
> > > > > > > >> > On Fri, Jan 26, 2018 at 10:16 AM, Colin McCabe <cmcc...@apache.org> wrote:
> > > > > > >> >
> > > > > > >> > > On Thu, Jan 25, 2018, at 16:47, Dong Lin wrote:
> > > > > > >> > > > Hey Colin,
> > > > > > >> > > >
> > > > > > >> > > > Thanks for the comment.
> > > > > > >> > > >
> > > > > > > >> > > > On Thu, Jan 25, 2018 at 4:15 PM, Colin McCabe <cmcc...@apache.org> wrote:
> > > > > > >> > > >
> > > > > > >> > > > > On Wed, Jan 24, 2018, at 21:07, Dong Lin wrote:
> > > > > > >> > > > > > Hey Colin,
> > > > > > >> > > > > >
> > > > > > >> > > > > > Thanks for reviewing the KIP.
> > > > > > >> > > > > >
> > > > > > > >> > > > > > If I understand you right, you may be suggesting that we can use a
> > > > > > > >> > > > > > global metadataEpoch that is incremented every time the controller
> > > > > > > >> > > > > > updates metadata. The problem with this solution is that, if a topic
> > > > > > > >> > > > > > is deleted and created again, the user will not know that the offset
> > > > > > > >> > > > > > which was stored before the topic deletion is no longer valid. This
> > > > > > > >> > > > > > motivates the idea to include a per-partition partitionEpoch. Does
> > > > > > > >> > > > > > this sound reasonable?
> > > > > > >> > > > >
> > > > > > >> > > > > Hi Dong,
> > > > > > >> > > > >
> > > > > > > >> > > > > Perhaps we can store the last valid offset of each deleted topic in
> > > > > > > >> > > > > ZooKeeper.  Then, when a topic with one of those names gets
> > > > > > > >> > > > > re-created, we can start the topic at the previous end offset rather
> > > > > > > >> > > > > than at 0.  This preserves immutability.  It is no more burdensome
> > > > > > > >> > > > > than having to preserve a "last epoch" for the deleted partition
> > > > > > > >> > > > > somewhere, right?
> > > > > > >> > > > >
> > > > > > >> > > >
> > > > > > > >> > > > My concern with this solution is that the number of ZooKeeper nodes
> > > > > > > >> > > > keeps growing over time if some users keep deleting and creating
> > > > > > > >> > > > topics. Do you think this can be a problem?
> > > > > > >> > >
> > > > > > >> > > Hi Dong,
> > > > > > >> > >
> > > > > > > >> > > We could expire the "partition tombstones" after an hour or so.  In
> > > > > > > >> > > practice this would solve the issue for clients that like to destroy
> > > > > > > >> > > and re-create topics all the time.  In any case, doesn't the current
> > > > > > > >> > > proposal add per-partition znodes as well that we have to track even
> > > > > > > >> > > after the partition is deleted?  Or did I misunderstand that?
> > > > > > >> > >
> > > > > > >> >
> > > > > > > >> > Actually the current KIP does not add per-partition znodes. Could you
> > > > > > > >> > double check? I can fix the KIP wiki if there is anything misleading.
> > > > > > >>
> > > > > > >> Hi Dong,
> > > > > > >>
> > > > > > > >> I double-checked the KIP, and I can see that you are in fact using a
> > > > > > > >> global counter for initializing partition epochs.  So, you are correct,
> > > > > > > >> it doesn't add per-partition znodes for partitions that no longer exist.
> > > > > > >>
> > > > > > >> >
> > > > > > > >> > If we expire the "partition tombstones" after an hour, and the topic is
> > > > > > > >> > re-created more than an hour after the topic deletion, then we are back
> > > > > > > >> > to the situation where the user cannot tell whether the topic has been
> > > > > > > >> > re-created or not, right?
> > > > > > >>
> > > > > > > >> Yes, with an expiration period, it would not ensure immutability-- you
> > > > > > > >> could effectively reuse partition names and they would look the same.
> > > > > > >>
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > >
> > > > > > > >> > > It's not really clear to me what should happen when a topic is
> > > > > > > >> > > destroyed and re-created with new data.  Should consumers continue to
> > > > > > > >> > > be able to consume?  We don't know where they stopped consuming from
> > > > > > > >> > > the previous incarnation of the topic, so messages may have been lost.
> > > > > > > >> > > Certainly consuming data from offset X of the new incarnation of the
> > > > > > > >> > > topic may give something totally different from what you would have
> > > > > > > >> > > gotten from offset X of the previous incarnation of the topic.
> > > > > > >> > >
> > > > > > >> >
> > > > > > > >> > With the current KIP, if a consumer consumes a topic based on the last
> > > > > > > >> > remembered (offset, partitionEpoch, leaderEpoch), and if the topic is
> > > > > > > >> > re-created, consume will throw InvalidPartitionEpochException because
> > > > > > > >> > the previous partitionEpoch will be different from the current
> > > > > > > >> > partitionEpoch. This is described in the Proposed Changes ->
> > > > > > > >> > Consumption after topic deletion in the KIP. I can improve the KIP if
> > > > > > > >> > there is anything not clear.
> > > > > > >>
> > > > > > > >> Thanks for the clarification.  It sounds like what you really want is
> > > > > > > >> immutability-- i.e., to never "really" reuse partition identifiers.  And
> > > > > > > >> you do this by making the partition name no longer the "real"
> > > > > > > >> identifier.
> > > > > > > >>
> > > > > > > >> My big concern about this KIP is that it seems like an anti-scalability
> > > > > > > >> feature.  Now we are adding 4 extra bytes for every partition in the
> > > > > > > >> FetchResponse and Request, for example.  That could be 40 KB per
> > > > > > > >> request, if the user has 10,000 partitions.  And of course, the KIP also
> > > > > > > >> makes massive changes to UpdateMetadataRequest, MetadataResponse,
> > > > > > > >> OffsetCommitRequest, OffsetFetchResponse, LeaderAndIsrRequest,
> > > > > > > >> ListOffsetResponse, etc. which will also increase their size on the wire
> > > > > > > >> and in memory.
> > > > > > > >>
> > > > > > > >> One thing that we talked a lot about in the past is replacing partition
> > > > > > > >> names with IDs.  IDs have a lot of really nice features.  They take up
> > > > > > > >> much less space in memory than strings (especially 2-byte Java strings).
> > > > > > > >> They can often be allocated on the stack rather than the heap (important
> > > > > > > >> when you are dealing with hundreds of thousands of them).  They can be
> > > > > > > >> efficiently deserialized and serialized.  If we use 64-bit ones, we will
> > > > > > > >> never run out of IDs, which means that they can always be unique per
> > > > > > > >> partition.
> > > > > > > >>
> > > > > > > >> Given that the partition name is no longer the "real" identifier for
> > > > > > > >> partitions in the current KIP-232 proposal, why not just move to using
> > > > > > > >> partition IDs entirely instead of strings?  You have to change all the
> > > > > > > >> messages anyway.  There isn't much point any more to carrying around the
> > > > > > > >> partition name in every RPC, since you really need (name, epoch) to
> > > > > > > >> identify the partition.
> > > > > > > >> Probably the metadata response and a few other messages would have to
> > > > > > > >> still carry the partition name, to allow clients to go from name to id.
> > > > > > > >> But we could mostly forget about the strings.  And then this would be a
> > > > > > > >> scalability improvement rather than a scalability problem.
> > > > > > >>
> > > > > > >> >
> > > > > > >> >
> > > > > > > >> > > By choosing to reuse the same (topic, partition, offset) 3-tuple, we
> > > > > > > >> > > have chosen to give up immutability.  That was a really bad decision.
> > > > > > > >> > > And now we have to worry about time dependencies, stale cached data,
> > > > > > > >> > > and all the rest.  We can't completely fix this inside Kafka no matter
> > > > > > > >> > > what we do, because not all that cached data is inside Kafka itself.
> > > > > > > >> > > Some of it may be in systems that Kafka has sent data to, such as
> > > > > > > >> > > other daemons, SQL databases, streams, and so forth.
> > > > > > >> > >
> > > > > > >> >
> > > > > > > >> > The current KIP will uniquely identify a message using (topic,
> > > > > > > >> > partition, offset, partitionEpoch) 4-tuple. This addresses the message
> > > > > > > >> > immutability issue that you mentioned. Is there any corner case where
> > > > > > > >> > the message immutability is still not preserved with the current KIP?
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > >
> > > > > > > >> > > I guess the idea here is that mirror maker should work as expected
> > > > > > > >> > > when users destroy a topic and re-create it with the same name.
> > > > > > > >> > > That's kind of tough, though, since in that scenario, mirror maker
> > > > > > > >> > > probably should destroy and re-create the topic on the other end, too,
> > > > > > > >> > > right?  Otherwise, what you end up with on the other end could be half
> > > > > > > >> > > of one incarnation of the topic, and half of another.
> > > > > > > >> > >
> > > > > > > >> > > What mirror maker really needs is to be able to follow a stream of
> > > > > > > >> > > events about the kafka cluster itself.  We could have some master
> > > > > > > >> > > topic which is always present and which contains data about all topic
> > > > > > > >> > > deletions, creations, etc.  Then MM can simply follow this topic and
> > > > > > > >> > > do what is needed.
> > > > > > >> > >
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > > > >
> > > > > > >> > > > > >
> > > > > > > >> > > > > > Then the next question may be, should we use a global metadataEpoch
> > > > > > > >> > > > > > + per-partition partitionEpoch, instead of using per-partition
> > > > > > > >> > > > > > leaderEpoch + per-partition partitionEpoch. The former solution
> > > > > > > >> > > > > > using metadataEpoch would not work due to the following scenario
> > > > > > > >> > > > > > (provided by Jun):
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > "Consider the following scenario. In metadata v1, the leader for a
> > > > > > > >> > > > > > partition is at broker 1. In metadata v2, leader is at broker 2. In
> > > > > > > >> > > > > > metadata v3, leader is at broker 1 again. The last committed offset
> > > > > > > >> > > > > > in v1, v2 and v3 are 10, 20 and 30, respectively. A consumer is
> > > > > > > >> > > > > > started and reads metadata v1 and reads messages from offset 0 to 25
> > > > > > > >> > > > > > from broker 1. My understanding is that in the current proposal, the
> > > > > > > >> > > > > > metadata version associated with offset 25 is v1. The consumer is
> > > > > > > >> > > > > > then restarted and fetches metadata v2. The consumer tries to read
> > > > > > > >> > > > > > from broker 2, which is the old leader with the last offset at 20.
> > > > > > > >> > > > > > In this case, the consumer will still get OffsetOutOfRangeException
> > > > > > > >> > > > > > incorrectly."
> > > > > > >> > > > > >
> > > > > > > >> > > > > > Regarding your comment "For the second purpose, this is "soft state"
> > > > > > > >> > > > > > anyway.  If the client thinks X is the leader but Y is really the
> > > > > > > >> > > > > > leader, the client will talk to X, and X will point out its mistake
> > > > > > > >> > > > > > by sending back a NOT_LEADER_FOR_PARTITION.", it is probably not
> > > > > > > >> > > > > > true. The problem here is that the old leader X may still think it
> > > > > > > >> > > > > > is the leader of the partition and thus it will not send back
> > > > > > > >> > > > > > NOT_LEADER_FOR_PARTITION. The reason is provided in KAFKA-6262. Can
> > > > > > > >> > > > > > you check if that makes sense?
> > > > > > >> > > > >
> > > > > > > >> > > > > This is solvable with a timeout, right?  If the leader can't
> > > > > > > >> > > > > communicate with the controller for a certain period of time, it
> > > > > > > >> > > > > should stop acting as the leader.  We have to solve this problem,
> > > > > > > >> > > > > anyway, in order to fix all the corner cases.
> > > > > > >> > > > >
> > > > > > >> > > >
> > > > > > > >> > > > Not sure if I fully understand your proposal. The proposal seems to
> > > > > > > >> > > > require non-trivial changes to our existing leadership election
> > > > > > > >> > > > mechanism. Could you provide more detail regarding how it works? For
> > > > > > > >> > > > example, how should the user choose this timeout, how does the leader
> > > > > > > >> > > > determine whether it can still communicate with the controller, and
> > > > > > > >> > > > how does this trigger the controller to elect a new leader?
> > > > > > >> > >
> > > > > > > >> > > Before I come up with any proposal, let me make sure I understand the
> > > > > > > >> > > problem correctly.  My big question was, what prevents split-brain
> > > > > > > >> > > here?
> > > > > > > >> > >
> > > > > > > >> > > Let's say I have a partition which is on nodes A, B, and C, with
> > > > > > > >> > > min-ISR 2.  The controller is D.  At some point, there is a network
> > > > > > > >> > > partition between A and B and the rest of the cluster.  The Controller
> > > > > > > >> > > re-assigns the partition to nodes C, D, and E.  But A and B keep
> > > > > > > >> > > chugging away, even though they can no longer communicate with the
> > > > > > > >> > > controller.
> > > > > > > >> > >
> > > > > > > >> > > At some point, a client with stale metadata writes to the partition.
> > > > > > > >> > > It still thinks the partition is on node A, B, and C, so that's where
> > > > > > > >> > > it sends the data.  It's unable to talk to C, but A and B reply back
> > > > > > > >> > > that all is well.
> > > > > > > >> > >
> > > > > > > >> > > Is this not a case where we could lose data due to split brain?  Or is
> > > > > > > >> > > there a mechanism for preventing this that I missed?  If it is, it
> > > > > > > >> > > seems like a pretty serious failure case that we should be handling
> > > > > > > >> > > with our metadata rework.  And I think epoch numbers and timeouts
> > > > > > > >> > > might be part of the solution.
> > > > > > >> > >
> > > > > > >> >
> > > > > > > >> > Right, split brain can happen if RF=4 and minIsr=2. However, I am not
> > > > > > > >> > sure it is a pretty serious issue which we need to address today. This
> > > > > > > >> > can be prevented by configuring the Kafka topic so that minIsr > RF/2.
> > > > > > > >> > Actually, if the user sets minIsr=2, is there any reason that the user
> > > > > > > >> > wants to set RF=4 instead of 3?
> > > > > > > >> >
> > > > > > > >> > Introducing a timeout in the leader election mechanism is non-trivial.
> > > > > > > >> > I think we probably want to do that only if there is a good use-case
> > > > > > > >> > that cannot otherwise be addressed with the current mechanism.
> > > > > > >>
> > > > > > > >> I still would like to think about these corner cases more.  But perhaps
> > > > > > > >> it's not directly related to this KIP.
> > > > > > >>
> > > > > > >> regards,
> > > > > > >> Colin
> > > > > > >>
> > > > > > >>
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > > best,
> > > > > > >> > > Colin
> > > > > > >> > >
> > > > > > >> > >
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > > > > best,
> > > > > > >> > > > > Colin
> > > > > > >> > > > >
> > > > > > >> > > > > >
> > > > > > >> > > > > > Regards,
> > > > > > >> > > > > > Dong
> > > > > > >> > > > > >
> > > > > > >> > > > > >
> > > > > > > >> > > > > > On Wed, Jan 24, 2018 at 10:39 AM, Colin McCabe <cmcc...@apache.org> wrote:
> > > > > > >> > > > > >
> > > > > > >> > > > > > > Hi Dong,
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > Thanks for proposing this KIP.  I think a metadata
> > > epoch
> > > > > is a
> > > > > > >> > > really
> > > > > > >> > > > > good
> > > > > > >> > > > > > > idea.
> > > > > > >> > > > > > >
> > > > > > > >> > > > > > > I read through the DISCUSS thread, but I still don't
> > > > > > > >> > > > > > > have a clear picture of why the proposal uses a
> > > > > > > >> > > > > > > metadata epoch per partition rather than a global
> > > > > > > >> > > > > > > metadata epoch.  A metadata epoch per partition is
> > > > > > > >> > > > > > > kind of unpleasant-- it's at least 4 extra bytes per
> > > > > > > >> > > > > > > partition that we have to send over the wire in every
> > > > > > > >> > > > > > > full metadata request, which could become extra
> > > > > > > >> > > > > > > kilobytes on the wire when the number of partitions
> > > > > > > >> > > > > > > becomes large.  Plus, we have to update all the
> > > > > > > >> > > > > > > auxiliary classes to include an epoch.
> > > > > > >> > > > > > >
> > > > > > > >> > > > > > > We need to have a global metadata epoch anyway to
> > > > > > > >> > > > > > > handle partition addition and deletion.  For example,
> > > > > > > >> > > > > > > if I give you
> > > > > > > >> > > > > > > MetadataResponse{part1,epoch 1, part2, epoch 1} and
> > > > > > > >> > > > > > > {part1, epoch1}, which MetadataResponse is newer?  You
> > > > > > > >> > > > > > > have no way of knowing.  It could be that part2 has
> > > > > > > >> > > > > > > just been created, and the response with 2 partitions
> > > > > > > >> > > > > > > is newer.  Or it could be that part2 has just been
> > > > > > > >> > > > > > > deleted, and therefore the response with 1 partition
> > > > > > > >> > > > > > > is newer.  You must have a global epoch to
> > > > > > > >> > > > > > > disambiguate these two cases.
> > > > > > >> > > > > > >
> > > > > > > >> > > > > > > Previously, I worked on the Ceph distributed
> > > > > > > >> > > > > > > filesystem.  Ceph had the concept of a map of the
> > > > > > > >> > > > > > > whole cluster, maintained by a few servers doing
> > > > > > > >> > > > > > > paxos.  This map was versioned by a single 64-bit
> > > > > > > >> > > > > > > epoch number which increased on every change.  It was
> > > > > > > >> > > > > > > propagated to clients through gossip.  I wonder if
> > > > > > > >> > > > > > > something similar could work here?
> > > > > > >> > > > > > >
> > > > > > > >> > > > > > > It seems like the Kafka MetadataResponse serves two
> > > > > > > >> > > > > > > somewhat unrelated purposes.  Firstly, it lets clients
> > > > > > > >> > > > > > > know what partitions exist in the system and where
> > > > > > > >> > > > > > > they live.  Secondly, it lets clients know which nodes
> > > > > > > >> > > > > > > within the partition are in-sync (in the ISR) and
> > > > > > > >> > > > > > > which node is the leader.
> > > > > > >> > > > > > >
> > > > > > > >> > > > > > > The first purpose is what you really need a metadata
> > > > > > > >> > > > > > > epoch for, I think.  You want to know whether a
> > > > > > > >> > > > > > > partition exists or not, or you want to know which
> > > > > > > >> > > > > > > nodes you should talk to in order to write to a given
> > > > > > > >> > > > > > > partition.  A single metadata epoch for the whole
> > > > > > > >> > > > > > > response should be adequate here.  We should not
> > > > > > > >> > > > > > > change the partition assignment without going through
> > > > > > > >> > > > > > > zookeeper (or a similar system), and this inherently
> > > > > > > >> > > > > > > serializes updates into a numbered stream.  Brokers
> > > > > > > >> > > > > > > should also stop responding to requests when they are
> > > > > > > >> > > > > > > unable to contact ZK for a certain time period.  This
> > > > > > > >> > > > > > > prevents the case where a given partition has been
> > > > > > > >> > > > > > > moved off some set of nodes, but a client still ends
> > > > > > > >> > > > > > > up talking to those nodes and writing data there.
> > > > > > >> > > > > > >
> > > > > > > >> > > > > > > For the second purpose, this is "soft state" anyway.
> > > > > > > >> > > > > > > If the client thinks X is the leader but Y is really
> > > > > > > >> > > > > > > the leader, the client will talk to X, and X will
> > > > > > > >> > > > > > > point out its mistake by sending back a
> > > > > > > >> > > > > > > NOT_LEADER_FOR_PARTITION.  Then the client can update
> > > > > > > >> > > > > > > its metadata again and find the new leader, if there
> > > > > > > >> > > > > > > is one.  There is no need for an epoch to handle this.
> > > > > > > >> > > > > > > Similarly, I can't think of a reason why changing the
> > > > > > > >> > > > > > > in-sync replica set needs to bump the epoch.
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > best,
> > > > > > >> > > > > > > Colin
> > > > > > >> > > > > > >
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > On Wed, Jan 24, 2018, at 09:45, Dong Lin wrote:
> > > > > > >> > > > > > > > Thanks much for reviewing the KIP!
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > Dong
> > > > > > >> > > > > > > >
> > > > > > > >> > > > > > > > On Wed, Jan 24, 2018 at 7:10 AM, Guozhang Wang
> > > > > > > >> > > > > > > > <wangg...@gmail.com> wrote:
> > > > > > >> > > > > > > >
> > > > > > > >> > > > > > > > > Yeah that makes sense, again I'm just making sure we
> > > > > > > >> > > > > > > > > understand all the scenarios and what to expect.
> > > > > > >> > > > > > > > >
> > > > > > > >> > > > > > > > > I agree that if, more generally speaking, say users
> > > > > > > >> > > > > > > > > have only consumed to offset 8, and then call
> > > > > > > >> > > > > > > > > seek(16) to "jump" to a further position, then she
> > > > > > > >> > > > > > > > > needs to be aware that OORE may be thrown and she
> > > > > > > >> > > > > > > > > needs to handle it or rely on reset policy which
> > > > > > > >> > > > > > > > > should not surprise her.
> > > > > > >> > > > > > > > >
> > > > > > >> > > > > > > > >
> > > > > > >> > > > > > > > > I'm +1 on the KIP.
> > > > > > >> > > > > > > > >
> > > > > > >> > > > > > > > > Guozhang
> > > > > > >> > > > > > > > >
> > > > > > >> > > > > > > > >
> > > > > > > >> > > > > > > > > On Wed, Jan 24, 2018 at 12:31 AM, Dong Lin
> > > > > > > >> > > > > > > > > <lindon...@gmail.com> wrote:
> > > > > > >> > > > > > > > >
> > > > > > > >> > > > > > > > > > Yes, in general we can not prevent
> > > > > > > >> > > > > > > > > > OffsetOutOfRangeException if user seeks to a wrong
> > > > > > > >> > > > > > > > > > offset. The main goal is to prevent
> > > > > > > >> > > > > > > > > > OffsetOutOfRangeException if user has done things
> > > > > > > >> > > > > > > > > > in the right way, e.g. user should know that there
> > > > > > > >> > > > > > > > > > is message with this offset.
> > > > > > >> > > > > > > > > >
> > > > > > > >> > > > > > > > > > For example, if user calls seek(..) right after
> > > > > > > >> > > > > > > > > > construction, the only reason I can think of is
> > > > > > > >> > > > > > > > > > that user stores offset externally. In this case,
> > > > > > > >> > > > > > > > > > user currently needs to use the offset which is
> > > > > > > >> > > > > > > > > > obtained using position(..) from the last run.
> > > > > > > >> > > > > > > > > > With this KIP, user needs to get the offset and
> > > > > > > >> > > > > > > > > > the offsetEpoch using positionAndOffsetEpoch(...)
> > > > > > > >> > > > > > > > > > and store this information externally. The next
> > > > > > > >> > > > > > > > > > time user starts consumer, he/she needs to call
> > > > > > > >> > > > > > > > > > seek(..., offset, offsetEpoch) right after
> > > > > > > >> > > > > > > > > > construction. Then KIP should be able to ensure
> > > > > > > >> > > > > > > > > > that we don't throw OffsetOutOfRangeException if
> > > > > > > >> > > > > > > > > > there is no unclean leader election.
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > > > Does this sound OK?
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > > > Regards,
> > > > > > >> > > > > > > > > > Dong
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > > >
> > > > > > > >> > > > > > > > > > On Tue, Jan 23, 2018 at 11:44 PM, Guozhang Wang
> > > > > > > >> > > > > > > > > > <wangg...@gmail.com> wrote:
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > > > > "If consumer wants to consume message with
> > > > offset
> > > > > 16,
> > > > > > >> then
> > > > > > >> > > > > consumer
> > > > > > >> > > > > > > > > must
> > > > > > >> > > > > > > > > > > have
> > > > > > >> > > > > > > > > > > already fetched message with offset 15"
> > > > > > >> > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > --> this may not be always true right? What if
> > > > > > > >> > > > > > > > > > > consumer just calls seek(16) after construction
> > > > > > > >> > > > > > > > > > > and then polls without committed offset ever
> > > > > > > >> > > > > > > > > > > stored before? Admittedly it is rare but we do
> > > > > > > >> > > > > > > > > > > not programmatically disallow it.
> > > > > > >> > > > > > > > > > >
> > > > > > >> > > > > > > > > > >
> > > > > > >> > > > > > > > > > > Guozhang
> > > > > > >> > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > On Tue, Jan 23, 2018 at 10:42 PM, Dong Lin
> > > > > > > >> > > > > > > > > > > <lindon...@gmail.com> wrote:
> > > > > > >> > > > > > > > > > >
> > > > > > >> > > > > > > > > > > > Hey Guozhang,
> > > > > > >> > > > > > > > > > > >
> > > > > > >> > > > > > > > > > > > Thanks much for reviewing the KIP!
> > > > > > >> > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > In the scenario you described, let's assume
> > > > > > > >> > > > > > > > > > > > that broker A has messages with offset up to
> > > > > > > >> > > > > > > > > > > > 10, and broker B has messages with offset up
> > > > > > > >> > > > > > > > > > > > to 20. If consumer wants to consume message
> > > > > > > >> > > > > > > > > > > > with offset 9, it will not receive
> > > > > > > >> > > > > > > > > > > > OffsetOutOfRangeException from broker A.
> > > > > > >> > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > If consumer wants to consume message with
> > > > > > > >> > > > > > > > > > > > offset 16, then consumer must have already
> > > > > > > >> > > > > > > > > > > > fetched message with offset 15, which can only
> > > > > > > >> > > > > > > > > > > > come from broker B. Because consumer will
> > > > > > > >> > > > > > > > > > > > fetch from broker B only if leaderEpoch >= 2,
> > > > > > > >> > > > > > > > > > > > then the current consumer leaderEpoch can not
> > > > > > > >> > > > > > > > > > > > be 1 since this KIP prevents leaderEpoch
> > > > > > > >> > > > > > > > > > > > rewind. Thus we will not have
> > > > > > > >> > > > > > > > > > > > OffsetOutOfRangeException in this case.
> > > > > > >> > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > Does this address your question, or maybe
> > > > > > > >> > > > > > > > > > > > there is more advanced scenario that the KIP
> > > > > > > >> > > > > > > > > > > > does not handle?
> > > > > > >> > > > > > > > > > > >
> > > > > > >> > > > > > > > > > > > Thanks,
> > > > > > >> > > > > > > > > > > > Dong
> > > > > > >> > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > On Tue, Jan 23, 2018 at 9:43 PM, Guozhang Wang
> > > > > > > >> > > > > > > > > > > > <wangg...@gmail.com> wrote:
> > > > > > >> > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > Thanks Dong, I made a pass over the wiki and
> > > > > > > >> > > > > > > > > > > > > it lgtm.
> > > > > > >> > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > Just a quick question: can we completely
> > > > > > > >> > > > > > > > > > > > > eliminate the OffsetOutOfRangeException with
> > > > > > > >> > > > > > > > > > > > > this approach? Say if there are consecutive
> > > > > > > >> > > > > > > > > > > > > leader changes such that the cached
> > > > > > > >> > > > > > > > > > > > > metadata's partition epoch is 1, and the
> > > > > > > >> > > > > > > > > > > > > metadata fetch response returns with
> > > > > > > >> > > > > > > > > > > > > partition epoch 2 pointing to leader broker
> > > > > > > >> > > > > > > > > > > > > A, while the actual up-to-date metadata has
> > > > > > > >> > > > > > > > > > > > > partition epoch 3 whose leader is now broker
> > > > > > > >> > > > > > > > > > > > > B, the metadata refresh will still succeed
> > > > > > > >> > > > > > > > > > > > > and the follow-up fetch request may still
> > > > > > > >> > > > > > > > > > > > > see OORE?
> > > > > > >> > > > > > > > > > > > >
> > > > > > >> > > > > > > > > > > > >
> > > > > > >> > > > > > > > > > > > > Guozhang
> > > > > > >> > > > > > > > > > > > >
> > > > > > >> > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > On Tue, Jan 23, 2018 at 3:47 PM, Dong Lin
> > > > > > > >> > > > > > > > > > > > > <lindon...@gmail.com> wrote:
> > > > > > >> > > > > > > > > > > > >
> > > > > > >> > > > > > > > > > > > > > Hi all,
> > > > > > >> > > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > > I would like to start the voting process for
> > > > > > > >> > > > > > > > > > > > > > KIP-232:
> > > > > > >> > > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-232%3A+Detect+outdated+metadata+using+leaderEpoch+and+partitionEpoch
> > > > > > >> > > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > > The KIP will help fix a concurrency issue in
> > > > > > > >> > > > > > > > > > > > > > Kafka which currently can cause message loss
> > > > > > > >> > > > > > > > > > > > > > or message duplication in consumer.
> > > > > > >> > > > > > > > > > > > > >
> > > > > > >> > > > > > > > > > > > > > Regards,
> > > > > > >> > > > > > > > > > > > > > Dong
> > > > > > >> > > > > > > > > > > > > >
> > > > > > >> > > > > > > > > > > > >
> > > > > > >> > > > > > > > > > > > >
> > > > > > >> > > > > > > > > > > > >
> > > > > > >> > > > > > > > > > > > > --
> > > > > > >> > > > > > > > > > > > > -- Guozhang
> > > > > > >> > > > > > > > > > > > >
> > > > > > >> > > > > > > > > > > >
> > > > > > >> > > > > > > > > > >
> > > > > > >> > > > > > > > > > >
> > > > > > >> > > > > > > > > > >
> > > > > > >> > > > > > > > > > > --
> > > > > > >> > > > > > > > > > > -- Guozhang
> > > > > > >> > > > > > > > > > >
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > >
> > > > > > >> > > > > > > > >
> > > > > > >> > > > > > > > >
> > > > > > >> > > > > > > > > --
> > > > > > >> > > > > > > > > -- Guozhang
> > > > > > >> > > > > > > > >
> > > > > > >> > > > > > >
> > > > > > >> > > > >
> > > > > > >> > >
> > > > > > >>
> > > > > > >
> > > > > > >
> > > > >
> > > >
> > >
> >
>
