It seems that KIP-320 was accepted. Thus, I am wondering what the status
of this KIP is?

-Matthias

On 7/11/18 10:59 AM, Dong Lin wrote:
> Hey Jun,
> 
> Certainly. We can discuss later after KIP-320 settles.
> 
> Thanks!
> Dong
> 
> 
> On Wed, Jul 11, 2018 at 8:54 AM, Jun Rao <j...@confluent.io> wrote:
> 
>> Hi, Dong,
>>
>> Sorry for the late response. Since KIP-320 is covering some of the similar
>> problems described in this KIP, perhaps we can wait until KIP-320 settles
>> and see what's still left uncovered in this KIP.
>>
>> Thanks,
>>
>> Jun
>>
>> On Mon, Jun 4, 2018 at 7:03 PM, Dong Lin <lindon...@gmail.com> wrote:
>>
>>> Hey Jun,
>>>
>>> It seems that we have made considerable progress on the discussion of
>>> KIP-253 since February. Do you think we should continue the discussion
>>> there, or can we continue the voting for this KIP? I am happy to submit the
>>> PR and move this KIP forward.
>>>
>>> Thanks!
>>> Dong
>>>
>>>
>>> On Wed, Feb 7, 2018 at 11:42 PM, Dong Lin <lindon...@gmail.com> wrote:
>>>
>>>> Hey Jun,
>>>>
>>>> Sure, I will come up with a KIP this week. I think there is a way to allow
>>>> partition expansion to an arbitrary number without introducing new concepts
>>>> such as read-only partition or repartition epoch.
>>>>
>>>> Thanks,
>>>> Dong
>>>>
>>>> On Wed, Feb 7, 2018 at 5:28 PM, Jun Rao <j...@confluent.io> wrote:
>>>>
>>>>> Hi, Dong,
>>>>>
>>>>> Thanks for the reply. The general idea that you had for adding partitions
>>>>> is similar to what we had in mind. It would be useful to make this more
>>>>> general, allowing adding an arbitrary number of partitions (instead of just
>>>>> doubling) and potentially removing partitions as well. The following is the
>>>>> high level idea from the discussion with Colin, Jason and Ismael.
>>>>>
>>>>> * To change the number of partitions from X to Y in a topic, the controller
>>>>> marks all existing X partitions as read-only and creates Y new partitions.
>>>>> The new partitions are writable and are tagged with a higher repartition
>>>>> epoch (RE).
>>>>>
>>>>> * The controller propagates the new metadata to every broker. Once the
>>>>> leader of a partition is marked as read-only, it rejects the produce
>>>>> requests on this partition. The producer will then refresh the metadata and
>>>>> start publishing to the new writable partitions.
>>>>>
>>>>> * The consumers will then be consuming messages in RE order. The consumer
>>>>> coordinator will only assign partitions in the same RE to consumers. Only
>>>>> after all messages in an RE are consumed, will partitions in a higher RE be
>>>>> assigned to consumers.
>>>>>
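The RE-ordered assignment rule in the third bullet can be sketched roughly as follows. This is only an illustration under stated assumptions: the function name, the dict of partition -> repartition epoch, and the set of fully consumed partitions are hypothetical and do not come from any KIP or from the Kafka code base.

```python
from collections import defaultdict


def assignable_partitions(partition_epochs, fully_consumed):
    """Pick the partitions a coordinator may hand out under RE ordering.

    partition_epochs: dict mapping partition name -> repartition epoch (RE).
    fully_consumed:   set of partition names whose messages are all consumed.

    Returns (epoch, partitions) for the lowest RE that still has unconsumed
    partitions, so a higher RE is only served once every lower RE is drained.
    """
    by_epoch = defaultdict(list)
    for name, epoch in partition_epochs.items():
        by_epoch[epoch].append(name)

    for epoch in sorted(by_epoch):
        pending = sorted(p for p in by_epoch[epoch] if p not in fully_consumed)
        if pending:
            return epoch, pending
    return None, []


# Hypothetical topic that went from 2 partitions (RE 0) to 3 partitions (RE 1).
topic = {"t-0": 0, "t-1": 0, "t-2": 1, "t-3": 1, "t-4": 1}
print(assignable_partitions(topic, set()))
print(assignable_partitions(topic, {"t-0", "t-1"}))
```

The sketch deliberately ignores how the coordinator learns that a read-only partition is fully consumed; that detection is the open design question in the proposal.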
>>>>> As Colin mentioned, if we do the above, we could potentially (1) use a
>>>>> globally unique partition id, or (2) use a globally unique topic id to
>>>>> distinguish recreated partitions due to topic deletion.
>>>>>
>>>>> So, perhaps we can sketch out the re-partitioning KIP a bit more and see if
>>>>> there is any overlap with KIP-232. Would you be interested in doing that?
>>>>> If not, we can do that next week.
>>>>>
>>>>> Jun
>>>>>
>>>>>
>>>>> On Tue, Feb 6, 2018 at 11:30 AM, Dong Lin <lindon...@gmail.com> wrote:
>>>>>
>>>>>> Hey Jun,
>>>>>>
>>>>>> Interestingly I am also planning to sketch a KIP to allow partition
>>>>>> expansion for keyed topics after this KIP. Since you are already doing
>>>>>> that, I guess I will just share my high level idea here in case it is
>>>>>> helpful.
>>>>>>
>>>>>> The motivation for the KIP is that we currently lose the order guarantee for
>>>>>> messages with the same key if we expand the partitions of a keyed topic.
>>>>>>
>>>>>> The solution can probably be built upon the following ideas:
>>>>>>
>>>>>> - Partition number of the keyed topic should always be doubled (or
>>>>>> multiplied by a power of 2). Given that we select a partition based on
>>>>>> hash(key) % partitionNum, this should help us ensure that a message
>>>>>> assigned to an existing partition will not be mapped to another existing
>>>>>> partition after partition expansion.
>>>>>>
>>>>>> - Producer includes in the ProduceRequest some information that helps
>>>>>> ensure that messages produced to a partition will monotonically increase in
>>>>>> the partitionNum of the topic. In other words, if broker receives a
>>>>>> ProduceRequest and notices that the producer does not know the partition
>>>>>> number has increased, broker should reject this request. That "information"
>>>>>> may be leaderEpoch, max partitionEpoch of the partitions of the topic, or
>>>>>> simply partitionNum of the topic. The benefit of this property is that we
>>>>>> can keep the new logic for in-order message consumption entirely in how
>>>>>> consumer leader determines the partition -> consumer mapping.
>>>>>>
>>>>>> - When consumer leader determines partition -> consumer mapping, leader
>>>>>> first reads the start position for each partition using OffsetFetchRequest.
>>>>>> If the start positions are all non-zero, then assignment can be done in its
>>>>>> current manner. The assumption is that a message in the new partition
>>>>>> should only be consumed after all messages with the same key produced
>>>>>> before it have been consumed. Since some messages in the new partition have
>>>>>> been consumed, we should not worry about consuming messages out-of-order.
>>>>>> The benefit of this approach is that we can avoid unnecessary overhead in
>>>>>> the common case.
>>>>>>
>>>>>> - Suppose the consumer leader finds that the start position for some
>>>>>> partition is 0. Say the current partition number is 18 and the partition
>>>>>> index is 12. Then the consumer leader should ensure that messages produced
>>>>>> to partition 12 - 18/2 = 3 before the first message of partition 12 are
>>>>>> consumed before it assigns partition 12 to any consumer in the consumer
>>>>>> group. Since we have an "information" that is monotonically increasing per
>>>>>> partition, consumer can read the value of this information from the first
>>>>>> message in partition 12, get the offset corresponding to this value in
>>>>>> partition 3, assign partitions except for partition 12 (and probably other
>>>>>> new partitions) to the existing consumers, wait for the committed offset
>>>>>> to go beyond this offset for partition 3, and trigger rebalance again so
>>>>>> that partition 3 can be reassigned to some consumer.
>>>>>>
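The doubling argument in the first bullet and the "12 - 18/2 = 3" step in the last bullet can be checked with a small sketch. It assumes a plain non-negative integer hash and the hash(key) % partitionNum rule quoted above; the helper names are hypothetical, and the real Kafka partitioner (which hashes key bytes) is not modelled here.

```python
def partition_for(key_hash: int, num_partitions: int) -> int:
    """Select a partition with the hash(key) % partitionNum rule from the thread."""
    return key_hash % num_partitions


def parent_partition(index: int, num_partitions: int) -> int:
    """Return the pre-doubling partition that held the keys of a new partition.

    Assumes num_partitions is the count *after* one doubling, so the new
    partitions are the upper half of the index range.
    """
    half = num_partitions // 2
    if index < half:
        raise ValueError("partition %d already existed before the doubling" % index)
    return index - half


# Doubling 9 -> 18: every key either stays put or moves to "old index + 9".
old_count, new_count = 9, 18
for key_hash in range(10_000):
    before = partition_for(key_hash, old_count)
    after = partition_for(key_hash, new_count)
    assert after in (before, before + old_count)

# The example from the message: partition 12 of 18 inherits from partition 3.
print(parent_partition(12, 18))  # 3
```

Because a key never moves between two pre-existing partitions, ordering only has to be protected between each new partition and its single parent, which is what the consumer-leader logic above relies on.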
>>>>>>
>>>>>> Thanks,
>>>>>> Dong
>>>>>>
>>>>>>
>>>>>> On Tue, Feb 6, 2018 at 10:10 AM, Jun Rao <j...@confluent.io> wrote:
>>>>>>
>>>>>>> Hi, Dong,
>>>>>>>
>>>>>>> Thanks for the KIP. It looks good overall. We are working on a separate KIP
>>>>>>> for adding partitions while preserving the ordering guarantees. That may
>>>>>>> require another flavor of partition epoch. It's not very clear whether that
>>>>>>> partition epoch can be merged with the partition epoch in this KIP. So,
>>>>>>> perhaps you can wait on this a bit until we post the other KIP in the next
>>>>>>> few days.
>>>>>>>
>>>>>>> Jun
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Feb 5, 2018 at 2:43 PM, Becket Qin <becket....@gmail.com> wrote:
>>>>>>>
>>>>>>>> +1 on the KIP.
>>>>>>>>
>>>>>>>> I think the KIP is mainly about adding the capability of tracking the
>>>>>>>> system state change lineage. It does not seem necessary to bundle this KIP
>>>>>>>> with replacing the topic partition with partition epoch in produce/fetch.
>>>>>>>> Replacing topic-partition string with partition epoch is essentially a
>>>>>>>> performance improvement on top of this KIP. That can probably be done
>>>>>>>> separately.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>
>>>>>>>> On Mon, Jan 29, 2018 at 11:52 AM, Dong Lin <lindon...@gmail.com
>>>
>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hey Colin,
>>>>>>>>>
>>>>>>>>> On Mon, Jan 29, 2018 at 11:23 AM, Colin McCabe <
>>>>> cmcc...@apache.org>
>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>>> On Mon, Jan 29, 2018 at 10:35 AM, Dong Lin <
>>>>> lindon...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hey Colin,
>>>>>>>>>>>>
>>>>>>>>>>>> I understand that the KIP will add overhead by introducing per-partition
>>>>>>>>>>>> partitionEpoch. I am open to alternative solutions that do not incur
>>>>>>>>>>>> additional overhead. But I don't see a better way now.
>>>>>>>>>>>>
>>>>>>>>>>>> IMO the overhead in the FetchResponse may not be that much. We probably
>>>>>>>>>>>> should discuss the percentage increase rather than the absolute number
>>>>>>>>>>>> increase. Currently after KIP-227, per-partition header has 23 bytes. This
>>>>>>>>>>>> KIP adds another 4 bytes. Assume the records size is 10KB, the percentage
>>>>>>>>>>>> increase is 4 / (23 + 10000) = 0.04%. It seems negligible, right?
>>>>>>>>>>
>>>>>>>>>> Hi Dong,
>>>>>>>>>>
>>>>>>>>>> Thanks for the response.  I agree that the FetchRequest / FetchResponse
>>>>>>>>>> overhead should be OK, now that we have incremental fetch requests and
>>>>>>>>>> responses.  However, there are a lot of cases where the percentage increase
>>>>>>>>>> is much greater.  For example, if a client is doing full MetadataRequests /
>>>>>>>>>> Responses, we have some math kind of like this per partition:
>>>>>>>>>>
>>>>>>>>>>> UpdateMetadataRequestPartitionState => topic partition controller_epoch
>>>>>>>>>>>     leader leader_epoch partition_epoch isr zk_version replicas
>>>>>>>>>>>     offline_replicas
>>>>>>>>>>> 14 bytes: topic => string (assuming about 10 byte topic names)
>>>>>>>>>>> 4 bytes: partition => int32
>>>>>>>>>>> 4 bytes: controller_epoch => int32
>>>>>>>>>>> 4 bytes: leader => int32
>>>>>>>>>>> 4 bytes: leader_epoch => int32
>>>>>>>>>>> +4 EXTRA bytes: partition_epoch => int32        <-- NEW
>>>>>>>>>>> 2+4+4+4 bytes: isr => [int32] (assuming 3 in the ISR)
>>>>>>>>>>> 4 bytes: zk_version => int32
>>>>>>>>>>> 2+4+4+4 bytes: replicas => [int32] (assuming 3 replicas)
>>>>>>>>>>> 2 bytes: offline_replicas => [int32] (assuming no offline replicas)
>>>>>>>>>>
>>>>>>>>>> Assuming I added that up correctly, the per-partition overhead goes from
>>>>>>>>>> 64 bytes per partition to 68, a 6.2% increase.
>>>>>>>>>>
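The byte counts quoted in this exchange can be re-added with a few lines of arithmetic. The field sizes below are copied from the per-partition breakdown above (including its "about 10 byte topic names" and three-replica assumptions); the dictionary and function names are only illustrative.

```python
# Per-partition field sizes in bytes, as listed in the breakdown above,
# excluding the proposed partition_epoch field.
FIELD_BYTES = {
    "topic": 14,
    "partition": 4,
    "controller_epoch": 4,
    "leader": 4,
    "leader_epoch": 4,
    "isr": 2 + 3 * 4,        # array length prefix + 3 int32 entries
    "zk_version": 4,
    "replicas": 2 + 3 * 4,   # array length prefix + 3 int32 entries
    "offline_replicas": 2,   # empty array: length prefix only
}
PARTITION_EPOCH_BYTES = 4    # the new int32 field


def percent_increase(base_bytes: int, extra_bytes: int) -> float:
    """Relative growth, in percent, when extra_bytes are added to base_bytes."""
    return 100.0 * extra_bytes / base_bytes


metadata_base = sum(FIELD_BYTES.values())
print(metadata_base, metadata_base + PARTITION_EPOCH_BYTES)               # 64 68
print(round(percent_increase(metadata_base, PARTITION_EPOCH_BYTES), 2))   # 6.25

# Fetch path: 23-byte per-partition header plus roughly 10 KB of records.
print(round(percent_increase(23 + 10_000, PARTITION_EPOCH_BYTES), 2))     # 0.04

# Absolute cost when a single request covers 10,000 partitions.
print(10_000 * PARTITION_EPOCH_BYTES)                                     # 40000
```

The same 4 bytes are a rounding error next to 10 KB of records but a visible fraction of a metadata-only entry, which is why the two sides of the discussion reach different conclusions from the same number.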
>>>>>>>>>> We could do similar math for a lot of the other RPCs.  And you will have a
>>>>>>>>>> similar memory and garbage collection impact on the brokers since you have
>>>>>>>>>> to store all this extra state as well.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> That is correct. IMO the Metadata is only updated periodically and it is
>>>>>>>>> probably not a big deal if we increase it by 6%. The FetchResponse and
>>>>>>>>> ProduceRequest are probably the only requests that are bounded by the
>>>>>>>>> bandwidth throughput.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> I agree that we can probably save more space by using partition ID so that
>>>>>>>>>>>> we no longer need the string topic name. A similar idea has also been
>>>>>>>>>>>> put in the Rejected Alternative section in KIP-227. While this idea is
>>>>>>>>>>>> promising, it seems orthogonal to the goal of this KIP. Given that there is
>>>>>>>>>>>> already much work to do in this KIP, maybe we can do the partition ID in a
>>>>>>>>>>>> separate KIP?
>>>>>>>>>>
>>>>>>>>>> I guess my thinking is that the goal here is to replace an identifier
>>>>>>>>>> which can be re-used (the tuple of topic name, partition ID) with an
>>>>>>>>>> identifier that cannot be re-used (the tuple of topic name, partition ID,
>>>>>>>>>> partition epoch) in order to gain better semantics.  As long as we are
>>>>>>>>>> replacing the identifier, why not replace it with an identifier that has
>>>>>>>>>> important performance advantages?  The KIP freeze for the next release has
>>>>>>>>>> already passed, so there is time to do this.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> In general it can be easier for discussion and implementation if we can
>>>>>>>>> split a larger task into smaller and independent tasks. For example,
>>>>>>>>> KIP-112 and KIP-113 both deal with the JBOD support. KIP-31, KIP-32 and
>>>>>>>>> KIP-33 are about timestamp support. The opinion on this can be subjective
>>>>>>>>> though.
>>>>>>>>>
>>>>>>>>> IMO the change to switch from (topic, partition ID) to partitionEpoch in all
>>>>>>>>> request/response requires us to go through all requests one by one. It
>>>>>>>>> may not be hard but it can be time consuming and tedious. At high level the
>>>>>>>>> goal and the change for that will be orthogonal to the changes required in
>>>>>>>>> this KIP. That is the main reason I think we can split them into two KIPs.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>> On Mon, Jan 29, 2018, at 10:54, Dong Lin wrote:
>>>>>>>>>>> I think it is possible to move to entirely use partitionEpoch instead of
>>>>>>>>>>> (topic, partition) to identify a partition. Client can obtain the
>>>>>>>>>>> partitionEpoch -> (topic, partition) mapping from MetadataResponse. We
>>>>>>>>>>> probably need to figure out a way to assign partitionEpoch to existing
>>>>>>>>>>> partitions in the cluster. But this should be doable.
>>>>>>>>>>>
>>>>>>>>>>> This is a good idea. I think it will save us some space in the
>>>>>>>>>>> request/response. The actual space saving in percentage probably depends on
>>>>>>>>>>> the amount of data and the number of partitions of the same topic. I just
>>>>>>>>>>> think we can do it in a separate KIP.
>>>>>>>>>>
>>>>>>>>>> Hmm.  How much extra work would be required?  It seems like we are already
>>>>>>>>>> changing almost every RPC that involves topics and partitions, already
>>>>>>>>>> adding new per-partition state to ZooKeeper, already changing how clients
>>>>>>>>>> interact with partitions.  Is there some other big piece of work we'd have
>>>>>>>>>> to do to move to partition IDs that we wouldn't need for partition epochs?
>>>>>>>>>> I guess we'd have to find a way to support regular expression-based topic
>>>>>>>>>> subscriptions.  If we split this into multiple KIPs, wouldn't we end up
>>>>>>>>>> changing all those RPCs and ZK state a second time?  Also, I'm curious if
>>>>>>>>>> anyone has done any proof of concept GC, memory, and network usage
>>>>>>>>>> measurements on switching topic names for topic IDs.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> We will need to go over all requests/responses to check how to replace
>>>>>>>>> (topic, partition ID) with partition epoch. It requires non-trivial work
>>>>>>>>> and could take time. As you mentioned, we may want to see how much saving
>>>>>>>>> we can get by switching from topic names to partition epoch. That itself
>>>>>>>>> requires time and experiment. It seems that the new idea does not roll back
>>>>>>>>> any change proposed in this KIP. So I am not sure we can get much by
>>>>>>>>> putting them into the same KIP.
>>>>>>>>>
>>>>>>>>> Anyway, if more people are interested in seeing the new idea in the same
>>>>>>>>> KIP, I can try that.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> best,
>>>>>>>>>> Colin
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Jan 29, 2018 at 10:18 AM, Colin McCabe <
>>>>>>> cmcc...@apache.org
>>>>>>>>>
>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Jan 26, 2018, at 12:17, Dong Lin wrote:
>>>>>>>>>>>>>> Hey Colin,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Jan 26, 2018 at 10:16 AM, Colin McCabe <
>>>>>>>>> cmcc...@apache.org>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Jan 25, 2018, at 16:47, Dong Lin wrote:
>>>>>>>>>>>>>>>> Hey Colin,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks for the comment.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, Jan 25, 2018 at 4:15 PM, Colin McCabe <
>>>>>>>>>> cmcc...@apache.org>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Wed, Jan 24, 2018, at 21:07, Dong Lin wrote:
>>>>>>>>>>>>>>>>>> Hey Colin,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks for reviewing the KIP.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> If I understand you right, you may be suggesting that we can use a global
>>>>>>>>>>>>>>>>>> metadataEpoch that is incremented every time controller updates metadata.
>>>>>>>>>>>>>>>>>> The problem with this solution is that, if a topic is deleted and created
>>>>>>>>>>>>>>>>>> again, user will not know that the offset which is stored before
>>>>>>>>>>>>>>>>>> the topic deletion is no longer valid. This motivates the idea to include
>>>>>>>>>>>>>>>>>> per-partition partitionEpoch. Does this sound reasonable?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi Dong,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Perhaps we can store the last valid offset of
>>> each
>>>>>>> deleted
>>>>>>>>>> topic
>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>> ZooKeeper.  Then, when a topic with one of
>> those
>>>>> names
>>>>>>>> gets
>>>>>>>>>>>>>>> re-created, we
>>>>>>>>>>>>>>>>> can start the topic at the previous end offset
>>>>> rather
>>>>>>> than
>>>>>>>>> at
>>>>>>>>>> 0.
>>>>>>>>>>>>> This
>>>>>>>>>>>>>>>>> preserves immutability.  It is no more
>> burdensome
>>>>> than
>>>>>>>>> having
>>>>>>>>>> to
>>>>>>>>>>>>>>> preserve a
>>>>>>>>>>>>>>>>> "last epoch" for the deleted partition
>> somewhere,
>>>>>> right?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> My concern with this solution is that the number of zookeeper nodes
>>>>>>>>>>>>>>>> grows over time if some users keep deleting and creating topics. Do
>>>>>>>>>>>>>>>> you think this can be a problem?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Dong,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> We could expire the "partition tombstones" after an
>>>>> hour
>>>>>> or
>>>>>>>> so.
>>>>>>>>>> In
>>>>>>>>>>>>>>> practice this would solve the issue for clients
>> that
>>>>> like
>>>>>> to
>>>>>>>>>> destroy
>>>>>>>>>>>>> and
>>>>>>>>>>>>>>> re-create topics all the time.  In any case,
>> doesn't
>>>>> the
>>>>>>>> current
>>>>>>>>>>>>> proposal
>>>>>>>>>>>>>>> add per-partition znodes as well that we have to
>>> track
>>>>>> even
>>>>>>>>> after
>>>>>>>>>> the
>>>>>>>>>>>>>>> partition is deleted?  Or did I misunderstand that?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Actually the current KIP does not add per-partition
>>>>> znodes.
>>>>>>>> Could
>>>>>>>>>> you
>>>>>>>>>>>>>> double check? I can fix the KIP wiki if there is
>>> anything
>>>>>>>>>> misleading.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Dong,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I double-checked the KIP, and I can see that you are in
>>>>> fact
>>>>>>>> using a
>>>>>>>>>>>>> global counter for initializing partition epochs.  So,
>>> you
>>>>> are
>>>>>>>>>> correct, it
>>>>>>>>>>>>> doesn't add per-partition znodes for partitions that no
>>>>> longer
>>>>>>>>> exist.
>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> If we expire the "partition tombstones" after an hour, and the topic is
>>>>>>>>>>>>>> re-created after more than an hour since the topic deletion, then we are
>>>>>>>>>>>>>> back to the situation where user can not tell whether the topic has been
>>>>>>>>>>>>>> re-created or not, right?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Yes, with an expiration period, it would not ensure
>>>>>>> immutability--
>>>>>>>>> you
>>>>>>>>>>>>> could effectively reuse partition names and they would
>>> look
>>>>>> the
>>>>>>>>> same.
>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> It's not really clear to me what should happen
>> when a
>>>>>> topic
>>>>>>> is
>>>>>>>>>>>>> destroyed
>>>>>>>>>>>>>>> and re-created with new data.  Should consumers
>>>>> continue
>>>>>> to
>>>>>>> be
>>>>>>>>>> able to
>>>>>>>>>>>>>>> consume?  We don't know where they stopped
>> consuming
>>>>> from
>>>>>>> the
>>>>>>>>>> previous
>>>>>>>>>>>>>>> incarnation of the topic, so messages may have been
>>>>> lost.
>>>>>>>>>> Certainly
>>>>>>>>>>>>>>> consuming data from offset X of the new incarnation
>>> of
>>>>> the
>>>>>>>> topic
>>>>>>>>>> may
>>>>>>>>>>>>> give
>>>>>>>>>>>>>>> something totally different from what you would
>> have
>>>>>> gotten
>>>>>>>> from
>>>>>>>>>>>>> offset X
>>>>>>>>>>>>>>> of the previous incarnation of the topic.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> With the current KIP, if a consumer consumes a topic based on the last
>>>>>>>>>>>>>> remembered (offset, partitionEpoch, leaderEpoch), and if the topic is
>>>>>>>>>>>>>> re-created, consume will throw InvalidPartitionEpochException because the
>>>>>>>>>>>>>> previous partitionEpoch will be different from the current partitionEpoch.
>>>>>>>>>>>>>> This is described in the Proposed Changes -> Consumption after topic
>>>>>>>>>>>>>> deletion in the KIP. I can improve the KIP if there is anything not clear.
>>>>>>>>>>>>>
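The check described in the paragraph above amounts to comparing the stored partitionEpoch with the current one before resuming from a stored offset. The following is a minimal sketch of that idea only: the class and function names are hypothetical, InvalidPartitionEpochError is a Python stand-in for the InvalidPartitionEpochException named in the message, and nothing here is the actual consumer API.

```python
from dataclasses import dataclass


class InvalidPartitionEpochError(Exception):
    """Stand-in for the InvalidPartitionEpochException mentioned in the thread."""


@dataclass(frozen=True)
class StoredPosition:
    """The (offset, partitionEpoch, leaderEpoch) triple a consumer remembers."""
    offset: int
    partition_epoch: int
    leader_epoch: int


def resume_offset(stored: StoredPosition, current_partition_epoch: int) -> int:
    """Return the offset to resume from, or fail if the topic was re-created.

    A re-created topic gets a different partitionEpoch, so an offset stored
    against the old incarnation is rejected instead of silently reused.
    """
    if stored.partition_epoch != current_partition_epoch:
        raise InvalidPartitionEpochError(
            "stored partitionEpoch %d does not match current partitionEpoch %d"
            % (stored.partition_epoch, current_partition_epoch)
        )
    return stored.offset


position = StoredPosition(offset=25, partition_epoch=7, leader_epoch=3)
print(resume_offset(position, current_partition_epoch=7))  # 25
try:
    resume_offset(position, current_partition_epoch=8)      # topic re-created
except InvalidPartitionEpochError as err:
    print("rejected:", err)
```

The leaderEpoch is carried in the triple but not checked in this sketch; it serves a different purpose (detecting stale leaders) in the discussion.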
>>>>>>>>>>>>> Thanks for the clarification.  It sounds like what you
>>>>> really
>>>>>>> want
>>>>>>>>> is
>>>>>>>>>>>>> immutability-- i.e., to never "really" reuse partition
>>>>>>>> identifiers.
>>>>>>>>>> And
>>>>>>>>>>>>> you do this by making the partition name no longer the
>>>>> "real"
>>>>>>>>>> identifier.
>>>>>>>>>>>>>
>>>>>>>>>>>>> My big concern about this KIP is that it seems like an
>>>>>>>>>> anti-scalability
>>>>>>>>>>>>> feature.  Now we are adding 4 extra bytes for every
>>>>> partition
>>>>>> in
>>>>>>>> the
>>>>>>>>>>>>> FetchResponse and Request, for example.  That could be
>> 40
>>>>> kb
>>>>>> per
>>>>>>>>>> request,
>>>>>>>>>>>>> if the user has 10,000 partitions.  And of course, the
>>> KIP
>>>>>> also
>>>>>>>>> makes
>>>>>>>>>>>>> massive changes to UpdateMetadataRequest,
>>> MetadataResponse,
>>>>>>>>>>>>> OffsetCommitRequest, OffsetFetchResponse,
>>>>> LeaderAndIsrRequest,
>>>>>>>>>>>>> ListOffsetResponse, etc. which will also increase their
>>>>> size
>>>>>> on
>>>>>>>> the
>>>>>>>>>> wire
>>>>>>>>>>>>> and in memory.
>>>>>>>>>>>>>
>>>>>>>>>>>>> One thing that we talked a lot about in the past is
>>>>> replacing
>>>>>>>>>> partition
>>>>>>>>>>>>> names with IDs.  IDs have a lot of really nice
>> features.
>>>>> They
>>>>>>>> take
>>>>>>>>>> up much
>>>>>>>>>>>>> less space in memory than strings (especially 2-byte
>> Java
>>>>>>>> strings).
>>>>>>>>>> They
>>>>>>>>>>>>> can often be allocated on the stack rather than the
>> heap
>>>>>>>> (important
>>>>>>>>>> when
>>>>>>>>>>>>> you are dealing with hundreds of thousands of them).
>>> They
>>>>> can
>>>>>>> be
>>>>>>>>>>>>> efficiently deserialized and serialized.  If we use
>>> 64-bit
>>>>>> ones,
>>>>>>>> we
>>>>>>>>>> will
>>>>>>>>>>>>> never run out of IDs, which means that they can always
>> be
>>>>>> unique
>>>>>>>> per
>>>>>>>>>>>>> partition.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Given that the partition name is no longer the "real"
>>>>>> identifier
>>>>>>>> for
>>>>>>>>>>>>> partitions in the current KIP-232 proposal, why not
>> just
>>>>> move
>>>>>> to
>>>>>>>>> using
>>>>>>>>>>>>> partition IDs entirely instead of strings?  You have to
>>>>> change
>>>>>>> all
>>>>>>>>> the
>>>>>>>>>>>>> messages anyway.  There isn't much point any more to
>>>>> carrying
>>>>>>>> around
>>>>>>>>>> the
>>>>>>>>>>>>> partition name in every RPC, since you really need
>> (name,
>>>>>> epoch)
>>>>>>>> to
>>>>>>>>>>>>> identify the partition.
>>>>>>>>>>>>> Probably the metadata response and a few other messages
>>>>> would
>>>>>>> have
>>>>>>>>> to
>>>>>>>>>>>>> still carry the partition name, to allow clients to go
>>> from
>>>>>> name
>>>>>>>> to
>>>>>>>>>> id.
>>>>>>>>>>>>> But we could mostly forget about the strings.  And then
>>>>> this
>>>>>>> would
>>>>>>>>> be
>>>>>>>>>> a
>>>>>>>>>>>>> scalability improvement rather than a scalability
>>> problem.
>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> By choosing to reuse the same (topic, partition, offset) 3-tuple, we have
>>>>>>>>>>>>>>> chosen to give up immutability.  That was a really bad decision.  And now
>>>>>>>>>>>>>>> we have to worry about time dependencies, stale cached data, and all the
>>>>>>>>>>>>>>> rest.  We can't completely fix this inside Kafka no matter what we do,
>>>>>>>>>>>>>>> because not all that cached data is inside Kafka itself.  Some of it may be
>>>>>>>>>>>>>>> in systems that Kafka has sent data to, such as other daemons, SQL
>>>>>>>>>>>>>>> databases, streams, and so forth.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The current KIP will uniquely identify a message
>> using
>>>>>> (topic,
>>>>>>>>>>>>> partition,
>>>>>>>>>>>>>> offset, partitionEpoch) 4-tuple. This addresses the
>>>>> message
>>>>>>>>>> immutability
>>>>>>>>>>>>>> issue that you mentioned. Is there any corner case
>>> where
>>>>> the
>>>>>>>>> message
>>>>>>>>>>>>>> immutability is still not preserved with the current
>>> KIP?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I guess the idea here is that mirror maker should
>>> work
>>>>> as
>>>>>>>>> expected
>>>>>>>>>>>>> when
>>>>>>>>>>>>>>> users destroy a topic and re-create it with the
>> same
>>>>> name.
>>>>>>>>> That's
>>>>>>>>>>>>> kind of
>>>>>>>>>>>>>>> tough, though, since in that scenario, mirror maker
>>>>>> probably
>>>>>>>>>> should
>>>>>>>>>>>>> destroy
>>>>>>>>>>>>>>> and re-create the topic on the other end, too,
>> right?
>>>>>>>>> Otherwise,
>>>>>>>>>>>>> what you
>>>>>>>>>>>>>>> end up with on the other end could be half of one
>>>>>>> incarnation
>>>>>>>> of
>>>>>>>>>> the
>>>>>>>>>>>>> topic,
>>>>>>>>>>>>>>> and half of another.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> What mirror maker really needs is to be able to
>>> follow
>>>>> a
>>>>>>>> stream
>>>>>>>>> of
>>>>>>>>>>>>> events
>>>>>>>>>>>>>>> about the kafka cluster itself.  We could have some
>>>>> master
>>>>>>>> topic
>>>>>>>>>>>>> which is
>>>>>>>>>>>>>>> always present and which contains data about all
>>> topic
>>>>>>>>> deletions,
>>>>>>>>>>>>>>> creations, etc.  Then MM can simply follow this
>> topic
>>>>> and
>>>>>> do
>>>>>>>>> what
>>>>>>>>>> is
>>>>>>>>>>>>> needed.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Then the next question may be, should we use a global metadataEpoch +
>>>>>>>>>>>>>>>>>> per-partition partitionEpoch, instead of using per-partition leaderEpoch +
>>>>>>>>>>>>>>>>>> per-partition partitionEpoch. The former solution using metadataEpoch would
>>>>>>>>>>>>>>>>>> not work due to the following scenario (provided by Jun):
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> "Consider the following scenario. In metadata
>>> v1,
>>>>>> the
>>>>>>>>> leader
>>>>>>>>>>>>> for a
>>>>>>>>>>>>>>>>>> partition is at broker 1. In metadata v2,
>>> leader
>>>>> is
>>>>>> at
>>>>>>>>>> broker
>>>>>>>>>>>>> 2. In
>>>>>>>>>>>>>>>>>> metadata v3, leader is at broker 1 again. The
>>>>> last
>>>>>>>>> committed
>>>>>>>>>>>>> offset
>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>> v1,
>>>>>>>>>>>>>>>>>> v2 and v3 are 10, 20 and 30, respectively. A
>>>>>> consumer
>>>>>>> is
>>>>>>>>>>>>> started and
>>>>>>>>>>>>>>>>> reads
>>>>>>>>>>>>>>>>>> metadata v1 and reads messages from offset 0
>> to
>>>>> 25
>>>>>>> from
>>>>>>>>>> broker
>>>>>>>>>>>>> 1. My
>>>>>>>>>>>>>>>>>> understanding is that in the current
>> proposal,
>>>>> the
>>>>>>>>> metadata
>>>>>>>>>>>>> version
>>>>>>>>>>>>>>>>>> associated with offset 25 is v1. The consumer
>>> is
>>>>>> then
>>>>>>>>>> restarted
>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>> fetches
>>>>>>>>>>>>>>>>>> metadata v2. The consumer tries to read from
>>>>> broker
>>>>>> 2,
>>>>>>>>>> which is
>>>>>>>>>>>>> the
>>>>>>>>>>>>>>> old
>>>>>>>>>>>>>>>>>> leader with the last offset at 20. In this
>>> case,
>>>>> the
>>>>>>>>>> consumer
>>>>>>>>>>>>> will
>>>>>>>>>>>>>>> still
>>>>>>>>>>>>>>>>>> get OffsetOutOfRangeException incorrectly."
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Regarding your comment "For the second purpose, this is "soft state"
>>>>>>>>>>>>>>>>>> anyway.  If the client thinks X is the leader but Y is really the leader,
>>>>>>>>>>>>>>>>>> the client will talk to X, and X will point out its mistake by sending back
>>>>>>>>>>>>>>>>>> a NOT_LEADER_FOR_PARTITION.", it is probably not true. The problem here is
>>>>>>>>>>>>>>>>>> that the old leader X may still think it is the leader of the partition and
>>>>>>>>>>>>>>>>>> thus it will not send back NOT_LEADER_FOR_PARTITION. The reason is provided
>>>>>>>>>>>>>>>>>> in KAFKA-6262. Can you check if that makes sense?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> This is solvable with a timeout, right?  If the
>>>>> leader
>>>>>>>> can't
>>>>>>>>>>>>>>> communicate
>>>>>>>>>>>>>>>>> with the controller for a certain period of
>> time,
>>>>> it
>>>>>>>> should
>>>>>>>>>> stop
>>>>>>>>>>>>>>> acting as
>>>>>>>>>>>>>>>>> the leader.  We have to solve this problem,
>>>>> anyway, in
>>>>>>>> order
>>>>>>>>>> to
>>>>>>>>>>>>> fix
>>>>>>>>>>>>>>> all the
>>>>>>>>>>>>>>>>> corner cases.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Not sure if I fully understand your proposal. The
>>>>>> proposal
>>>>>>>>>> seems to
>>>>>>>>>>>>>>> require
>>>>>>>>>>>>>>>> non-trivial changes to our existing leadership
>>>>> election
>>>>>>>>>> mechanism.
>>>>>>>>>>>>> Could
>>>>>>>>>>>>>>>> you provide more detail regarding how it works?
>> For
>>>>>>> example,
>>>>>>>>> how
>>>>>>>>>>>>> should
>>>>>>>>>>>>>>>> user choose this timeout, how leader determines
>>>>> whether
>>>>>> it
>>>>>>>> can
>>>>>>>>>> still
>>>>>>>>>>>>>>>> communicate with controller, and how this
>> triggers
>>>>>>>> controller
>>>>>>>>> to
>>>>>>>>>>>>> elect
>>>>>>>>>>>>>>> new
>>>>>>>>>>>>>>>> leader?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Before I come up with any proposal, let me make
>> sure
>>> I
>>>>>>>>> understand
>>>>>>>>>> the
>>>>>>>>>>>>>>> problem correctly.  My big question was, what
>>> prevents
>>>>>>>>> split-brain
>>>>>>>>>>>>> here?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Let's say I have a partition which is on nodes A,
>> B,
>>>>> and
>>>>>> C,
>>>>>>>> with
>>>>>>>>>>>>> min-ISR
>>>>>>>>>>>>>>> 2.  The controller is D.  At some point, there is a
>>>>>> network
>>>>>>>>>> partition
>>>>>>>>>>>>>>> between A and B and the rest of the cluster.  The
>>>>>> Controller
>>>>>>>>>>>>> re-assigns the
>>>>>>>>>>>>>>> partition to nodes C, D, and E.  But A and B keep
>>>>> chugging
>>>>>>>> away,
>>>>>>>>>> even
>>>>>>>>>>>>>>> though they can no longer communicate with the
>>>>> controller.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> At some point, a client with stale metadata writes
>> to
>>>>> the
>>>>>>>>>> partition.
>>>>>>>>>>>>> It
>>>>>>>>>>>>>>> still thinks the partition is on node A, B, and C,
>> so
>>>>>> that's
>>>>>>>>>> where it
>>>>>>>>>>>>> sends
>>>>>>>>>>>>>>> the data.  It's unable to talk to C, but A and B
>>> reply
>>>>>> back
>>>>>>>> that
>>>>>>>>>> all
>>>>>>>>>>>>> is
>>>>>>>>>>>>>>> well.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Is this not a case where we could lose data due to
>>>>> split
>>>>>>>> brain?
>>>>>>>>>> Or is
>>>>>>>>>>>>>>> there a mechanism for preventing this that I
>> missed?
>>>>> If
>>>>>> it
>>>>>>>> is,
>>>>>>>>> it
>>>>>>>>>>>>> seems
>>>>>>>>>>>>>>> like a pretty serious failure case that we should
>> be
>>>>>>> handling
>>>>>>>>>> with our
>>>>>>>>>>>>>>> metadata rework.  And I think epoch numbers and
>>>>> timeouts
>>>>>>> might
>>>>>>>>> be
>>>>>>>>>>>>> part of
>>>>>>>>>>>>>>> the solution.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Right, split brain can happen if RF=4 and minIsr=2.
>>>>>> However, I
>>>>>>>> am
>>>>>>>>>> not
>>>>>>>>>>>>> sure
>>>>>>>>>>>>>> it is a pretty serious issue which we need to address
>>>>> today.
>>>>>>>> This
>>>>>>>>>> can be
>>>>>>>>>>>>>> prevented by configuring the Kafka topic so that
>>> minIsr >
>>>>>>> RF/2.
>>>>>>>>>>>>> Actually,
>>>>>>>>>>>>>> if user sets minIsr=2, is there any reason that
>>> user
>>>>>>> wants
>>>>>>>> to
>>>>>>>>>> set
>>>>>>>>>>>>> RF=4
>>>>>>>>>>>>>> instead of 3?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Introducing timeout in leader election mechanism is
>>>>>>>> non-trivial. I
>>>>>>>>>>>>> think we
>>>>>>>>>>>>>> probably want to do that only if there is good
>> use-case
>>>>> that
>>>>>>> can
>>>>>>>>> not
>>>>>>>>>>>>>> otherwise be addressed with the current mechanism.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I still would like to think about these corner cases
>>> more.
>>>>>> But
>>>>>>>>>> perhaps
>>>>>>>>>>>>> it's not directly related to this KIP.
>>>>>>>>>>>>>
>>>>>>>>>>>>> regards,
>>>>>>>>>>>>> Colin
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> best,
>>>>>>>>>>>>>>> Colin
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> best,
>>>>>>>>>>>>>>>>> Colin
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>> Dong
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Wed, Jan 24, 2018 at 10:39 AM, Colin
>> McCabe
>>> <
>>>>>>>>>>>>> cmcc...@apache.org>
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi Dong,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks for proposing this KIP.  I think a
>>>>> metadata
>>>>>>>> epoch
>>>>>>>>>> is a
>>>>>>>>>>>>>>> really
>>>>>>>>>>>>>>>>> good
>>>>>>>>>>>>>>>>>>> idea.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I read through the DISCUSS thread, but I
>>> still
>>>>>> don't
>>>>>>>>> have
>>>>>>>>>> a
>>>>>>>>>>>>> clear
>>>>>>>>>>>>>>>>> picture
>>>>>>>>>>>>>>>>>>> of why the proposal uses a metadata epoch
>> per
>>>>>>>> partition
>>>>>>>>>> rather
>>>>>>>>>>>>>>> than a
>>>>>>>>>>>>>>>>>>> global metadata epoch.  A metadata epoch
>> per
>>>>>>> partition
>>>>>>>>> is
>>>>>>>>>>>>> kind of
>>>>>>>>>>>>>>>>>>> unpleasant-- it's at least 4 extra bytes
>> per
>>>>>>> partition
>>>>>>>>>> that we
>>>>>>>>>>>>>>> have to
>>>>>>>>>>>>>>>>> send
>>>>>>>>>>>>>>>>>>> over the wire in every full metadata
>> request,
>>>>>> which
>>>>>>>>> could
>>>>>>>>>>>>> become
>>>>>>>>>>>>>>> extra
>>>>>>>>>>>>>>>>>>> kilobytes on the wire when the number of
>>>>>> partitions
>>>>>>>>>> becomes
>>>>>>>>>>>>> large.
>>>>>>>>>>>>>>>>> Plus,
>>>>>>>>>>>>>>>>>>> we have to update all the auxiliary classes
>>> to
>>>>>>> include
>>>>>>>>> an
>>>>>>>>>>>>> epoch.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> We need to have a global metadata epoch
>>> anyway
>>>>> to
>>>>>>>> handle
>>>>>>>>>>>>> partition
>>>>>>>>>>>>>>>>>>> addition and deletion.  For example, if I
>>> give
>>>>> you
>>>>>>>>>>>>>>>>>>> MetadataResponse{part1,epoch 1, part2,
>> epoch
>>> 1}
>>>>>> and
>>>>>>>>>> {part1,
>>>>>>>>>>>>>>> epoch1},
>>>>>>>>>>>>>>>>> which
>>>>>>>>>>>>>>>>>>> MetadataResponse is newer?  You have no way
>>> of
>>>>>>>> knowing.
>>>>>>>>>> It
>>>>>>>>>>>>> could
>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>> part2 has just been created, and the
>> response
>>>>>> with 2
>>>>>>>>>>>>> partitions is
>>>>>>>>>>>>>>>>> newer.
>>>>>>>>>>>>>>>>>>> Or it could be that part2 has just been
>>>>> deleted,
>>>>>> and
>>>>>>>>>>>>> therefore the
>>>>>>>>>>>>>>>>> response
>>>>>>>>>>>>>>>>>>> with 1 partition is newer.  You must have a
>>>>> global
>>>>>>>> epoch
>>>>>>>>>> to
>>>>>>>>>>>>>>>>> disambiguate
>>>>>>>>>>>>>>>>>>> these two cases.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Previously, I worked on the Ceph
>> distributed
>>>>>>>> filesystem.
>>>>>>>>>>>>> Ceph had
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> concept of a map of the whole cluster,
>>>>> maintained
>>>>>>> by a
>>>>>>>>> few
>>>>>>>>>>>>> servers
>>>>>>>>>>>>>>>>> doing
>>>>>>>>>>>>>>>>>>> paxos.  This map was versioned by a single
>>>>> 64-bit
>>>>>>>> epoch
>>>>>>>>>> number
>>>>>>>>>>>>>>> which
>>>>>>>>>>>>>>>>>>> increased on every change.  It was
>> propagated
>>>>> to
>>>>>>>> clients
>>>>>>>>>>>>> through
>>>>>>>>>>>>>>>>> gossip.  I
>>>>>>>>>>>>>>>>>>> wonder if something similar could work
>> here?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> It seems like the Kafka
>> MetadataResponse
>>>>>> serves
>>>>>>>> two
>>>>>>>>>>>>> somewhat
>>>>>>>>>>>>>>>>> unrelated
>>>>>>>>>>>>>>>>>>> purposes.  Firstly, it lets clients know
>> what
>>>>>>>> partitions
>>>>>>>>>>>>> exist in
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> system and where they live.  Secondly, it
>>> lets
>>>>>>> clients
>>>>>>>>>> know
>>>>>>>>>>>>> which
>>>>>>>>>>>>>>> nodes
>>>>>>>>>>>>>>>>>>> within the partition are in-sync (in the
>> ISR)
>>>>> and
>>>>>>>> which
>>>>>>>>>> node
>>>>>>>>>>>>> is the
>>>>>>>>>>>>>>>>> leader.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> The first purpose is what you really need a
>>>>>> metadata
>>>>>>>>> epoch
>>>>>>>>>>>>> for, I
>>>>>>>>>>>>>>>>> think.
>>>>>>>>>>>>>>>>>>> You want to know whether a partition exists
>>> or
>>>>>> not,
>>>>>>> or
>>>>>>>>> you
>>>>>>>>>>>>> want to
>>>>>>>>>>>>>>> know
>>>>>>>>>>>>>>>>>>> which nodes you should talk to in order to
>>>>> write
>>>>>> to
>>>>>>> a
>>>>>>>>>> given
>>>>>>>>>>>>>>>>> partition.  A
>>>>>>>>>>>>>>>>>>> single metadata epoch for the whole
>> response
>>>>>> should
>>>>>>> be
>>>>>>>>>>>>> adequate
>>>>>>>>>>>>>>> here.
>>>>>>>>>>>>>>>>> We
>>>>>>>>>>>>>>>>>>> should not change the partition assignment
>>>>> without
>>>>>>>> going
>>>>>>>>>>>>> through
>>>>>>>>>>>>>>>>> zookeeper
>>>>>>>>>>>>>>>>>>> (or a similar system), and this inherently
>>>>>>> serializes
>>>>>>>>>> updates
>>>>>>>>>>>>> into
>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>> numbered stream.  Brokers should also stop
>>>>>>> responding
>>>>>>>> to
>>>>>>>>>>>>> requests
>>>>>>>>>>>>>>> when
>>>>>>>>>>>>>>>>> they
>>>>>>>>>>>>>>>>>>> are unable to contact ZK for a certain time
>>>>>> period.
>>>>>>>>> This
>>>>>>>>>>>>> prevents
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> case
>>>>>>>>>>>>>>>>>>> where a given partition has been moved off
>>> some
>>>>>> set
>>>>>>> of
>>>>>>>>>> nodes,
>>>>>>>>>>>>> but a
>>>>>>>>>>>>>>>>> client
>>>>>>>>>>>>>>>>>>> still ends up talking to those nodes and
>>>>> writing
>>>>>>> data
>>>>>>>>>> there.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> For the second purpose, this is "soft
>> state"
>>>>>> anyway.
>>>>>>>> If
>>>>>>>>>> the
>>>>>>>>>>>>> client
>>>>>>>>>>>>>>>>> thinks
>>>>>>>>>>>>>>>>>>> X is the leader but Y is really the leader,
>>> the
>>>>>>> client
>>>>>>>>>> will
>>>>>>>>>>>>> talk
>>>>>>>>>>>>>>> to X,
>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>> X will point out its mistake by sending
>> back
>>> a
>>>>>>>>>>>>>>>>> NOT_LEADER_FOR_PARTITION.
>>>>>>>>>>>>>>>>>>> Then the client can update its metadata
>> again
>>>>> and
>>>>>>> find
>>>>>>>>>> the new
>>>>>>>>>>>>>>> leader,
>>>>>>>>>>>>>>>>> if
>>>>>>>>>>>>>>>>>>> there is one.  There is no need for an
>> epoch
>>> to
>>>>>>> handle
>>>>>>>>>> this.
>>>>>>>>>>>>>>>>> Similarly, I
>>>>>>>>>>>>>>>>>>> can't think of a reason why changing the
>>>>> in-sync
>>>>>>>> replica
>>>>>>>>>> set
>>>>>>>>>>>>> needs
>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>> bump
>>>>>>>>>>>>>>>>>>> the epoch.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> best,
>>>>>>>>>>>>>>>>>>> Colin
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Wed, Jan 24, 2018, at 09:45, Dong Lin
>>> wrote:
>>>>>>>>>>>>>>>>>>>> Thanks much for reviewing the KIP!
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Dong
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Wed, Jan 24, 2018 at 7:10 AM, Guozhang
>>>>> Wang <
>>>>>>>>>>>>>>> wangg...@gmail.com>
>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Yeah that makes sense, again I'm just
>>>>> making
>>>>>>> sure
>>>>>>>> we
>>>>>>>>>>>>> understand
>>>>>>>>>>>>>>>>> all the
>>>>>>>>>>>>>>>>>>>>> scenarios and what to expect.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I agree that if, more generally
>> speaking,
>>>>> say
>>>>>>>> users
>>>>>>>>>> have
>>>>>>>>>>>>> only
>>>>>>>>>>>>>>>>> consumed
>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>> offset 8, and then call seek(16) to
>>> "jump"
>>>>> to
>>>>>> a
>>>>>>>>>> further
>>>>>>>>>>>>>>> position,
>>>>>>>>>>>>>>>>> then
>>>>>>>>>>>>>>>>>>> she
>>>>>>>>>>>>>>>>>>>>> needs to be aware that OORE may be
>> thrown
>>>>> and
>>>>>> she
>>>>>>>>>> needs to
>>>>>>>>>>>>>>> handle
>>>>>>>>>>>>>>>>> it or
>>>>>>>>>>>>>>>>>>> rely
>>>>>>>>>>>>>>>>>>>>> on reset policy which should not
>> surprise
>>>>> her.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I'm +1 on the KIP.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Wed, Jan 24, 2018 at 12:31 AM, Dong
>>> Lin
>>>>> <
>>>>>>>>>>>>>>> lindon...@gmail.com>
>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Yes, in general we can not prevent
>>>>>>>>>>>>> OffsetOutOfRangeException
>>>>>>>>>>>>>>> if
>>>>>>>>>>>>>>>>> user
>>>>>>>>>>>>>>>>>>>>> seeks
>>>>>>>>>>>>>>>>>>>>>> to a wrong offset. The main goal is
>> to
>>>>>> prevent
>>>>>>>>>>>>>>>>>>> OffsetOutOfRangeException
>>>>>>>>>>>>>>>>>>>>> if
>>>>>>>>>>>>>>>>>>>>>> user has done things in the right
>> way,
>>>>> e.g.
>>>>>>> user
>>>>>>>>>> should
>>>>>>>>>>>>> know
>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>> there
>>>>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>> message with this offset.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> For example, if user calls seek(..)
>>> right
>>>>>>> after
>>>>>>>>>>>>>>> construction, the
>>>>>>>>>>>>>>>>>>> only
>>>>>>>>>>>>>>>>>>>>>> reason I can think of is that user
>>> stores
>>>>>>> offset
>>>>>>>>>>>>> externally.
>>>>>>>>>>>>>>> In
>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>>>> case,
>>>>>>>>>>>>>>>>>>>>>> user currently needs to use the
>> offset
>>>>> which
>>>>>>> is
>>>>>>>>>> obtained
>>>>>>>>>>>>>>> using
>>>>>>>>>>>>>>>>>>>>> position(..)
>>>>>>>>>>>>>>>>>>>>>> from the last run. With this KIP,
>> user
>>>>> needs
>>>>>>> to
>>>>>>>>> get
>>>>>>>>>> the
>>>>>>>>>>>>>>> offset
>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> offsetEpoch using
>>>>>> positionAndOffsetEpoch(...)
>>>>>>>> and
>>>>>>>>>> stores
>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>>>> information
>>>>>>>>>>>>>>>>>>>>>> externally. The next time user starts
>>>>>>> consumer,
>>>>>>>>>> he/she
>>>>>>>>>>>>> needs
>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>> call
>>>>>>>>>>>>>>>>>>>>>> seek(..., offset, offsetEpoch) right
>>>>> after
>>>>>>>>>> construction.
>>>>>>>>>>>>>>> Then KIP
>>>>>>>>>>>>>>>>>>> should
>>>>>>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>>>>> able to ensure that we don't throw
>>>>>>>>>>>>> OffsetOutOfRangeException
>>>>>>>>>>>>>>> if
>>>>>>>>>>>>>>>>>>> there is
>>>>>>>>>>>>>>>>>>>>> no
>>>>>>>>>>>>>>>>>>>>>> unclean leader election.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Does this sound OK?
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>> Dong
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Tue, Jan 23, 2018 at 11:44 PM,
>>>>> Guozhang
>>>>>>> Wang
>>>>>>>> <
>>>>>>>>>>>>>>>>> wangg...@gmail.com>
>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> "If consumer wants to consume
>> message
>>>>> with
>>>>>>>>> offset
>>>>>>>>>> 16,
>>>>>>>>>>>>> then
>>>>>>>>>>>>>>>>> consumer
>>>>>>>>>>>>>>>>>>>>> must
>>>>>>>>>>>>>>>>>>>>>>> have
>>>>>>>>>>>>>>>>>>>>>>> already fetched message with offset
>>> 15"
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> --> this may not be always true
>>> right?
>>>>>> What
>>>>>>> if
>>>>>>>>>>>>> consumer
>>>>>>>>>>>>>>> just
>>>>>>>>>>>>>>>>> calls
>>>>>>>>>>>>>>>>>>>>>> seek(16)
>>>>>>>>>>>>>>>>>>>>>>> after construction and then poll
>>>>> without
>>>>>>>>> committed
>>>>>>>>>>>>> offset
>>>>>>>>>>>>>>> ever
>>>>>>>>>>>>>>>>>>> stored
>>>>>>>>>>>>>>>>>>>>>>> before? Admittedly it is rare but
>> we
>>> do
>>>>>> not
>>>>>>>>>>>>> programmatically
>>>>>>>>>>>>>>>>> disallow
>>>>>>>>>>>>>>>>>>> it.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On Tue, Jan 23, 2018 at 10:42 PM,
>>> Dong
>>>>>> Lin <
>>>>>>>>>>>>>>>>> lindon...@gmail.com>
>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Hey Guozhang,
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Thanks much for reviewing the
>> KIP!
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> In the scenario you described,
>>> let's
>>>>>>> assume
>>>>>>>>> that
>>>>>>>>>>>>> broker
>>>>>>>>>>>>>>> A has
>>>>>>>>>>>>>>>>>>>>> messages
>>>>>>>>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>>>>>>>> offset up to 10, and broker B has
>>>>>> messages
>>>>>>>>> with
>>>>>>>>>>>>> offset
>>>>>>>>>>>>>>> up to
>>>>>>>>>>>>>>>>> 20.
>>>>>>>>>>>>>>>>>>> If
>>>>>>>>>>>>>>>>>>>>>>>> consumer wants to consume message
>>>>> with
>>>>>>>> offset
>>>>>>>>>> 9, it
>>>>>>>>>>>>> will
>>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>>>> receive
>>>>>>>>>>>>>>>>>>>>>>>> OffsetOutOfRangeException
>>>>>>>>>>>>>>>>>>>>>>>> from broker A.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> If consumer wants to consume
>>> message
>>>>>> with
>>>>>>>>> offset
>>>>>>>>>>>>> 16, then
>>>>>>>>>>>>>>>>>>> consumer
>>>>>>>>>>>>>>>>>>>>> must
>>>>>>>>>>>>>>>>>>>>>>>> have already fetched message with
>>>>> offset
>>>>>>> 15,
>>>>>>>>>> which
>>>>>>>>>>>>> can
>>>>>>>>>>>>>>> only
>>>>>>>>>>>>>>>>> come
>>>>>>>>>>>>>>>>>>> from
>>>>>>>>>>>>>>>>>>>>>>>> broker B. Because consumer will
>>> fetch
>>>>>> from
>>>>>>>>>> broker B
>>>>>>>>>>>>> only
>>>>>>>>>>>>>>> if
>>>>>>>>>>>>>>>>>>>>> leaderEpoch
>>>>>>>>>>>>>>>>>>>>>>>> =
>>>>>>>>>>>>>>>>>>>>>>>> 2, then the current consumer
>>>>> leaderEpoch
>>>>>>> can
>>>>>>>>>> not be
>>>>>>>>>>>>> 1
>>>>>>>>>>>>>>> since
>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>> KIP
>>>>>>>>>>>>>>>>>>>>>>>> prevents leaderEpoch rewind. Thus
>>> we
>>>>>> will
>>>>>>>> not
>>>>>>>>>> have
>>>>>>>>>>>>>>>>>>>>>>>> OffsetOutOfRangeException
>>>>>>>>>>>>>>>>>>>>>>>> in this case.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Does this address your question,
>> or
>>>>>> maybe
>>>>>>>>> there
>>>>>>>>>> is
>>>>>>>>>>>>> more
>>>>>>>>>>>>>>>>> advanced
>>>>>>>>>>>>>>>>>>>>>> scenario
>>>>>>>>>>>>>>>>>>>>>>>> that the KIP does not handle?
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>>>> Dong
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Jan 23, 2018 at 9:43 PM,
>>>>>> Guozhang
>>>>>>>>> Wang <
>>>>>>>>>>>>>>>>>>> wangg...@gmail.com>
>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Thanks Dong, I made a pass over
>>> the
>>>>>> wiki
>>>>>>>> and
>>>>>>>>>> it
>>>>>>>>>>>>> lgtm.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Just a quick question: can we
>>>>>> completely
>>>>>>>>>>>>> eliminate the
>>>>>>>>>>>>>>>>>>>>>>>>> OffsetOutOfRangeException with
>>> this
>>>>>>>>> approach?
>>>>>>>>>> Say
>>>>>>>>>>>>> if
>>>>>>>>>>>>>>> there
>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>>> consecutive
>>>>>>>>>>>>>>>>>>>>>>>>> leader changes such that the
>>> cached
>>>>>>>>> metadata's
>>>>>>>>>>>>>>> partition
>>>>>>>>>>>>>>>>> epoch
>>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>> 1,
>>>>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>>>>> the metadata fetch response
>>> returns
>>>>>>> with
>>>>>>>>>>>>> partition
>>>>>>>>>>>>>>> epoch 2
>>>>>>>>>>>>>>>>>>>>> pointing
>>>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>>>> leader broker A, while the
>> actual
>>>>>>>> up-to-date
>>>>>>>>>>>>> metadata
>>>>>>>>>>>>>>> has
>>>>>>>>>>>>>>>>>>> partition
>>>>>>>>>>>>>>>>>>>>>>>> epoch 3
>>>>>>>>>>>>>>>>>>>>>>>>> whose leader is now broker B,
>> the
>>>>>>> metadata
>>>>>>>>>>>>> refresh will
>>>>>>>>>>>>>>>>> still
>>>>>>>>>>>>>>>>>>>>> succeed
>>>>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>>>>> the follow-up fetch request may
>>>>> still
>>>>>>> see
>>>>>>>>>> OORE?
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Jan 23, 2018 at 3:47
>> PM,
>>>>> Dong
>>>>>>> Lin
>>>>>>>> <
>>>>>>>>>>>>>>>>> lindon...@gmail.com
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> I would like to start the
>>> voting
>>>>>>> process
>>>>>>>>> for
>>>>>>>>>>>>> KIP-232:
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-232%3A+Detect+outdated+metadata+using+leaderEpoch+and+partitionEpoch
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> The KIP will help fix a
>>>>> concurrency
>>>>>>>> issue
>>>>>>>>> in
>>>>>>>>>>>>> Kafka
>>>>>>>>>>>>>>> which
>>>>>>>>>>>>>>>>>>>>> currently
>>>>>>>>>>>>>>>>>>>>>>> can
>>>>>>>>>>>>>>>>>>>>>>>>>> cause message loss or message
>>>>>>>> duplication
>>>>>>>>> in
>>>>>>>>>>>>>>> consumer.
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>>>>>> Dong
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>
> 
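
For reference, the ambiguity Colin describes in the quoted thread (one
response listing part1 and part2 at epoch 1, another listing only part1 at
epoch 1) can be sketched roughly as below. This is only an illustration:
MetadataSnapshot, newer_by_partition_epochs and newer_by_global_epoch are
hypothetical names made up for this sketch, not Kafka classes or anything
proposed in the KIP.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class MetadataSnapshot:
    """Hypothetical stand-in for a metadata response (not a Kafka class)."""
    partition_epochs: Dict[str, int]      # partition name -> per-partition epoch
    global_epoch: Optional[int] = None    # assumed single epoch for the whole response


def newer_by_partition_epochs(a, b):
    """Pick the newer snapshot from per-partition epochs only, or None if undecidable."""
    shared = a.partition_epochs.keys() & b.partition_epochs.keys()
    a_ahead = any(a.partition_epochs[p] > b.partition_epochs[p] for p in shared)
    b_ahead = any(b.partition_epochs[p] > a.partition_epochs[p] for p in shared)
    if a_ahead and not b_ahead:
        return a
    if b_ahead and not a_ahead:
        return b
    # Equal epochs on every shared partition: a partition that is missing on one
    # side may have just been created or just been deleted, so we cannot tell.
    return None


def newer_by_global_epoch(a, b):
    """Pick the newer snapshot from the global epoch, or None if it cannot decide."""
    if a.global_epoch is None or b.global_epoch is None:
        return None
    if a.global_epoch == b.global_epoch:
        return None
    return a if a.global_epoch > b.global_epoch else b


# The case from the thread: per-partition epochs alone cannot order these two.
two_parts = MetadataSnapshot({"part1": 1, "part2": 1})
one_part = MetadataSnapshot({"part1": 1})
undecided = newer_by_partition_epochs(two_parts, one_part)

# With an assumed global epoch, a deletion of part2 becomes distinguishable.
before_delete = MetadataSnapshot({"part1": 1, "part2": 1}, global_epoch=7)
after_delete = MetadataSnapshot({"part1": 1}, global_epoch=8)
winner = newer_by_global_epoch(before_delete, after_delete)
```

Here undecided is None, while winner is the one-partition snapshot. The
epoch values 7 and 8 are arbitrary and only serve the illustration.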

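Dong's remark in the quoted thread that split brain can be avoided by
configuring minIsr > RF/2 is a counting argument: two disjoint groups of
replicas cannot both reach minIsr when 2 * minIsr exceeds RF. A minimal
sketch of that check follows. The function name split_brain_possible is
hypothetical, and the sketch assumes a fixed replica set, so it does not
model the reassignment in Colin's A/B/C to C/D/E scenario.

```python
def split_brain_possible(replication_factor: int, min_isr: int) -> bool:
    """Return True if two disjoint replica groups could each satisfy min_isr.

    Assumes a fixed replica set of size replication_factor; partition
    reassignment to new brokers is outside this simple model.
    """
    if not 1 <= min_isr <= replication_factor:
        raise ValueError("min_isr must be between 1 and replication_factor")
    # Two disjoint groups of size min_isr fit into the replica set only if
    # 2 * min_isr <= replication_factor, i.e. when min_isr <= RF / 2.
    return 2 * min_isr <= replication_factor


rf4 = split_brain_possible(replication_factor=4, min_isr=2)
rf3 = split_brain_possible(replication_factor=3, min_isr=2)
```

Under this model rf4 is True (RF=4 with minIsr=2 allows two independent
groups of two) and rf3 is False.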