Hey Jun,

It seems that we have made considerable progress on the discussion of
KIP-253 since February. Do you think we should continue the discussion
there, or can we resume the voting for this KIP? I am happy to submit the
PR and move this KIP forward.

Thanks!
Dong


On Wed, Feb 7, 2018 at 11:42 PM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Jun,
>
> Sure, I will come up with a KIP this week. I think there is a way to allow
> partition expansion to an arbitrary number without introducing new concepts
> such as read-only partition or repartition epoch.
>
> Thanks,
> Dong
>
> On Wed, Feb 7, 2018 at 5:28 PM, Jun Rao <j...@confluent.io> wrote:
>
>> Hi, Dong,
>>
>> Thanks for the reply. The general idea that you had for adding partitions
>> is similar to what we had in mind. It would be useful to make this more
>> general, allowing adding an arbitrary number of partitions (instead of
>> just
>> doubling) and potentially removing partitions as well. The following is
>> the
>> high level idea from the discussion with Colin, Jason and Ismael.
>>
>> * To change the number of partitions from X to Y in a topic, the
>> controller
>> marks all existing X partitions as read-only and creates Y new partitions.
>> The new partitions are writable and are tagged with a higher repartition
>> epoch (RE).
>>
>> * The controller propagates the new metadata to every broker. Once the
>> leader of a partition is marked as read-only, it rejects the produce
>> requests on this partition. The producer will then refresh the metadata
>> and
>> start publishing to the new writable partitions.
>>
>> * The consumers will then be consuming messages in RE order. The consumer
>> coordinator will only assign partitions in the same RE to consumers. Only
>> after all messages in an RE are consumed, will partitions in a higher RE
>> be
>> assigned to consumers.
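
A minimal sketch of the assignment rule in the last bullet, assuming each partition carries a repartition epoch and a flag saying whether it has been fully consumed. The `PartitionState` type, the `fully_consumed` flag, and the `assignable_partitions` helper are hypothetical names used only for illustration; they are not part of any Kafka API.

```python
from typing import Dict, List, NamedTuple


class PartitionState(NamedTuple):
    repartition_epoch: int  # RE the partition was created in
    fully_consumed: bool    # hypothetical flag: group has consumed every message


def assignable_partitions(partitions: Dict[str, PartitionState]) -> List[str]:
    """Return the partitions of the lowest RE that still has unconsumed data."""
    pending = [s.repartition_epoch for s in partitions.values() if not s.fully_consumed]
    if not pending:
        return []
    current_re = min(pending)
    # Only partitions in the same RE are handed out together.
    return sorted(name for name, s in partitions.items()
                  if s.repartition_epoch == current_re)


before = assignable_partitions({
    "t-0": PartitionState(0, True),
    "t-1": PartitionState(0, False),
    "t-2": PartitionState(1, False),
})
after = assignable_partitions({
    "t-0": PartitionState(0, True),
    "t-1": PartitionState(0, True),
    "t-2": PartitionState(1, False),
})
```

In this sketch the RE 1 partition only becomes assignable once every RE 0 partition is drained, which is the ordering property the bullet describes.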
>>
>> As Colin mentioned, if we do the above, we could potentially (1) use a
>> globally unique partition id, or (2) use a globally unique topic id to
>> distinguish recreated partitions due to topic deletion.
>>
>> So, perhaps we can sketch out the re-partitioning KIP a bit more and see
>> if
>> there is any overlap with KIP-232. Would you be interested in doing that?
>> If not, we can do that next week.
>>
>> Jun
>>
>>
>> On Tue, Feb 6, 2018 at 11:30 AM, Dong Lin <lindon...@gmail.com> wrote:
>>
>> > Hey Jun,
>> >
>> > Interestingly I am also planning to sketch a KIP to allow partition
>> > expansion for keyed topics after this KIP. Since you are already doing
>> > that, I guess I will just share my high level idea here in case it is
>> > helpful.
>> >
>> > The motivation for the KIP is that we currently lose the ordering guarantee for
>> > messages with the same key if we expand the partitions of a keyed topic.
>> >
>> > The solution can probably be built upon the following ideas:
>> >
>> > - Partition number of the keyed topic should always be doubled (or
>> > multiplied by a power of 2). Given that we select a partition based on
>> > hash(key) % partitionNum, this should help us ensure that, a message
>> > assigned to an existing partition will not be mapped to another existing
>> > partition after partition expansion.
>> >
>> > - Producer includes in the ProduceRequest some information that helps
>> > ensure that messages produced to a partition will monotonically
>> increase in
>> > the partitionNum of the topic. In other words, if broker receives a
>> > ProduceRequest and notices that the producer does not know the partition
>> > number has increased, broker should reject this request. That
>> "information"
>> > may be leaderEpoch, max partitionEpoch of the partitions of the topic, or
>> > simply partitionNum of the topic. The benefit of this property is that
>> we
>> > can keep the new logic for in-order message consumption entirely in how
>> > consumer leader determines the partition -> consumer mapping.
>> >
>> > - When consumer leader determines partition -> consumer mapping, leader
>> > first reads the start position for each partition using
>> OffsetFetchRequest.
>> > If the start positions are all non-zero, then assignment can be done in its
>> > current manner. The assumption is that, a message in the new partition
>> > should only be consumed after all messages with the same key produced
>> > before it have been consumed. Since some messages in the new partition
>> have
>> > been consumed, we should not worry about consuming messages
>> out-of-order.
>> > The benefit of this approach is that we can avoid unnecessary overhead
>> in
>> > the common case.
>> >
>> > - If the consumer leader finds that the start position for some
>> partition
>> > is 0. Say the current partition number is 18 and the partition index is
>> 12,
>> > then consumer leader should ensure that messages produced to partition
>> 12 -
>> > 18/2 = 3 before the first message of partition 12 are consumed, before it
>> > assigns partition 12 to any consumer in the consumer group. Since we
>> have
>> > an "information" that is monotonically increasing per partition, consumer
>> > can read the value of this information from the first message in
>> partition
>> > 12, get the offset corresponding to this value in partition 3, assign
>> > partitions except for partition 12 (and probably other new partitions) to
>> > the existing consumers, wait for the committed offset to go beyond
>> this
>> > offset for partition 3, and trigger rebalance again so that partition 12
>> can
>> > be assigned to some consumer.
>> >
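
A small sketch of why doubling keeps keys from moving between existing partitions, and of the "12 - 18/2 = 3" step above. It assumes the partitioner is simply `hash(key) % partitionNum` as stated in the first bullet; `parent_partition` is a hypothetical helper name, not an existing Kafka function.

```python
from typing import Optional


def parent_partition(partition: int, num_partitions: int) -> Optional[int]:
    """Partition that held this partition's keys before the count was doubled.

    Returns None when `partition` already existed before the doubling.
    """
    old_count = num_partitions // 2
    return partition - old_count if partition >= old_count else None


def moves_only_to_new_partition(key_hash: int, old_count: int) -> bool:
    """After doubling, a key stays where it was or lands in a brand-new partition."""
    old_p = key_hash % old_count
    new_p = key_hash % (2 * old_count)
    return new_p in (old_p, old_p + old_count)


parent = parent_partition(12, 18)
all_ok = all(moves_only_to_new_partition(h, 9) for h in range(10_000))
```

With 18 partitions, partition 12 is new and its keys previously lived in partition 3, so the consumer leader only has to wait on that one older partition before assigning partition 12.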
>> >
>> > Thanks,
>> > Dong
>> >
>> >
>> > On Tue, Feb 6, 2018 at 10:10 AM, Jun Rao <j...@confluent.io> wrote:
>> >
>> > > Hi, Dong,
>> > >
>> > > Thanks for the KIP. It looks good overall. We are working on a
>> separate
>> > KIP
>> > > for adding partitions while preserving the ordering guarantees. That
>> may
>> > > require another flavor of partition epoch. It's not very clear whether
>> > that
>> > > partition epoch can be merged with the partition epoch in this KIP.
>> So,
>> > > perhaps you can wait on this a bit until we post the other KIP in the
>> > next
>> > > few days.
>> > >
>> > > Jun
>> > >
>> > >
>> > >
>> > > On Mon, Feb 5, 2018 at 2:43 PM, Becket Qin <becket....@gmail.com>
>> wrote:
>> > >
>> > > > +1 on the KIP.
>> > > >
>> > > > I think the KIP is mainly about adding the capability of tracking
>> the
>> > > > system state change lineage. It does not seem necessary to bundle
>> this
>> > > KIP
>> > > > with replacing the topic partition with partition epoch in
>> > produce/fetch.
>> > > > Replacing topic-partition string with partition epoch is
>> essentially a
>> > > > performance improvement on top of this KIP. That can probably be
>> done
>> > > > separately.
>> > > >
>> > > > Thanks,
>> > > >
>> > > > Jiangjie (Becket) Qin
>> > > >
>> > > > On Mon, Jan 29, 2018 at 11:52 AM, Dong Lin <lindon...@gmail.com>
>> > wrote:
>> > > >
>> > > > > Hey Colin,
>> > > > >
>> > > > > On Mon, Jan 29, 2018 at 11:23 AM, Colin McCabe <
>> cmcc...@apache.org>
>> > > > wrote:
>> > > > >
>> > > > > > > On Mon, Jan 29, 2018 at 10:35 AM, Dong Lin <
>> lindon...@gmail.com>
>> > > > > wrote:
>> > > > > > >
>> > > > > > > > Hey Colin,
>> > > > > > > >
>> > > > > > > > I understand that the KIP will add overhead by introducing
>> > > > > > per-partition
>> > > > > > > > partitionEpoch. I am open to alternative solutions that do
>> > not
>> > > > > incur
>> > > > > > > > additional overhead. But I don't see a better way now.
>> > > > > > > >
>> > > > > > > > IMO the overhead in the FetchResponse may not be that much.
>> We
>> > > > > probably
>> > > > > > > > should discuss the percentage increase rather than the
>> absolute
>> > > > > number
>> > > > > > > > increase. Currently after KIP-227, per-partition header has
>> 23
>> > > > bytes.
>> > > > > > This
>> > > > > > > > KIP adds another 4 bytes. Assume the records size is 10KB,
>> the
>> > > > > > percentage
>> > > > > > > > increase is 4 / (23 + 10000) = 0.04%. It seems negligible,
>> > right?
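
The FetchResponse estimate above can be checked with a few lines. This is only a sketch of the arithmetic, assuming the figures quoted in the message (a 23-byte per-partition header, 4 extra bytes, 10,000 bytes of records); `relative_increase_pct` is a hypothetical helper name.

```python
def relative_increase_pct(extra_bytes: int, header_bytes: int, payload_bytes: int) -> float:
    """Extra bytes as a percentage of the existing per-partition size."""
    return 100.0 * extra_bytes / (header_bytes + payload_bytes)


# 4 extra bytes on top of a 23-byte header plus 10,000 bytes of records.
fetch_increase = relative_increase_pct(4, 23, 10_000)
```

The result is roughly 0.04%, which supports the point that the overhead is small whenever the record payload dominates the response.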
>> > > > > >
>> > > > > > Hi Dong,
>> > > > > >
>> > > > > > Thanks for the response.  I agree that the FetchRequest /
>> > > FetchResponse
>> > > > > > overhead should be OK, now that we have incremental fetch
>> requests
>> > > and
>> > > > > > responses.  However, there are a lot of cases where the
>> percentage
>> > > > > increase
>> > > > > > is much greater.  For example, if a client is doing full
>> > > > > MetadataRequests /
>> > > > > > Responses, we have some math kind of like this per partition:
>> > > > > >
>> > > > > > > UpdateMetadataRequestPartitionState => topic partition
>> > > > > controller_epoch
>> > > > > > leader  leader_epoch partition_epoch isr zk_version replicas
>> > > > > > offline_replicas
>> > > > > > > 14 bytes:  topic => string (assuming about 10 byte topic
>> names)
>> > > > > > > 4 bytes:  partition => int32
>> > > > > > > 4  bytes: controller_epoch => int32
>> > > > > > > 4  bytes: leader => int32
>> > > > > > > 4  bytes: leader_epoch => int32
>> > > > > > > +4 EXTRA bytes: partition_epoch => int32        <-- NEW
>> > > > > > > 2+4+4+4 bytes: isr => [int32] (assuming 3 in the ISR)
>> > > > > > > 4 bytes: zk_version => int32
>> > > > > > > 2+4+4+4 bytes: replicas => [int32] (assuming 3 replicas)
>> > > > > > > 2  offline_replicas => [int32] (assuming no offline replicas)
>> > > > > >
>> > > > > > Assuming I added that up correctly, the per-partition overhead
>> goes
>> > > > from
>> > > > > > 64 bytes per partition to 68, a 6.2% increase.
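
The per-partition sum above can be reproduced as follows. This is a sketch that takes the field sizes exactly as listed in the message (including the 14-byte estimate for the topic string and three entries each in the ISR and replica arrays); it is not derived from the actual protocol definition.

```python
# Field sizes in bytes, copied from the list above.
FIELDS = [
    ("topic", 14),
    ("partition", 4),
    ("controller_epoch", 4),
    ("leader", 4),
    ("leader_epoch", 4),
    ("isr", 2 + 4 * 3),       # 2-byte array header plus 3 int32 entries
    ("zk_version", 4),
    ("replicas", 2 + 4 * 3),  # 2-byte array header plus 3 int32 entries
    ("offline_replicas", 2),  # empty array, header only
]
PARTITION_EPOCH_BYTES = 4     # the new int32 field

bytes_before = sum(size for _, size in FIELDS)
bytes_after = bytes_before + PARTITION_EPOCH_BYTES
increase_pct = 100.0 * PARTITION_EPOCH_BYTES / bytes_before
```

Under these assumptions the totals come out to 64 and 68 bytes, an increase of 6.25%, which matches the figure quoted above after rounding.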
>> > > > > >
>> > > > > > We could do similar math for a lot of the other RPCs.  And you
>> will
>> > > > have
>> > > > > a
>> > > > > > similar memory and garbage collection impact on the brokers
>> since
>> > you
>> > > > > have
>> > > > > > to store all this extra state as well.
>> > > > > >
>> > > > >
>> > > > > That is correct. IMO the Metadata is only updated periodically
>> and is
>> > > > > probably not a big deal if we increase it by 6%. The FetchResponse
>> > and
>> > > > > ProduceRequest are probably the only requests that are bounded by
>> the
>> > > > > bandwidth throughput.
>> > > > >
>> > > > >
>> > > > > >
>> > > > > > > >
>> > > > > > > > I agree that we can probably save more space by using
>> partition
>> > > ID
>> > > > so
>> > > > > > that
>> > > > > > > > we no longer need the string topic name. A similar idea
>> has
>> > > also
>> > > > > > been
>> > > > > > > > put in the Rejected Alternative section in KIP-227. While
>> this
>> > > idea
>> > > > > is
>> > > > > > > > promising, it seems orthogonal to the goal of this KIP.
>> Given
>> > > that
>> > > > > > there is
>> > > > > > > > already a lot of work to do in this KIP, maybe we can do the
>> > > partition
>> > > > ID
>> > > > > > in a
>> > > > > > > > separate KIP?
>> > > > > >
>> > > > > > I guess my thinking is that the goal here is to replace an
>> > identifier
>> > > > > > which can be re-used (the tuple of topic name, partition ID)
>> with
>> > an
>> > > > > > identifier that cannot be re-used (the tuple of topic name,
>> > partition
>> > > > ID,
>> > > > > > partition epoch) in order to gain better semantics.  As long as
>> we
>> > > are
>> > > > > > replacing the identifier, why not replace it with an identifier
>> > that
>> > > > has
>> > > > > > important performance advantages?  The KIP freeze for the next
>> > > release
>> > > > > has
>> > > > > > already passed, so there is time to do this.
>> > > > > >
>> > > > >
>> > > > > In general it can be easier for discussion and implementation if
>> we
>> > can
>> > > > > split a larger task into smaller and independent tasks. For
>> example,
>> > > > > KIP-112 and KIP-113 both deal with JBOD support. KIP-31,
>> KIP-32
>> > > and
>> > > > > KIP-33 are about timestamp support. The opinion on this can be
>> subjective
>> > > > > though.
>> > > > >
>> > > > > IMO the change to switch from (topic, partition ID) to
>> partitionEpoch
>> > in
>> > > > all
>> > > > > request/response requires us to go through all requests one by
>> one.
>> > > It
>> > > > > may not be hard but it can be time consuming and tedious. At a high
>> > level
>> > > > the
>> > > > > goal and the change for that will be orthogonal to the changes
>> > required
>> > > > in
>> > > > > this KIP. That is the main reason I think we can split them into
>> two
>> > > > KIPs.
>> > > > >
>> > > > >
>> > > > > > On Mon, Jan 29, 2018, at 10:54, Dong Lin wrote:
>> > > > > > > I think it is possible to move to entirely use partitionEpoch
>> > > instead
>> > > > > of
>> > > > > > > (topic, partition) to identify a partition. Client can obtain
>> the
>> > > > > > > partitionEpoch -> (topic, partition) mapping from
>> > MetadataResponse.
>> > > > We
>> > > > > > > probably need to figure out a way to assign partitionEpoch to
>> > > > existing
>> > > > > > > partitions in the cluster. But this should be doable.
>> > > > > > >
>> > > > > > > This is a good idea. I think it will save us some space in the
>> > > > > > > request/response. The actual space saving in percentage
>> probably
>> > > > > depends
>> > > > > > on
>> > > > > > > the amount of data and the number of partitions of the same
>> > topic.
>> > > I
>> > > > > just
>> > > > > > > think we can do it in a separate KIP.
>> > > > > >
>> > > > > > Hmm.  How much extra work would be required?  It seems like we
>> are
>> > > > > already
>> > > > > > changing almost every RPC that involves topics and partitions,
>> > > already
>> > > > > > adding new per-partition state to ZooKeeper, already changing
>> how
>> > > > clients
>> > > > > > interact with partitions.  Is there some other big piece of work
>> > we'd
>> > > > > have
>> > > > > > to do to move to partition IDs that we wouldn't need for
>> partition
>> > > > > epochs?
>> > > > > > I guess we'd have to find a way to support regular
>> expression-based
>> > > > topic
>> > > > > > subscriptions.  If we split this into multiple KIPs, wouldn't we
>> > end
>> > > up
>> > > > > > changing all those RPCs and ZK state a second time?  Also, I'm
>> > curious
>> > > > if
>> > > > > > anyone has done any proof of concept GC, memory, and network
>> usage
>> > > > > > measurements on switching topic names for topic IDs.
>> > > > > >
>> > > > >
>> > > > >
>> > > > > We will need to go over all requests/responses to check how to
>> > replace
>> > > > > (topic, partition ID) with partition epoch. It requires
>> non-trivial
>> > > work
>> > > > > and could take time. As you mentioned, we may want to see how much
>> > > saving
>> > > > > we can get by switching from topic names to partition epoch. That
>> > > itself
>> > > > > requires time and experiment. It seems that the new idea does not
>> > > > roll back
>> > > > > any change proposed in this KIP. So I am not sure we can get much
>> by
>> > > > > putting them into the same KIP.
>> > > > >
>> > > > > Anyway, if more people are interested in seeing the new idea in
>> the
>> > > same
>> > > > > KIP, I can try that.
>> > > > >
>> > > > >
>> > > > >
>> > > > > >
>> > > > > > best,
>> > > > > > Colin
>> > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > >
>> > > > > > > > On Mon, Jan 29, 2018 at 10:18 AM, Colin McCabe <
>> > > cmcc...@apache.org
>> > > > >
>> > > > > > wrote:
>> > > > > > > >
>> > > > > > > >> On Fri, Jan 26, 2018, at 12:17, Dong Lin wrote:
>> > > > > > > >> > Hey Colin,
>> > > > > > > >> >
>> > > > > > > >> >
>> > > > > > > >> > On Fri, Jan 26, 2018 at 10:16 AM, Colin McCabe <
>> > > > > cmcc...@apache.org>
>> > > > > > > >> wrote:
>> > > > > > > >> >
>> > > > > > > >> > > On Thu, Jan 25, 2018, at 16:47, Dong Lin wrote:
>> > > > > > > >> > > > Hey Colin,
>> > > > > > > >> > > >
>> > > > > > > >> > > > Thanks for the comment.
>> > > > > > > >> > > >
>> > > > > > > >> > > > On Thu, Jan 25, 2018 at 4:15 PM, Colin McCabe <
>> > > > > > cmcc...@apache.org>
>> > > > > > > >> > > wrote:
>> > > > > > > >> > > >
>> > > > > > > >> > > > > On Wed, Jan 24, 2018, at 21:07, Dong Lin wrote:
>> > > > > > > >> > > > > > Hey Colin,
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > > Thanks for reviewing the KIP.
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > > If I understand you right, you may be suggesting
>> that
>> > > we
>> > > > > can
>> > > > > > use
>> > > > > > > >> a
>> > > > > > > >> > > global
>> > > > > > > >> > > > > > metadataEpoch that is incremented every time
>> > > controller
>> > > > > > updates
>> > > > > > > >> > > metadata.
>> > > > > > > >> > > > > > The problem with this solution is that, if a
>> topic
>> > is
>> > > > > > deleted
>> > > > > > > >> and
>> > > > > > > >> > > created
>> > > > > > > >> > > > > > again, the user will not know whether the offset
>> > > which
>> > > > is
>> > > > > > > >> stored
>> > > > > > > >> > > before
>> > > > > > > >> > > > > > the topic deletion is no longer valid. This
>> > motivates
>> > > > the
>> > > > > > idea
>> > > > > > > >> to
>> > > > > > > >> > > include
>> > > > > > > >> > > > > > per-partition partitionEpoch. Does this sound
>> > > > reasonable?
>> > > > > > > >> > > > >
>> > > > > > > >> > > > > Hi Dong,
>> > > > > > > >> > > > >
>> > > > > > > >> > > > > Perhaps we can store the last valid offset of each
>> > > deleted
>> > > > > > topic
>> > > > > > > >> in
>> > > > > > > >> > > > > ZooKeeper.  Then, when a topic with one of those
>> names
>> > > > gets
>> > > > > > > >> > > re-created, we
>> > > > > > > >> > > > > can start the topic at the previous end offset
>> rather
>> > > than
>> > > > > at
>> > > > > > 0.
>> > > > > > > >> This
>> > > > > > > >> > > > > preserves immutability.  It is no more burdensome
>> than
>> > > > > having
>> > > > > > to
>> > > > > > > >> > > preserve a
>> > > > > > > >> > > > > "last epoch" for the deleted partition somewhere,
>> > right?
>> > > > > > > >> > > > >
>> > > > > > > >> > > >
>> > > > > > > >> > > > My concern with this solution is that the number of
>> > > > zookeeper
>> > > > > > nodes
>> > > > > > > >> keeps
>> > > > > > > >> > > > growing over time if some users keep deleting
>> and
>> > > > > creating
>> > > > > > > >> topics.
>> > > > > > > >> > > Do
>> > > > > > > >> > > > you think this can be a problem?
>> > > > > > > >> > >
>> > > > > > > >> > > Hi Dong,
>> > > > > > > >> > >
>> > > > > > > >> > > We could expire the "partition tombstones" after an
>> hour
>> > or
>> > > > so.
>> > > > > > In
>> > > > > > > >> > > practice this would solve the issue for clients that
>> like
>> > to
>> > > > > > destroy
>> > > > > > > >> and
>> > > > > > > >> > > re-create topics all the time.  In any case, doesn't
>> the
>> > > > current
>> > > > > > > >> proposal
>> > > > > > > >> > > add per-partition znodes as well that we have to track
>> > even
>> > > > > after
>> > > > > > the
>> > > > > > > >> > > partition is deleted?  Or did I misunderstand that?
>> > > > > > > >> > >
>> > > > > > > >> >
>> > > > > > > >> > Actually the current KIP does not add per-partition
>> znodes.
>> > > > Could
>> > > > > > you
>> > > > > > > >> > double check? I can fix the KIP wiki if there is anything
>> > > > > > misleading.
>> > > > > > > >>
>> > > > > > > >> Hi Dong,
>> > > > > > > >>
>> > > > > > > >> I double-checked the KIP, and I can see that you are in
>> fact
>> > > > using a
>> > > > > > > >> global counter for initializing partition epochs.  So, you
>> are
>> > > > > > correct, it
>> > > > > > > >> doesn't add per-partition znodes for partitions that no
>> longer
>> > > > > exist.
>> > > > > > > >>
>> > > > > > > >> >
>> > > > > > > >> > If we expire the "partition tombstones" after an hour, and
>> > the
>> > > > > topic
>> > > > > > is
>> > > > > > > >> > re-created after more than an hour since the topic
>> deletion,
>> > > > then
>> > > > > > we are
>> > > > > > > >> > back to the situation where user can not tell whether the
>> > > topic
>> > > > > has
>> > > > > > been
>> > > > > > > >> > re-created or not, right?
>> > > > > > > >>
>> > > > > > > >> Yes, with an expiration period, it would not ensure
>> > > immutability--
>> > > > > you
>> > > > > > > >> could effectively reuse partition names and they would look
>> > the
>> > > > > same.
>> > > > > > > >>
>> > > > > > > >> >
>> > > > > > > >> >
>> > > > > > > >> > >
>> > > > > > > >> > > It's not really clear to me what should happen when a
>> > topic
>> > > is
>> > > > > > > >> destroyed
>> > > > > > > >> > > and re-created with new data.  Should consumers
>> continue
>> > to
>> > > be
>> > > > > > able to
>> > > > > > > >> > > consume?  We don't know where they stopped consuming
>> from
>> > > the
>> > > > > > previous
>> > > > > > > >> > > incarnation of the topic, so messages may have been
>> lost.
>> > > > > > Certainly
>> > > > > > > >> > > consuming data from offset X of the new incarnation of
>> the
>> > > > topic
>> > > > > > may
>> > > > > > > >> give
>> > > > > > > >> > > something totally different from what you would have
>> > gotten
>> > > > from
>> > > > > > > >> offset X
>> > > > > > > >> > > of the previous incarnation of the topic.
>> > > > > > > >> > >
>> > > > > > > >> >
>> > > > > > > >> > With the current KIP, if a consumer consumes a topic
>> based
>> > on
>> > > > the
>> > > > > > last
>> > > > > > > >> > remembered (offset, partitionEpoch, leaderEpoch), and if
>> the
>> > > > topic
>> > > > > > is
>> > > > > > > >> > re-created, consume will throw
>> > InvalidPartitionEpochException
>> > > > > > because
>> > > > > > > >> the
>> > > > > > > >> > previous partitionEpoch will be different from the
>> current
>> > > > > > > >> partitionEpoch.
>> > > > > > > >> > This is described in the Proposed Changes -> Consumption
>> > after
>> > > > > topic
>> > > > > > > >> > deletion in the KIP. I can improve the KIP if there is
>> > > anything
>> > > > > not
>> > > > > > > >> clear.
>> > > > > > > >>
>> > > > > > > >> Thanks for the clarification.  It sounds like what you
>> really
>> > > want
>> > > > > is
>> > > > > > > >> immutability-- i.e., to never "really" reuse partition
>> > > > identifiers.
>> > > > > > And
>> > > > > > > >> you do this by making the partition name no longer the
>> "real"
>> > > > > > identifier.
>> > > > > > > >>
>> > > > > > > >> My big concern about this KIP is that it seems like an
>> > > > > > anti-scalability
>> > > > > > > >> feature.  Now we are adding 4 extra bytes for every
>> partition
>> > in
>> > > > the
>> > > > > > > >> FetchResponse and Request, for example.  That could be 40
>> kb
>> > per
>> > > > > > request,
>> > > > > > > >> if the user has 10,000 partitions.  And of course, the KIP
>> > also
>> > > > > makes
>> > > > > > > >> massive changes to UpdateMetadataRequest, MetadataResponse,
>> > > > > > > >> OffsetCommitRequest, OffsetFetchResponse,
>> LeaderAndIsrRequest,
>> > > > > > > >> ListOffsetResponse, etc. which will also increase their
>> size
>> > on
>> > > > the
>> > > > > > wire
>> > > > > > > >> and in memory.
>> > > > > > > >>
>> > > > > > > >> One thing that we talked a lot about in the past is
>> replacing
>> > > > > > partition
>> > > > > > > >> names with IDs.  IDs have a lot of really nice features.
>> They
>> > > > take
>> > > > > > up much
>> > > > > > > >> less space in memory than strings (especially 2-byte Java
>> > > > strings).
>> > > > > > They
>> > > > > > > >> can often be allocated on the stack rather than the heap
>> > > > (important
>> > > > > > when
>> > > > > > > >> you are dealing with hundreds of thousands of them).  They
>> can
>> > > be
>> > > > > > > >> efficiently deserialized and serialized.  If we use 64-bit
>> > ones,
>> > > > we
>> > > > > > will
>> > > > > > > >> never run out of IDs, which means that they can always be
>> > unique
>> > > > per
>> > > > > > > >> partition.
>> > > > > > > >>
>> > > > > > > >> Given that the partition name is no longer the "real"
>> > identifier
>> > > > for
>> > > > > > > >> partitions in the current KIP-232 proposal, why not just
>> move
>> > to
>> > > > > using
>> > > > > > > >> partition IDs entirely instead of strings?  You have to
>> change
>> > > all
>> > > > > the
>> > > > > > > >> messages anyway.  There isn't much point any more to
>> carrying
>> > > > around
>> > > > > > the
>> > > > > > > >> partition name in every RPC, since you really need (name,
>> > epoch)
>> > > > to
>> > > > > > > >> identify the partition.
>> > > > > > > >> Probably the metadata response and a few other messages
>> would
>> > > have
>> > > > > to
>> > > > > > > >> still carry the partition name, to allow clients to go from
>> > name
>> > > > to
>> > > > > > id.
>> > > > > > > >> But we could mostly forget about the strings.  And then
>> this
>> > > would
>> > > > > be
>> > > > > > a
>> > > > > > > >> scalability improvement rather than a scalability problem.
>> > > > > > > >>
>> > > > > > > >> >
>> > > > > > > >> >
>> > > > > > > >> > > By choosing to reuse the same (topic, partition,
>> offset)
>> > > > > 3-tuple,
>> > > > > > we
>> > > > > > > >> have
>> > > > > > > >> >
>> > > > > > > >> > chosen to give up immutability.  That was a really bad
>> > > decision.
>> > > > > > And
>> > > > > > > >> now
>> > > > > > > >> > > we have to worry about time dependencies, stale cached
>> > data,
>> > > > and
>> > > > > > all
>> > > > > > > >> the
>> > > > > > > >> > > rest.  We can't completely fix this inside Kafka no
>> matter
>> > > > what
>> > > > > > we do,
>> > > > > > > >> > > because not all that cached data is inside Kafka
>> itself.
>> > > Some
>> > > > > of
>> > > > > > it
>> > > > > > > >> may be
>> > > > > > > >> > > in systems that Kafka has sent data to, such as other
>> > > daemons,
>> > > > > SQL
>> > > > > > > >> > > databases, streams, and so forth.
>> > > > > > > >> > >
>> > > > > > > >> >
>> > > > > > > >> > The current KIP will uniquely identify a message using
>> > (topic,
>> > > > > > > >> partition,
>> > > > > > > >> > offset, partitionEpoch) 4-tuple. This addresses the
>> message
>> > > > > > immutability
>> > > > > > > >> > issue that you mentioned. Is there any corner case where
>> the
>> > > > > message
>> > > > > > > >> > immutability is still not preserved with the current KIP?
>> > > > > > > >> >
>> > > > > > > >> >
>> > > > > > > >> > >
>> > > > > > > >> > > I guess the idea here is that mirror maker should work
>> as
>> > > > > expected
>> > > > > > > >> when
>> > > > > > > >> > > users destroy a topic and re-create it with the same
>> name.
>> > > > > That's
>> > > > > > > >> kind of
>> > > > > > > >> > > tough, though, since in that scenario, mirror maker
>> > probably
>> > > > > > should
>> > > > > > > >> destroy
>> > > > > > > >> > > and re-create the topic on the other end, too, right?
>> > > > > Otherwise,
>> > > > > > > >> what you
>> > > > > > > >> > > end up with on the other end could be half of one
>> > > incarnation
>> > > > of
>> > > > > > the
>> > > > > > > >> topic,
>> > > > > > > >> > > and half of another.
>> > > > > > > >> > >
>> > > > > > > >> > > What mirror maker really needs is to be able to follow
>> a
>> > > > stream
>> > > > > of
>> > > > > > > >> events
>> > > > > > > >> > > about the kafka cluster itself.  We could have some
>> master
>> > > > topic
>> > > > > > > >> which is
>> > > > > > > >> > > always present and which contains data about all topic
>> > > > > deletions,
>> > > > > > > >> > > creations, etc.  Then MM can simply follow this topic
>> and
>> > do
>> > > > > what
>> > > > > > is
>> > > > > > > >> needed.
>> > > > > > > >> > >
>> > > > > > > >> > > >
>> > > > > > > >> > > >
>> > > > > > > >> > > > >
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > > Then the next question may be, should we use a
>> global
>> > > > > > > >> metadataEpoch +
>> > > > > > > >> > > > > > per-partition partitionEpoch, instead of using
>> > > > > per-partition
>> > > > > > > >> > > leaderEpoch
>> > > > > > > >> > > > > +
>> > > > > > > >> > > > > > per-partition partitionEpoch. The former solution
>> using
>> > > > > > > >> metadataEpoch
>> > > > > > > >> > > would
>> > > > > > > >> > > > > > not work due to the following scenario (provided
>> by
>> > > > Jun):
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > > "Consider the following scenario. In metadata v1,
>> > the
>> > > > > leader
>> > > > > > > >> for a
>> > > > > > > >> > > > > > partition is at broker 1. In metadata v2, leader
>> is
>> > at
>> > > > > > broker
>> > > > > > > >> 2. In
>> > > > > > > >> > > > > > metadata v3, leader is at broker 1 again. The
>> last
>> > > > > committed
>> > > > > > > >> offset
>> > > > > > > >> > > in
>> > > > > > > >> > > > > v1,
>> > > > > > > >> > > > > > v2 and v3 are 10, 20 and 30, respectively. A
>> > consumer
>> > > is
>> > > > > > > >> started and
>> > > > > > > >> > > > > reads
>> > > > > > > >> > > > > > metadata v1 and reads messages from offset 0 to
>> 25
>> > > from
>> > > > > > broker
>> > > > > > > >> 1. My
>> > > > > > > >> > > > > > understanding is that in the current proposal,
>> the
>> > > > > metadata
>> > > > > > > >> version
>> > > > > > > >> > > > > > associated with offset 25 is v1. The consumer is
>> > then
>> > > > > > restarted
>> > > > > > > >> and
>> > > > > > > >> > > > > fetches
>> > > > > > > >> > > > > > metadata v2. The consumer tries to read from
>> broker
>> > 2,
>> > > > > > which is
>> > > > > > > >> the
>> > > > > > > >> > > old
>> > > > > > > >> > > > > > leader with the last offset at 20. In this case,
>> the
>> > > > > > consumer
>> > > > > > > >> will
>> > > > > > > >> > > still
>> > > > > > > >> > > > > > get OffsetOutOfRangeException incorrectly."
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > > Regarding your comment "For the second purpose,
>> this
>> > > is
>> > > > > > "soft
>> > > > > > > >> state"
>> > > > > > > >> > > > > > anyway.  If the client thinks X is the leader
>> but Y
>> > is
>> > > > > > really
>> > > > > > > >> the
>> > > > > > > >> > > leader,
>> > > > > > > >> > > > > > the client will talk to X, and X will point out
>> its
>> > > > > mistake
>> > > > > > by
>> > > > > > > >> > > sending
>> > > > > > > >> > > > > back
>> > > > > > > >> > > > > > a NOT_LEADER_FOR_PARTITION.", it is probably not
>> > true.
>> > > > The
>> > > > > > > >> problem
>> > > > > > > >> > > here is
>> > > > > > > >> > > > > > that the old leader X may still think it is the
>> > leader
>> > > > of
>> > > > > > the
>> > > > > > > >> > > partition
>> > > > > > > >> > > > > and
>> > > > > > > >> > > > > > thus it will not send back
>> NOT_LEADER_FOR_PARTITION.
>> > > The
>> > > > > > reason
>> > > > > > > >> is
>> > > > > > > >> > > > > provided
>> > > > > > > >> > > > > > in KAFKA-6262. Can you check if that makes sense?
>> > > > > > > >> > > > >
>> > > > > > > >> > > > > This is solvable with a timeout, right?  If the
>> leader
>> > > > can't
>> > > > > > > >> > > communicate
>> > > > > > > >> > > > > with the controller for a certain period of time,
>> it
>> > > > should
>> > > > > > stop
>> > > > > > > >> > > acting as
>> > > > > > > >> > > > > the leader.  We have to solve this problem,
>> anyway, in
>> > > > order
>> > > > > > to
>> > > > > > > >> fix
>> > > > > > > >> > > all the
>> > > > > > > >> > > > > corner cases.
>> > > > > > > >> > > > >
>> > > > > > > >> > > >
>> > > > > > > >> > > > Not sure if I fully understand your proposal. The
>> > proposal
>> > > > > > seems to
>> > > > > > > >> > > require
>> > > > > > > >> > > > non-trivial changes to our existing leadership
>> election
>> > > > > > mechanism.
>> > > > > > > >> Could
>> > > > > > > >> > > > you provide more detail regarding how it works? For
>> > > example,
>> > > > > how
>> > > > > > > >> should
>> > > > > > > >> > > > user choose this timeout, how leader determines
>> whether
>> > it
>> > > > can
>> > > > > > still
>> > > > > > > >> > > > communicate with controller, and how this triggers
>> > > > controller
>> > > > > to
>> > > > > > > >> elect
>> > > > > > > >> > > new
>> > > > > > > >> > > > leader?
>> > > > > > > >> > >
>> > > > > > > >> > > Before I come up with any proposal, let me make sure I
>> > > > > understand
>> > > > > > the
>> > > > > > > >> > > problem correctly.  My big question was, what prevents
>> > > > > split-brain
>> > > > > > > >> here?
>> > > > > > > >> > >
>> > > > > > > >> > > Let's say I have a partition which is on nodes A, B,
>> and
>> > C,
>> > > > with
>> > > > > > > >> min-ISR
>> > > > > > > >> > > 2.  The controller is D.  At some point, there is a
>> > network
>> > > > > > partition
>> > > > > > > >> > > between A and B and the rest of the cluster.  The
>> > Controller
>> > > > > > > >> re-assigns the
>> > > > > > > >> > > partition to nodes C, D, and E.  But A and B keep
>> chugging
>> > > > away,
>> > > > > > even
>> > > > > > > >> > > though they can no longer communicate with the
>> controller.
>> > > > > > > >> > >
>> > > > > > > >> > > At some point, a client with stale metadata writes to
>> the
>> > > > > > partition.
>> > > > > > > >> It
>> > > > > > > >> > > still thinks the partition is on node A, B, and C, so
>> > that's
>> > > > > > where it
>> > > > > > > >> sends
>> > > > > > > >> > > the data.  It's unable to talk to C, but A and B reply
>> > back
>> > > > that
>> > > > > > all
>> > > > > > > >> is
>> > > > > > > >> > > well.
>> > > > > > > >> > >
>> > > > > > > >> > > Is this not a case where we could lose data due to
>> split
>> > > > brain?
>> > > > > > Or is
>> > > > > > > >> > > there a mechanism for preventing this that I missed?
>> If
>> > it
>> > > > is,
>> > > > > it
>> > > > > > > >> seems
>> > > > > > > >> > > like a pretty serious failure case that we should be
>> > > handling
>> > > > > > with our
>> > > > > > > >> > > metadata rework.  And I think epoch numbers and
>> timeouts
>> > > might
>> > > > > be
>> > > > > > > >> part of
>> > > > > > > >> > > the solution.
>> > > > > > > >> > >
>> > > > > > > >> >
>> > > > > > > >> > Right, split brain can happen if RF=4 and minIsr=2.
>> > However, I
>> > > > am
>> > > > > > not
>> > > > > > > >> sure
>> > > > > > > >> > it is a pretty serious issue which we need to address
>> today.
>> > > > This
>> > > > > > can be
>> > > > > > > >> > prevented by configuring the Kafka topic so that minIsr >
>> > > RF/2.
>> > > > > > > >> Actually,
>> > > > > > > >> > if user sets minIsr=2, is there any reason that user
>> > > wants
>> > > > to
>> > > > > > set
>> > > > > > > >> RF=4
>> > > > > > > >> > instead of 4?
>> > > > > > > >> >
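A minimal sketch of the minIsr > RF/2 condition mentioned above, assuming the only failure is a network partition that splits one fixed replica set into two disjoint groups. The function name is hypothetical and not a Kafka API, and the sketch does not model the reassignment scenario in Colin's example.

```python
def split_brain_possible(rf: int, min_isr: int) -> bool:
    """Return True if two disjoint groups of replicas could each satisfy minIsr.

    Assumption: a single replica set of size rf is split in two by a network
    partition. Both sides can accept writes only if each side holds at least
    min_isr replicas, which needs 2 * min_isr <= rf.
    """
    return 2 * min_isr <= rf


# RF=4, minIsr=2: both halves of a 2/2 split can satisfy minIsr.
print(split_brain_possible(4, 2))
# RF=3, minIsr=2: at most one side can hold two replicas.
print(split_brain_possible(3, 2))
```

Under this model, any configuration with minIsr > RF/2 returns False, which matches the suggestion above.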
>> > > > > > > >> > Introducing timeout in leader election mechanism is
>> > > > non-trivial. I
>> > > > > > > >> think we
>> > > > > > > >> > probably want to do that only if there is good use-case
>> that
>> > > can
>> > > > > not
>> > > > > > > >> > otherwise be addressed with the current mechanism.
>> > > > > > > >>
>> > > > > > > >> I still would like to think about these corner cases more.
>> > But
>> > > > > > perhaps
>> > > > > > > >> it's not directly related to this KIP.
>> > > > > > > >>
>> > > > > > > >> regards,
>> > > > > > > >> Colin
>> > > > > > > >>
>> > > > > > > >>
>> > > > > > > >> >
>> > > > > > > >> >
>> > > > > > > >> > > best,
>> > > > > > > >> > > Colin
>> > > > > > > >> > >
>> > > > > > > >> > >
>> > > > > > > >> > > >
>> > > > > > > >> > > >
>> > > > > > > >> > > > > best,
>> > > > > > > >> > > > > Colin
>> > > > > > > >> > > > >
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > > Regards,
>> > > > > > > >> > > > > > Dong
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > > On Wed, Jan 24, 2018 at 10:39 AM, Colin McCabe <
>> > > > > > > >> cmcc...@apache.org>
>> > > > > > > >> > > > > wrote:
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > > > Hi Dong,
>> > > > > > > >> > > > > > >
>> > > > > > > >> > > > > > > Thanks for proposing this KIP.  I think a
>> metadata
>> > > > epoch
>> > > > > > is a
>> > > > > > > >> > > really
>> > > > > > > >> > > > > good
>> > > > > > > >> > > > > > > idea.
>> > > > > > > >> > > > > > >
>> > > > > > > >> > > > > > > I read through the DISCUSS thread, but I still
>> > don't
>> > > > > have
>> > > > > > a
>> > > > > > > >> clear
>> > > > > > > >> > > > > picture
>> > > > > > > >> > > > > > > of why the proposal uses a metadata epoch per
>> > > > partition
>> > > > > > rather
>> > > > > > > >> > > than a
>> > > > > > > >> > > > > > > global metadata epoch.  A metadata epoch per
>> > > partition
>> > > > > is
>> > > > > > > >> kind of
>> > > > > > > >> > > > > > > unpleasant-- it's at least 4 extra bytes per
>> > > partition
>> > > > > > that we
>> > > > > > > >> > > have to
>> > > > > > > >> > > > > send
>> > > > > > > >> > > > > > > over the wire in every full metadata request,
>> > which
>> > > > > could
>> > > > > > > >> become
>> > > > > > > >> > > extra
>> > > > > > > >> > > > > > > kilobytes on the wire when the number of
>> > partitions
>> > > > > > becomes
>> > > > > > > >> large.
>> > > > > > > >> > > > > Plus,
>> > > > > > > >> > > > > > > we have to update all the auxiliary classes to
>> > > include
>> > > > > an
>> > > > > > > >> epoch.
>> > > > > > > >> > > > > > >
>> > > > > > > >> > > > > > > We need to have a global metadata epoch anyway
>> to
>> > > > handle
>> > > > > > > >> partition
>> > > > > > > >> > > > > > > addition and deletion.  For example, if I give
>> you
>> > > > > > > >> > > > > > > MetadataResponse{part1,epoch 1, part2, epoch 1}
>> > and
>> > > > > > {part1,
>> > > > > > > >> > > epoch1},
>> > > > > > > >> > > > > which
>> > > > > > > >> > > > > > > MetadataResponse is newer?  You have no way of
>> > > > knowing.
>> > > > > > It
>> > > > > > > >> could
>> > > > > > > >> > > be
>> > > > > > > >> > > > > that
>> > > > > > > >> > > > > > > part2 has just been created, and the response
>> > with 2
>> > > > > > > >> partitions is
>> > > > > > > >> > > > > newer.
>> > > > > > > >> > > > > > > Or it could be that part2 has just been
>> deleted,
>> > and
>> > > > > > > >> therefore the
>> > > > > > > >> > > > > response
>> > > > > > > >> > > > > > > with 1 partition is newer.  You must have a
>> global
>> > > > epoch
>> > > > > > to
>> > > > > > > >> > > > > disambiguate
>> > > > > > > >> > > > > > > these two cases.
>> > > > > > > >> > > > > > >
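To make the ambiguity above concrete, here is a small sketch that models a metadata response as a plain dict of partition name to epoch. The helper names and the dict model are assumptions for illustration only, not Kafka's actual MetadataResponse format.

```python
def newer_by_partition_epochs(a: dict, b: dict):
    """Return "a", "b", or None when per-partition epochs cannot decide."""
    shared = a.keys() & b.keys()
    a_ahead = any(a[p] > b[p] for p in shared)
    b_ahead = any(b[p] > a[p] for p in shared)
    if a_ahead and not b_ahead:
        return "a"
    if b_ahead and not a_ahead:
        return "b"
    # Equal (or conflicting) epochs on the shared partitions: we cannot tell
    # whether the extra partition was just created or just deleted.
    return None


def newer_by_global_epoch(a_epoch: int, b_epoch: int):
    """Return "a", "b", or None if both responses carry the same global epoch."""
    if a_epoch == b_epoch:
        return None
    return "a" if a_epoch > b_epoch else "b"


two_parts = {"part1": 1, "part2": 1}
one_part = {"part1": 1}
print(newer_by_partition_epochs(two_parts, one_part))  # cannot be decided
print(newer_by_global_epoch(7, 8))                     # the second is newer
```

The global epoch values 7 and 8 are made up; the point is only that a single counter bumped on every change orders the two responses, while the per-partition epochs alone do not.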
>> > > > > > > >> > > > > > > Previously, I worked on the Ceph distributed
>> > > > filesystem.
>> > > > > > > >> Ceph had
>> > > > > > > >> > > the
>> > > > > > > >> > > > > > > concept of a map of the whole cluster,
>> maintained
>> > > by a
>> > > > > few
>> > > > > > > >> servers
>> > > > > > > >> > > > > doing
>> > > > > > > >> > > > > > > paxos.  This map was versioned by a single
>> 64-bit
>> > > > epoch
>> > > > > > number
>> > > > > > > >> > > which
>> > > > > > > >> > > > > > > increased on every change.  It was propagated
>> to
>> > > > clients
>> > > > > > > >> through
>> > > > > > > >> > > > > gossip.  I
>> > > > > > > >> > > > > > > wonder if something similar could work here?
>> > > > > > > >> > > > > > >
>> > > > > > > >> > > > > > > It seems like the Kafka MetadataResponse
>> > serves
>> > > > two
>> > > > > > > >> somewhat
>> > > > > > > >> > > > > unrelated
>> > > > > > > >> > > > > > > purposes.  Firstly, it lets clients know what
>> > > > partitions
>> > > > > > > >> exist in
>> > > > > > > >> > > the
>> > > > > > > >> > > > > > > system and where they live.  Secondly, it lets
>> > > clients
>> > > > > > know
>> > > > > > > >> which
>> > > > > > > >> > > nodes
>> > > > > > > >> > > > > > > within the partition are in-sync (in the ISR)
>> and
>> > > > which
>> > > > > > node
>> > > > > > > >> is the
>> > > > > > > >> > > > > leader.
>> > > > > > > >> > > > > > >
>> > > > > > > >> > > > > > > The first purpose is what you really need a
>> > metadata
>> > > > > epoch
>> > > > > > > >> for, I
>> > > > > > > >> > > > > think.
>> > > > > > > >> > > > > > > You want to know whether a partition exists or
>> > not,
>> > > or
>> > > > > you
>> > > > > > > >> want to
>> > > > > > > >> > > know
>> > > > > > > >> > > > > > > which nodes you should talk to in order to
>> write
>> > to
>> > > a
>> > > > > > given
>> > > > > > > >> > > > > partition.  A
>> > > > > > > >> > > > > > > single metadata epoch for the whole response
>> > should
>> > > be
>> > > > > > > >> adequate
>> > > > > > > >> > > here.
>> > > > > > > >> > > > > We
>> > > > > > > >> > > > > > > should not change the partition assignment
>> without
>> > > > going
>> > > > > > > >> through
>> > > > > > > >> > > > > zookeeper
>> > > > > > > >> > > > > > > (or a similar system), and this inherently
>> > > serializes
>> > > > > > updates
>> > > > > > > >> into
>> > > > > > > >> > > a
>> > > > > > > >> > > > > > > numbered stream.  Brokers should also stop
>> > > responding
>> > > > to
>> > > > > > > >> requests
>> > > > > > > >> > > when
>> > > > > > > >> > > > > they
>> > > > > > > >> > > > > > > are unable to contact ZK for a certain time
>> > period.
>> > > > > This
>> > > > > > > >> prevents
>> > > > > > > >> > > the
>> > > > > > > >> > > > > case
>> > > > > > > >> > > > > > > where a given partition has been moved off some
>> > set
>> > > of
>> > > > > > nodes,
>> > > > > > > >> but a
>> > > > > > > >> > > > > client
>> > > > > > > >> > > > > > > still ends up talking to those nodes and
>> writing
>> > > data
>> > > > > > there.
>> > > > > > > >> > > > > > >
>> > > > > > > >> > > > > > > For the second purpose, this is "soft state"
>> > anyway.
>> > > > If
>> > > > > > the
>> > > > > > > >> client
>> > > > > > > >> > > > > thinks
>> > > > > > > >> > > > > > > X is the leader but Y is really the leader, the
>> > > client
>> > > > > > will
>> > > > > > > >> talk
>> > > > > > > >> > > to X,
>> > > > > > > >> > > > > and
>> > > > > > > >> > > > > > > X will point out its mistake by sending back a
>> > > > > > > >> > > > > NOT_LEADER_FOR_PARTITION.
>> > > > > > > >> > > > > > > Then the client can update its metadata again
>> and
>> > > find
>> > > > > > the new
>> > > > > > > >> > > leader,
>> > > > > > > >> > > > > if
>> > > > > > > >> > > > > > > there is one.  There is no need for an epoch to
>> > > handle
>> > > > > > this.
>> > > > > > > >> > > > > Similarly, I
>> > > > > > > >> > > > > > > can't think of a reason why changing the
>> in-sync
>> > > > replica
>> > > > > > set
>> > > > > > > >> needs
>> > > > > > > >> > > to
>> > > > > > > >> > > > > bump
>> > > > > > > >> > > > > > > the epoch.
>> > > > > > > >> > > > > > >
>> > > > > > > >> > > > > > > best,
>> > > > > > > >> > > > > > > Colin
>> > > > > > > >> > > > > > >
>> > > > > > > >> > > > > > >
>> > > > > > > >> > > > > > > On Wed, Jan 24, 2018, at 09:45, Dong Lin wrote:
>> > > > > > > >> > > > > > > > Thanks much for reviewing the KIP!
>> > > > > > > >> > > > > > > >
>> > > > > > > >> > > > > > > > Dong
>> > > > > > > >> > > > > > > >
>> > > > > > > >> > > > > > > > On Wed, Jan 24, 2018 at 7:10 AM, Guozhang
>> Wang <
>> > > > > > > >> > > wangg...@gmail.com>
>> > > > > > > >> > > > > > > wrote:
>> > > > > > > >> > > > > > > >
>> > > > > > > >> > > > > > > > > Yeah that makes sense, again I'm just
>> making
>> > > sure
>> > > > we
>> > > > > > > >> understand
>> > > > > > > >> > > > > all the
>> > > > > > > >> > > > > > > > > scenarios and what to expect.
>> > > > > > > >> > > > > > > > >
>> > > > > > > >> > > > > > > > > I agree that if, more generally speaking,
>> say
>> > > > users
>> > > > > > have
>> > > > > > > >> only
>> > > > > > > >> > > > > consumed
>> > > > > > > >> > > > > > > to
>> > > > > > > >> > > > > > > > > offset 8, and then call seek(16) to "jump"
>> to
>> > a
>> > > > > > further
>> > > > > > > >> > > position,
>> > > > > > > >> > > > > then
>> > > > > > > >> > > > > > > she
>> > > > > > > >> > > > > > > > > needs to be aware that OORE may be thrown
>> and
>> > she
>> > > > > > needs to
>> > > > > > > >> > > handle
>> > > > > > > >> > > > > it or
>> > > > > > > >> > > > > > > rely
>> > > > > > > >> > > > > > > > > on reset policy which should not surprise
>> her.
>> > > > > > > >> > > > > > > > >
>> > > > > > > >> > > > > > > > >
>> > > > > > > >> > > > > > > > > I'm +1 on the KIP.
>> > > > > > > >> > > > > > > > >
>> > > > > > > >> > > > > > > > > Guozhang
>> > > > > > > >> > > > > > > > >
>> > > > > > > >> > > > > > > > >
>> > > > > > > >> > > > > > > > > On Wed, Jan 24, 2018 at 12:31 AM, Dong Lin
>> <
>> > > > > > > >> > > lindon...@gmail.com>
>> > > > > > > >> > > > > > > wrote:
>> > > > > > > >> > > > > > > > >
>> > > > > > > >> > > > > > > > > > Yes, in general we can not prevent
>> > > > > > > >> OffsetOutOfRangeException
>> > > > > > > >> > > if
>> > > > > > > >> > > > > user
>> > > > > > > >> > > > > > > > > seeks
>> > > > > > > >> > > > > > > > > > to a wrong offset. The main goal is to
>> > prevent
>> > > > > > > >> > > > > > > OffsetOutOfRangeException
>> > > > > > > >> > > > > > > > > if
>> > > > > > > >> > > > > > > > > > user has done things in the right way,
>> e.g.
>> > > user
>> > > > > > should
>> > > > > > > >> know
>> > > > > > > >> > > that
>> > > > > > > >> > > > > > > there
>> > > > > > > >> > > > > > > > > is
>> > > > > > > >> > > > > > > > > > message with this offset.
>> > > > > > > >> > > > > > > > > >
>> > > > > > > >> > > > > > > > > > For example, if user calls seek(..) right
>> > > after
>> > > > > > > >> > > construction, the
>> > > > > > > >> > > > > > > only
>> > > > > > > >> > > > > > > > > > reason I can think of is that user stores
>> > > offset
>> > > > > > > >> externally.
>> > > > > > > >> > > In
>> > > > > > > >> > > > > this
>> > > > > > > >> > > > > > > > > case,
>> > > > > > > >> > > > > > > > > > user currently needs to use the offset
>> which
>> > > is
>> > > > > > obtained
>> > > > > > > >> > > using
>> > > > > > > >> > > > > > > > > position(..)
>> > > > > > > >> > > > > > > > > > from the last run. With this KIP, user
>> needs
>> > > to
>> > > > > get
>> > > > > > the
>> > > > > > > >> > > offset
>> > > > > > > >> > > > > and
>> > > > > > > >> > > > > > > the
>> > > > > > > >> > > > > > > > > > offsetEpoch using
>> > positionAndOffsetEpoch(...)
>> > > > and
>> > > > > > stores
>> > > > > > > >> > > these
>> > > > > > > >> > > > > > > > > information
>> > > > > > > >> > > > > > > > > > externally. The next time user starts
>> > > consumer,
>> > > > > > he/she
>> > > > > > > >> needs
>> > > > > > > >> > > to
>> > > > > > > >> > > > > call
>> > > > > > > >> > > > > > > > > > seek(..., offset, offsetEpoch) right
>> after
>> > > > > > construction.
>> > > > > > > >> > > Then KIP
>> > > > > > > >> > > > > > > should
>> > > > > > > >> > > > > > > > > be
>> > > > > > > >> > > > > > > > > > able to ensure that we don't throw
>> > > > > > > >> OffsetOutOfRangeException
>> > > > > > > >> > > if
>> > > > > > > >> > > > > > > there is
>> > > > > > > >> > > > > > > > > no
>> > > > > > > >> > > > > > > > > > unclean leader election.
>> > > > > > > >> > > > > > > > > >
>> > > > > > > >> > > > > > > > > > Does this sound OK?
>> > > > > > > >> > > > > > > > > >
>> > > > > > > >> > > > > > > > > > Regards,
>> > > > > > > >> > > > > > > > > > Dong
>> > > > > > > >> > > > > > > > > >
>> > > > > > > >> > > > > > > > > >
>> > > > > > > >> > > > > > > > > > On Tue, Jan 23, 2018 at 11:44 PM,
>> Guozhang
>> > > Wang
>> > > > <
>> > > > > > > >> > > > > wangg...@gmail.com>
>> > > > > > > >> > > > > > > > > > wrote:
>> > > > > > > >> > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > "If consumer wants to consume message
>> with
>> > > > > offset
>> > > > > > 16,
>> > > > > > > >> then
>> > > > > > > >> > > > > consumer
>> > > > > > > >> > > > > > > > > must
>> > > > > > > >> > > > > > > > > > > have
>> > > > > > > >> > > > > > > > > > > already fetched message with offset 15"
>> > > > > > > >> > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > --> this may not be always true right?
>> > What
>> > > if
>> > > > > > > >> consumer
>> > > > > > > >> > > just
>> > > > > > > >> > > > > call
>> > > > > > > >> > > > > > > > > > seek(16)
>> > > > > > > >> > > > > > > > > > > after construction and then poll
>> without
>> > > > > committed
>> > > > > > > >> offset
>> > > > > > > >> > > ever
>> > > > > > > >> > > > > > > stored
>> > > > > > > >> > > > > > > > > > > before? Admittedly it is rare but we do
>> > not
>> > > > > > > >> programmatically
>> > > > > > > >> > > > > disallow
>> > > > > > > >> > > > > > > it.
>> > > > > > > >> > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > Guozhang
>> > > > > > > >> > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > On Tue, Jan 23, 2018 at 10:42 PM, Dong
>> > Lin <
>> > > > > > > >> > > > > lindon...@gmail.com>
>> > > > > > > >> > > > > > > > > wrote:
>> > > > > > > >> > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > Hey Guozhang,
>> > > > > > > >> > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > Thanks much for reviewing the KIP!
>> > > > > > > >> > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > In the scenario you described, let's
>> > > assume
>> > > > > that
>> > > > > > > >> broker
>> > > > > > > >> > > A has
>> > > > > > > >> > > > > > > > > messages
>> > > > > > > >> > > > > > > > > > > with
>> > > > > > > >> > > > > > > > > > > > offset up to 10, and broker B has
>> > messages
>> > > > > with
>> > > > > > > >> offset
>> > > > > > > >> > > up to
>> > > > > > > >> > > > > 20.
>> > > > > > > >> > > > > > > If
>> > > > > > > >> > > > > > > > > > > > consumer wants to consume message
>> with
>> > > > offset
>> > > > > > 9, it
>> > > > > > > >> will
>> > > > > > > >> > > not
>> > > > > > > >> > > > > > > receive
>> > > > > > > >> > > > > > > > > > > > OffsetOutOfRangeException
>> > > > > > > >> > > > > > > > > > > > from broker A.
>> > > > > > > >> > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > If consumer wants to consume message
>> > with
>> > > > > offset
>> > > > > > > >> 16, then
>> > > > > > > >> > > > > > > consumer
>> > > > > > > >> > > > > > > > > must
>> > > > > > > >> > > > > > > > > > > > have already fetched message with
>> offset
>> > > 15,
>> > > > > > which
>> > > > > > > >> can
>> > > > > > > >> > > only
>> > > > > > > >> > > > > come
>> > > > > > > >> > > > > > > from
>> > > > > > > >> > > > > > > > > > > > broker B. Because consumer will fetch
>> > from
>> > > > > > broker B
>> > > > > > > >> only
>> > > > > > > >> > > if
>> > > > > > > >> > > > > > > > > leaderEpoch
>> > > > > > > >> > > > > > > > > > > >=
>> > > > > > > >> > > > > > > > > > > > 2, then the current consumer
>> leaderEpoch
>> > > can
>> > > > > > not be
>> > > > > > > >> 1
>> > > > > > > >> > > since
>> > > > > > > >> > > > > this
>> > > > > > > >> > > > > > > KIP
>> > > > > > > >> > > > > > > > > > > > prevents leaderEpoch rewind. Thus we
>> > will
>> > > > not
>> > > > > > have
>> > > > > > > >> > > > > > > > > > > > OffsetOutOfRangeException
>> > > > > > > >> > > > > > > > > > > > in this case.
>> > > > > > > >> > > > > > > > > > > >
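The "no leaderEpoch rewind" rule described above can be sketched as a client-side cache that refuses metadata carrying a lower leader epoch than the one it already holds. This is only an illustration under that assumption; the class and method names are hypothetical and are not the consumer's real internals.

```python
class MetadataCache:
    """Toy cache of partition -> (leader, leader_epoch)."""

    def __init__(self):
        self._leaders = {}

    def update(self, partition: str, leader: str, leader_epoch: int) -> bool:
        """Apply an update unless it would rewind the leader epoch."""
        current = self._leaders.get(partition)
        if current is not None and leader_epoch < current[1]:
            return False  # stale metadata: keep what we have
        self._leaders[partition] = (leader, leader_epoch)
        return True

    def leader(self, partition: str) -> str:
        return self._leaders[partition][0]


cache = MetadataCache()
cache.update("topic-0", "B", 2)   # consumer has fetched from broker B at epoch 2
cache.update("topic-0", "A", 1)   # outdated response pointing back at broker A
print(cache.leader("topic-0"))    # still B, so offset 16 is not requested from A
```

With the rewind rejected, the consumer in the scenario above never goes back to broker A, which only has offsets up to 10.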
>> > > > > > > >> > > > > > > > > > > > Does this address your question, or
>> > maybe
>> > > > > there
>> > > > > > is
>> > > > > > > >> more
>> > > > > > > >> > > > > advanced
>> > > > > > > >> > > > > > > > > > scenario
>> > > > > > > >> > > > > > > > > > > > that the KIP does not handle?
>> > > > > > > >> > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > Thanks,
>> > > > > > > >> > > > > > > > > > > > Dong
>> > > > > > > >> > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > On Tue, Jan 23, 2018 at 9:43 PM,
>> > Guozhang
>> > > > > Wang <
>> > > > > > > >> > > > > > > wangg...@gmail.com>
>> > > > > > > >> > > > > > > > > > > wrote:
>> > > > > > > >> > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > Thanks Dong, I made a pass over the
>> > wiki
>> > > > and
>> > > > > > it
>> > > > > > > >> lgtm.
>> > > > > > > >> > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > Just a quick question: can we
>> > completely
>> > > > > > > >> eliminate the
>> > > > > > > >> > > > > > > > > > > > > OffsetOutOfRangeException with this
>> > > > > approach?
>> > > > > > Say
>> > > > > > > >> if
>> > > > > > > >> > > there
>> > > > > > > >> > > > > is
>> > > > > > > >> > > > > > > > > > > consecutive
>> > > > > > > >> > > > > > > > > > > > > leader changes such that the cached
>> > > > > metadata's
>> > > > > > > >> > > partition
>> > > > > > > >> > > > > epoch
>> > > > > > > >> > > > > > > is
>> > > > > > > >> > > > > > > > > 1,
>> > > > > > > >> > > > > > > > > > > and
>> > > > > > > >> > > > > > > > > > > > > the metadata fetch response returns
>> > > with
>> > > > > > > >> partition
>> > > > > > > >> > > epoch 2
>> > > > > > > >> > > > > > > > > pointing
>> > > > > > > >> > > > > > > > > > to
>> > > > > > > >> > > > > > > > > > > > > leader broker A, while the actual
>> > > > up-to-date
>> > > > > > > >> metadata
>> > > > > > > >> > > has
>> > > > > > > >> > > > > > > partition
>> > > > > > > >> > > > > > > > > > > > epoch 3
>> > > > > > > >> > > > > > > > > > > > > whose leader is now broker B, the
>> > > metadata
>> > > > > > > >> refresh will
>> > > > > > > >> > > > > still
>> > > > > > > >> > > > > > > > > succeed
>> > > > > > > >> > > > > > > > > > > and
>> > > > > > > >> > > > > > > > > > > > > the follow-up fetch request may
>> still
>> > > see
>> > > > > > OORE?
>> > > > > > > >> > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > Guozhang
>> > > > > > > >> > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > On Tue, Jan 23, 2018 at 3:47 PM,
>> Dong
>> > > Lin
>> > > > <
>> > > > > > > >> > > > > lindon...@gmail.com
>> > > > > > > >> > > > > > > >
>> > > > > > > >> > > > > > > > > > wrote:
>> > > > > > > >> > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > > Hi all,
>> > > > > > > >> > > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > > I would like to start the voting
>> > > process
>> > > > > for
>> > > > > > > >> KIP-232:
>> > > > > > > >> > > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > > https://cwiki.apache.org/
>> > > > > > > >> > > confluence/display/KAFKA/KIP-
>> > > > > > > >> > > > > > > > > > > > > > 232%3A+Detect+outdated+metadat
>> > > > > > > >> a+using+leaderEpoch+
>> > > > > > > >> > > > > > > > > > and+partitionEpoch
>> > > > > > > >> > > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > > The KIP will help fix a
>> concurrency
>> > > > issue
>> > > > > in
>> > > > > > > >> Kafka
>> > > > > > > >> > > which
>> > > > > > > >> > > > > > > > > currently
>> > > > > > > >> > > > > > > > > > > can
>> > > > > > > >> > > > > > > > > > > > > > cause message loss or message
>> > > > duplication
>> > > > > in
>> > > > > > > >> > > consumer.
>> > > > > > > >> > > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > > Regards,
>> > > > > > > >> > > > > > > > > > > > > > Dong
>> > > > > > > >> > > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > --
>> > > > > > > >> > > > > > > > > > > > > -- Guozhang
>> > > > > > > >> > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > --
>> > > > > > > >> > > > > > > > > > > -- Guozhang
>> > > > > > > >> > > > > > > > > > >
>> > > > > > > >> > > > > > > > > >
>> > > > > > > >> > > > > > > > >
>> > > > > > > >> > > > > > > > >
>> > > > > > > >> > > > > > > > >
>> > > > > > > >> > > > > > > > > --
>> > > > > > > >> > > > > > > > > -- Guozhang
>> > > > > > > >> > > > > > > > >
>> > > > > > > >> > > > > > >
>> > > > > > > >> > > > >
>> > > > > > > >> > >
>> > > > > > > >>
>> > > > > > > >
>> > > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
>
