BTW, here is my understanding of the scope of this KIP. We want to allow
consumers to always consume messages with the same key from the same
producer in the order they are produced. And we need to provide a way for
the stream use-case to flush/load state when messages with the same
key are migrated between consumers. In addition to ensuring that this goal
is correctly supported, we should do our best to keep the performance and
organizational overhead of this KIP as low as possible.
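
To make the key-migration concern concrete, here is a minimal sketch of how
keys change partition when the partition count grows. It is only an
illustration under stated assumptions: zlib.crc32 stands in for the
producer's hash function (it is not Kafka's default partitioner), and the
names partition_for and moved_keys are hypothetical helpers, not part of
the KIP.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    """Map a key to a partition with hash(key) mod N.

    Assumption: crc32 is a stand-in for whatever hash the producer uses.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def moved_keys(keys, old_n, new_n):
    """Return {key: (old_partition, new_partition)} for keys that move."""
    moves = {}
    for key in keys:
        old_p = partition_for(key, old_n)
        new_p = partition_for(key, new_n)
        if old_p != new_p:
            moves[key] = (old_p, new_p)
    return moves


keys = [f"user-{i}" for i in range(20)]
moves = moved_keys(keys, old_n=2, new_n=4)

# Each moved key is a key whose ordering and state must be handed over
# from the consumer of the old partition to the consumer of the new one.
for key, (old_p, new_p) in sorted(moves.items()):
    print(f"{key}: partition {old_p} -> {new_p}")
```

With a modulo-based mapping and a doubled partition count, a key in old
partition p can only stay in p or move to p + 2, so each old partition
splits cleanly in two. Other growth factors or other hash schemes do not
necessarily have this property.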


On Tue, Mar 27, 2018 at 1:14 PM, Dong Lin <lindon...@gmail.com> wrote:

> Hey John,
>
> Thanks much for the detailed comments. Here are my thoughts:
>
> - The need to delete messages from log compacted topics is more about
> performance optimization (e.g. storage space) than about correctness for this
> KIP. I agree that we probably don't need to focus on this in our discussion
> since it is mostly a performance optimization.
>
> - "Asking producers and consumers, or even two different producers, to
> share code like the partition function is a pretty huge ask. What if they
> are using different languages?". It seems that today we already require
> different producers to use the same hash function -- otherwise messages
> with the same key will go to different partitions of the same topic, which
> may cause problems for downstream consumption. So I am not sure it adds any
> more constraint to assume that consumers know the hash function of the producer.
> Could you explain more why a user would want to use a custom partition
> function? Maybe we can check if this is something that can be supported in
> the default Kafka hash function. Also, can you explain more why it is
> difficult to implement the same hash function in different languages?
>
> - Besides the assumption that the consumer needs to share the hash function of
> the producer, is there any other organizational overhead of the proposal in the
> current KIP?
>
> - Currently the producer can forget about a message once it has been
> acknowledged by the broker. Thus the producer probably does not know most
> of the existing messages in the topic, including those messages produced by
> other producers. We can have the owner of the producer do the split+backfill.
> In my opinion it will be a new program that wraps around the existing
> producer and consumer classes.
>
> - Regarding point 5. The argument is in favor of the split+backfill but
> for the changelog topic. And it intends to address the problem for the stream
> use-case in general. In this KIP we will provide an interface (i.e.
> PartitionKeyRebalanceListener in the KIP) to be used by the stream use-case, and
> the goal is that the user can flush/re-consume the state as part of the
> interface implementation regardless of whether there is a change log topic.
>
> Maybe you are suggesting that the main reason to do split+backfill of
> the input topic is to support log compacted topics? You mentioned in Point 1
> that log compacted topics are out of the scope of this KIP. Maybe I could
> understand your position better. Regarding Jan's proposal to split
> partitions with backfill, do you think this should replace the proposal in
> the existing KIP, or do you think this is something that we should do in
> addition to the existing KIP?
>
> - Regarding point 6. I guess we can agree that it is better not to have
> the performance overhead of copying the input data. Before we discuss more
> on whether the performance overhead is acceptable or not, I am trying to
> figure out what the benefit of introducing this overhead is. You mentioned
> that the benefit is the loose organizational coupling. By "organizational
> coupling", are you referring to the requirement that the consumer needs to know
> the hash function of the producer? If so, maybe we can discuss the use-case of
> a custom partition function and see whether we can find a way to support such
> a use-case without having to copy the input data.
>
> Thanks,
> Dong
>
>
> On Tue, Mar 27, 2018 at 11:34 AM, John Roesler <j...@confluent.io> wrote:
>
>> Hey Dong and Jun,
>>
>> Thanks for the thoughtful responses. If you don't mind, I'll mix my
>> replies
>> together to try for a coherent response. I'm not too familiar with
>> mailing-list etiquette, though.
>>
>> I'm going to keep numbering my points because it makes it easy for you all
>> to respond.
>>
>> Point 1:
>> As I read it, KIP-253 is *just* about properly fencing the producers and
>> consumers so that you preserve the correct ordering of records during
>> partition expansion. This is clearly necessary regardless of anything else
>> we discuss. I think this whole discussion about backfill, consumers,
>> streams, etc., is beyond the scope of KIP-253. But it would be cumbersome
>> to start a new thread at this point.
>>
>> I had missed KIP-253's Proposed Change #9 among all the details... I think
>> this is a nice addition to the proposal. One thought is that it's actually
>> irrelevant whether the hash function is linear. This is simply an
>> algorithm
>> for moving a key from one partition to another, so the type of hash
>> function need not be a precondition. In fact, it also doesn't matter
>> whether the topic is compacted or not, the algorithm works regardless.
>>
>> I think this is a good algorithm to keep in mind, as it might solve a
>> variety of problems, but it does have a downside: that the producer won't
>> know whether or not K1 was actually in P1, it just knows that K1 was in
>> P1's keyspace before the new epoch. Therefore, it will have to
>> pessimistically send (K1,null) to P1 just in case. But the next time K1
>> comes along, the producer *also* won't remember that it already retracted
>> K1 from P1, so it will have to send (K1,null) *again*. By extension, every
>> time the producer sends to P2, it will also have to send a tombstone to
>> P1,
>> which is a pretty big burden. To make the situation worse, if there is a
>> second split, say P2 becomes P2 and P3, then any key Kx belonging to P3
>> will also have to be retracted from P2 *and* P1, since the producer can't
>> know whether Kx had been last written to P2 or P1. Over a long period of
>> time, this clearly becomes an issue, as the producer must send an arbitrary
>> number of retractions along with every update.
>>
>> In contrast, the proposed backfill operation has an end, and after it
>> ends,
>> everyone can afford to forget that there ever was a different partition
>> layout.
>>
>> Really, though, figuring out how to split compacted topics is beyond the
>> scope of KIP-253, so I'm not sure #9 really even needs to be in this
>> KIP...
>> We do need in-order delivery during partition expansion. It would be fine
>> by me to say that you *cannot* expand partitions of a log-compacted topic
>> and call it a day. I think it would be better to tackle that in another
>> KIP.
>>
>>
>> Point 2:
>> Regarding whether the consumer re-shuffles its inputs, this is always on
>> the table; any consumer who wants to re-shuffle its input is free to do
>> so.
>> But this is currently not required. It's just that the current high-level
>> story with Kafka encourages the use of partitions as a unit of
>> concurrency.
>> As long as consumers are single-threaded, they can happily consume a
>> single
>> partition without concurrency control of any kind. This is a key aspect to
>> this system that lets folks design high-throughput systems on top of it
>> surprisingly easily. If all consumers were instead encouraged/required to
>> implement a repartition of their own, then the consumer becomes
>> significantly more complex, requiring either the consumer to first produce
>> to its own intermediate repartition topic or to ensure that consumer
>> threads have a reliable, high-bandwidth channel of communication with every
>> other consumer thread.
>>
>> Either of those tradeoffs may be reasonable for a particular user of
>> Kafka,
>> but I don't know if we're in a position to say that they are reasonable
>> for
>> *every* user of Kafka.
>>
>>
>> Point 3:
>> Regarding Jun's point about this use case, "(3) stateful and maintaining
>> the
>> states in a local store", I agree that they may use a framework *like*
>> Kafka Streams, but that is not the same as using Kafka Streams. This is
>> why
>> I think it's better to solve it in Core: because it is then solved for
>> KStreams and also for everything else that facilitates local state
>> maintenance. To me, Streams is a member of the category of "stream
>> processing frameworks", which is itself a subcategory of "things requiring
>> local state maintenance". I'm not sure if it makes sense to assert that
>> Streams is a sufficient and practical replacement for everything in
>> "things
>> requiring local state maintenance".
>>
>> But, yes, I do agree that per-key ordering is an absolute requirement,
>> therefore I think that KIP-253 itself is a necessary step. Regarding the
>> coupling of the state store partitioning to the topic partitioning, yes,
>> this is an issue we are discussing solutions to right now. We may go ahead
>> and introduce an overpartition layer on our inputs to solve it, but then
>> again, if we get the ability to split partitions with backfill, we may not
>> need to!
>>
>>
>> Point 4:
>> On this:
>>
>> > Regarding thought 2: If we don't care about the stream use-case, then
>> the
>> > current KIP probably has already addressed problem without requiring
>> > consumer to know the partition function. If we care about the stream
>> > use-case, we already need coordination across producers of different
>> > topics, i.e. the same partition function needs to be used by producers
>> of
>> > topics A and B in order to join topics A and B. Thus, it might be
>> > reasonable to extend coordination a bit and say we need coordination
>> across
>> > clients (i.e. producer and consumer), such that consumer knows the
>> > partition function used by producer. If we do so, then we can let
>> consumer
>> > re-copy data for the change log topic using the same partition function
>> as
>> > producer. This approach has lower overhead as compared to having
>> producer
>> > re-copy data of the input topic.
>> > Also, producer currently does not need to know the data already
>> produced to
>> > the topic. If we let producer split/merge partition, it would require
>> > producer to consume the existing data, which intuitively is the task of
>> > consumer.
>>
>>
>> I think we do care about use cases *like* Streams, I just don't think we
>> should rely on Streams to implement a feature of Core like partition
>> expansion.
>>
>> Note, though, that we (Streams) do not require coordination across
>> producers. If two topics are certified to be co-partitioned, then Streams
>> apps can make use of that knowledge to optimize their topology (skipping a
>> repartition). But if they don't know whether they are co-partitioned, then
>> they'd better go ahead and repartition within the topology. This is the
>> current state.
>>
>> A huge selling point of Kafka is enabling different parts of loosely
>> coupled organizations to produce and consume data independently. Some
>> coordination between producers and consumers is necessary, like
>> coordinating on the names of topics and their schemas. But Kafka's value
>> proposition w.r.t. ESBs, etc. is inversely proportional to the amount of
>> coordination required. I think it behooves us to be extremely skeptical
>> about introducing any coordination beyond correctness protocols.
>>
>> Asking producers and consumers, or even two different producers, to share
>> code like the partition function is a pretty huge ask. What if they are
>> using different languages?
>>
>> Comparing organizational overhead vs computational overhead, there are
>> maybe two orders of magnitude difference between them. In other words, I
>> would happily take on the (linear) overhead of having the producer re-copy
>> the data once during a re-partition in order to save the organizational
>> overhead of tying all the producers and consumers together across multiple
>> boundaries.
>>
>> On that last paragraph: note that the producer *did* know the data it
>> already produced. It handled it the first time around. Asking it to
>> re-produce it into a new partition layout is squarely within its scope of
>> capabilities. Contrast this with the alternative, asking the consumer to
>> re-partition the data. I think this is even less intuitive, when the
>> partition function belongs to the producer.
>>
>>
>> Point 5:
>> Dong asked this:
>>
>> > For stream use-case that needs to increase consumer number, the
>> > existing consumer can backfill the existing data in the change log
>> topic to
>> > the same change log topic with the new partition number, before the new
>> set
>> > of consumers bootstrap state from the new partitions of the change log
>> > topic, right?
>>
>>
>> In this sense, the "consumer" is actually the producer of the changelog
>> topic, so if we support partition expansion + backfill as a
>> producer/broker
>> operation, then it would be very straightforward for Streams to split a
>> state store. As you say, they would simply instruct the broker to split
>> the
>> changelog topic's partitions, then backfill. Once the backfill is ready,
>> they can create a new crop of StandbyTasks to bootstrap the more granular
>> state stores and finally switch over to them when they are ready.
>>
>> But this actually seems to be an argument in favor of split+backfill, so
>> maybe I missed the point.
>>
>> You also asked me to explain why copying the "input" topic is better than
>> copying the "changelog" topic. I think they are totally independent,
>> actually. For one thing, you can't depend on the existence of a
>> "changelog"
>> topic in general, only within Streams, but Kafka's user base clearly
>> exceeds Streams's user base. Plus, you actually also can't depend on the
>> existence of a changelog topic within Streams, since that is an optional
>> feature of *some* state store implementations. Even in the situation where
>> you do have a changelog topic in Streams, there may be use cases where it
>> makes sense to expand the partitions of just the input, or just the
>> changelog.
>>
>> The ask for a Core feature of split+backfill is really about supporting
>> the
>> use case of splitting partitions in log-compacted topics, regardless of
>> whether that topic is an "input" or a "changelog" or anything else for
>> that
>> matter.
>>
>>
>> Point 6:
>> On the concern about the performance overhead of copying data between the
>> brokers, I think it's actually a bit overestimated. Splitting a topic's
>> partition is probably rare, certainly rarer in general than bootstrapping
>> new consumers on that topic. If "bootstrapping new consumers" means that
>> they have to re-shuffle the data before they consume it, then you wind up
>> copying the same record multiple times:
>>
>> (broker: input topic) -> (initial consumer) -> (broker: repartition topic)
>> -> (real consumer)
>>
>> That's 3x, and it's also 3x for every new record after the split as well,
>> since you don't get to stop repartitioning/reshuffling once you start.
>>
>> Whereas if you do a backfill in something like the procedure I outlined,
>> you only copy the prefix of the partition before the split, and you send
>> it
>> once to the producer and then once to the new generation partition. Plus,
>> assuming we're splitting the partition for the benefit of consumers,
>> there's no reason we can't co-locate the post-split partitions on the same
>> host as the pre-split partition, making the second copy a local filesystem
>> operation.
>>
>> Even if you follow these two copies up with bootstrapping a new consumer,
>> it's still rare for this to occur, so you get to amortize these copies
>> over
>> the lifetime of the topic, whereas a reshuffle just keeps making copies
>> for
>> every new event.
>>
>> And finally, I really do think that regardless of any performance concerns
>> about this operation, if it preserves loose organizational coupling, it is
>> certainly worth it.
>>
>>
>> In conclusion:
>> It might actually be a good idea for us to clarify the scope of KIP-253.
>> If
>> we're all agreed that it's a good algorithm for allowing in-order message
>> delivery during partition expansion, then we can continue this discussion
>> as a new KIP, something like "backfill with partition expansion". This
>> would let Dong proceed with KIP-253. On the other hand, if it seems like
>> this conversation may alter the design of KIP-253, then maybe we *should*
>> just finish working it out.
>>
>> For my part, my only concern about KIP-253 is the one I raised earlier.
>>
>> Thanks again, all, for considering these points,
>> -John
>>
>>
>> On Tue, Mar 27, 2018 at 2:10 AM, Dong Lin <lindon...@gmail.com> wrote:
>>
>> > On Tue, Mar 27, 2018 at 12:04 AM, Dong Lin <lindon...@gmail.com> wrote:
>> >
>> > > Hey Jan,
>> > >
>> > > Thanks for the enthusiasm in improving Kafka's design. Now that I have
>> > > read through your discussion with Jun, here are my thoughts:
>> > >
>> > > - The latest proposal should work with log compacted topics by properly
>> > > deleting old messages after a new message with the same key is
>> produced.
>> > So
>> > > it is probably not a concern anymore. Could you comment if there is
>> still
>> > > an issue?
>> > >
>> > > - I wrote the SEP-5 and I am pretty familiar with the motivation and
>> the
>> > > design of SEP-5. SEP-5 is probably orthogonal to the motivation of this
>> > KIP.
>> > > The goal of SEP-5 is to allow user to increase task number of an
>> existing
>> > > Samza job. But if we increase the partition number of input topics,
>> > > messages may still be consumed out-of-order by tasks in Samza, which
>> can cause
>> > > incorrect results. Similarly, the approach you proposed does not seem
>> to
>> > > ensure that the messages can be delivered in order, even if we can
>> make
>> > > sure that each consumer instance is assigned the set of new partitions
>> > > covering the same set of keys.
>> > >
>> >
>> > Let me correct this comment. The approach of copying data to a new topic
>> > can ensure in-order message delivery provided we properly migrate offsets
>> > from old topic to new topic.
>> >
>> >
>> > > - I am trying to understand why it is better to copy the data instead
>> of
>> > > copying the change log topic for streaming use-case. For core Kafka
>> > > use-case, and for the stream use-case that does not need to increase
>> > > consumers, the current KIP already supports in-order delivery without
>> the
>> > > overhead of copying the data. For stream use-case that needs to
>> increase
>> > > consumer number, the existing consumer can backfill the existing data
>> in
>> > > the change log topic to the same change log topic with the new
>> partition
>> > > number, before the new set of consumers bootstrap state from the new
>> > > partitions of the change log topic. If this solution works, then could
>> > you
>> > > summarize the advantage of copying the data of input topic as
>> compared to
>> > > copying the change log topic? For example, does it enable more
>> use-case,
>> > > simplify the implementation of Kafka library, or reduce the operation
>> > > overhead etc?
>> > >
>> > > Thanks,
>> > > Dong
>> > >
>> > >
>> > > On Wed, Mar 21, 2018 at 6:57 AM, Jan Filipiak <
>> jan.filip...@trivago.com>
>> > > wrote:
>> > >
>> > >> Hi Jun,
>> > >>
>> > >> I was really seeing progress in our conversation but your latest
>> reply
>> > is
>> > >> just devastating.
>> > >> I thought we were getting close to being on the same page; now it feels
>> like
>> > >> we are in different libraries.
>> > >>
>> > >> I just quickly slam my answers in here. If they are too brief, I am
>> sorry;
>> > >> give me a ping and I will try to go into more detail.
>> > >> Just want to show that your pro/cons listing is broken.
>> > >>
>> > >> Best Jan
>> > >>
>> > >> and want to get rid of this horrible compromise
>> > >>
>> > >>
>> > >> On 19.03.2018 05:48, Jun Rao wrote:
>> > >>
>> > >>> Hi, Jan,
>> > >>>
>> > >>> Thanks for the discussion. Great points.
>> > >>>
>> > >>> Let me try to summarize the approach that you are proposing. On the
>> > >>> broker
>> > >>> side, we reshuffle the existing data in a topic from current
>> partitions
>> > >>> to
>> > >>> the new partitions. Once the reshuffle fully catches up, switch the
>> > >>> consumers to start consuming from the new partitions. If a consumer
>> > needs
>> > >>> to rebuild its local state (due to partition changes), let the
>> consumer
>> > >>> rebuild its state by reading all existing data from the new
>> partitions.
>> > >>> Once all consumers have switched over, cut over the producer to the
>> new
>> > >>> partitions.
>> > >>>
>> > >>> The pros for this approach are that :
>> > >>> 1. There is just one way to rebuild the local state, which is
>> simpler.
>> > >>>
>> > >> true thanks
>> > >>
>> > >>>
>> > >>> The cons for this approach are:
>> > >>> 1. Need to copy existing data.
>> > >>>
>> > >> Very unfair and not correct. It does not require you to copy over
>> > >> existing data. It _allows_ you to copy all existing data.
>> > >>
>> > >> 2. The cutover of the producer is a bit complicated since it needs to
>> > >>> coordinate with all consumer groups.
>> > >>>
>> > >> Also not true. I explicitly tried to make clear that there is only
>> one
>> > >> special consumer (in the case of actually copying data) coordination
>> is
>> > >> required.
>> > >>
>> > >>> 3. The rebuilding of the state in the consumer is from the input
>> topic,
>> > >>> which can be more expensive than rebuilding from the existing state.
>> > >>>
>> > >> true, but rebuilding state is only required if you want to increase
>> > >> processing power, so we assume this is at hand.
>> > >>
>> > >>> 4. The broker potentially has to know the partitioning function. If
>> > this
>> > >>> needs to be customized at the topic level, it can be a bit messy.
>> > >>>
>> > >> I would argue against having the operation being performed by the
>> > broker.
>> > >> This was not discussed yet but if you see my original email I
>> suggested
>> > >> otherwise from the beginning.
>> > >>
>> > >>>
>> > >>> Here is an alternative approach by applying your idea not in the
>> > broker,
>> > >>> but in the consumer. When new partitions are added, we don't move
>> > >>> existing
>> > >>> data. In KStreams, we first reshuffle the new input data to a new
>> topic
>> > >>> T1
>> > >>> with the old number of partitions and feed T1's data to the rest of
>> the
>> > >>> pipeline. In the meantime, KStreams reshuffles all existing data of
>> the
>> > >>> change capture topic to another topic C1 with the new number of
>> > >>> partitions.
>> > >>> We can then build the state of the new tasks from C1. Once the new
>> > states
>> > >>> have been fully built, we can cut over the consumption to the input
>> > topic
>> > >>> and delete T1. This approach works with compacted topic too. If an
>> > >>> application reads from the beginning of a compacted topic, the
>> consumer
>> > >>> will reshuffle the portion of the input when the number of
>> partitions
>> > >>> doesn't match the number of tasks.
>> > >>>
>> > >> We all wipe this idea from our heads instantly. Mixing Ideas from an
>> > >> argument is not a resolution strategy;
>> > >> it just leads to horrible horrible software.
>> > >>
>> > >>
>> > >>> The pros of this approach are:
>> > >>> 1. No need to copy existing data.
>> > >>> 2. Each consumer group can cut over to the new partitions
>> > independently.
>> > >>> 3. The state is rebuilt from the change capture topic, which is
>> cheaper
>> > >>> than rebuilding from the input topic.
>> > >>> 4. Only the KStreams job needs to know the partitioning function.
>> > >>>
>> > >>> The cons of this approach are:
>> > >>> 1. Potentially the same input topic needs to be reshuffled more than
>> > once
>> > >>> in different consumer groups during the transition phase.
>> > >>>
>> > >>> What do you think?
>> > >>>
>> > >>> Thanks,
>> > >>>
>> > >>> Jun
>> > >>>
>> > >>>
>> > >>>
>> > >>> On Thu, Mar 15, 2018 at 1:04 AM, Jan Filipiak <
>> > jan.filip...@trivago.com>
>> > >>> wrote:
>> > >>>
>> > >>> Hi Jun,
>> > >>>>
>> > >>>> thank you for following me on these thoughts. It was important to
>> me
>> > to
>> > >>>> feel that kind of understanding for my arguments.
>> > >>>>
>> > >>>> What I was hoping for (I mentioned this earlier) is that we can
>> model
>> > >>>> the
>> > >>>> case where we do not want to copy the data the exact same way as
>> the
>> > >>>> case
>> > >>>> when we do copy the data. Maybe you can peek into the mails before
>> to
>> > >>>> see
>> > >>>> more details for this.
>> > >>>>
>> > >>>> This means we have the same mechanism to transfer consumer groups
>> to
>> > >>>> switch topic. The offset mapping that would be generated would
>> even be
>> > >>>> simpler: End Offset of the Old topic => offset 0 of all the
>> partitions
>> > >>>> of
>> > >>>> the new topic. Then we could model the transition of a non-copy
>> > >>>> expansion
>> > >>>> the exact same way as a copy-expansion.
>> > >>>>
>> > >>>> I know this only works when topics grow by a factor. But the
>> benefits
>> > >>>> of
>> > >>>> only growing by a factor are too strong anyway. See Clemens's hint
>> and
>> > >>>> remember that state reshuffling is entirely not needed if one
>> doesn't
>> > >>>> want
>> > >>>> to grow processing power.
>> > >>>>
>> > >>>> I think these benefits should be clear, and that there is
>> basically no
>> > >>>> downside to what is currently at hand but just makes everything
>> easy.
>> > >>>>
>> > >>>> One thing you need to know is that if you do not offer rebuilding
>> a
>> > log
>> > >>>> compacted topic like I suggest, then even if you have consumer state
>> > >>>> reshuffling, the topic is broken and can not be used to bootstrap
>> new
>> > >>>> consumers. They don't know if they need to apply a key from an old
>> > >>>> partition or not. This is a horrible downside I haven't seen a
>> > solution
>> > >>>> for
>> > >>>> in the email conversation.
>> > >>>>
>> > >>>> I argue to:
>> > >>>>
>> > >>>> Only grow topic by a factor always.
>> > >>>> Have the "no copy consumer" transition as the trivial case of the
>> > "copy
>> > >>>> consumer transition".
>> > >>>> If processors need to be scaled, let them rebuild from the new
>> topic
>> > >>>> and
>> > >>>> leave the old running in the mean time.
>> > >>>> Do not implement key shuffling in streams.
>> > >>>>
>> > >>>> I hope I can convince you especially with the fact how I want to
>> > handle
>> > >>>> consumer transition. I think
>> > >>>> you didn't quite understand me there before. I think the term "new
>> > >>>> topic"
>> > >>>> intimidated you a little.
>> > >>>> How we solve this on disc doesn't really matter, If the data goes
>> into
>> > >>>> the
>> > >>>> same Dir or a different Dir or anything. I do think that it needs
>> to
>> > >>>> involve at least rolling a new segment for the existing partitions.
>> > >>>> But most of the transitions should work without restarting
>> consumers.
>> > >>>> (newer consumers with support for this). But with new topic I just
>> > meant
>> > >>>> the topic that now has a different partition count. Plenty of ways
>> to
>> > >>>> handle that (versions, aliases)
>> > >>>>
>> > >>>> Hope I can further get my idea across.
>> > >>>>
>> > >>>> Best Jan
>> > >>>>
>> > >>>>
>> > >>>>
>> > >>>>
>> > >>>>
>> > >>>>
>> > >>>> On 14.03.2018 02:45, Jun Rao wrote:
>> > >>>>
>> > >>>> Hi, Jan,
>> > >>>>>
>> > >>>>> Thanks for sharing your view.
>> > >>>>>
>> > >>>>> I agree with you that recopying the data potentially makes the
>> state
>> > >>>>> management easier since the consumer can just rebuild its state
>> from
>> > >>>>> scratch (i.e., no need for state reshuffling).
>> > >>>>>
>> > >>>>> On the flip side, I saw a few disadvantages of the approach that
>> you
>> > >>>>> suggested. (1) Building the state from the input topic from
>> scratch
>> > is
>> > >>>>> in
>> > >>>>> general less efficient than state reshuffling. Let's say one
>> > computes a
>> > >>>>> count per key from an input topic. The former requires reading all
>> > >>>>> existing
>> > >>>>> records in the input topic whereas the latter only requires
>> reading
>> > >>>>> data
>> > >>>>> proportional to the number of unique keys. (2) The switching of
>> the
>> > >>>>> topic
>> > >>>>> needs modification to the application. If there are many
>> applications
>> > >>>>> on a
>> > >>>>> topic, coordinating such an effort may not be easy. Also, it's not
>> > >>>>> clear
>> > >>>>> how to enforce exactly-once semantics during the switch. (3) If a
>> > topic
>> > >>>>> doesn't need any state management, recopying the data seems
>> wasteful.
>> > >>>>> In
>> > >>>>> that case, in place partition expansion seems more desirable.
>> > >>>>>
>> > >>>>> I understand your concern about adding complexity in KStreams.
>> But,
>> > >>>>> perhaps
>> > >>>>> we could iterate on that a bit more to see if it can be
>> simplified.
>> > >>>>>
>> > >>>>> Jun
>> > >>>>>
>> > >>>>>
>> > >>>>> On Mon, Mar 12, 2018 at 11:21 PM, Jan Filipiak <
>> > >>>>> jan.filip...@trivago.com>
>> > >>>>> wrote:
>> > >>>>>
>> > >>>>> Hi Jun,
>> > >>>>>
>> > >>>>>> I will focus on point 61 as I think it's _the_ fundamental part
>> that
>> > I
>> > >>>>>> can't
>> > >>>>>> get across at the moment.
>> > >>>>>>
>> > >>>>>> Kafka is the platform to have state materialized multiple times
>> from
>> > >>>>>> one
>> > >>>>>> input. I emphasize this: It is the building block in
>> architectures
>> > >>>>>> that
>> > >>>>>> allow you to
>> > >>>>>> have your state maintained multiple times. You put a message in
>> > once,
>> > >>>>>> and
>> > >>>>>> you have it pop out as often as you like. I believe you
>> understand
>> > >>>>>> this.
>> > >>>>>>
>> > >>>>>> Now! The path of thinking goes the following: I am using apache
>> > kafka
>> > >>>>>> and
>> > >>>>>> I _want_ my state multiple times. What am I going to do?
>> > >>>>>>
>> > >>>>>> A) Am I going to take my state that I build up, plunge some sort
>> of
>> > >>>>>> RPC
>> > >>>>>> layer on top of it, use that RPC layer to throw my records across
>> > >>>>>> instances?
>> > >>>>>> B) Am I just going to read the damn message twice?
>> > >>>>>>
>> > >>>>>> Approach A is fundamentally flawed and a violation of all that is
>> > good
>> > >>>>>> and
>> > >>>>>> holy in kafka deployments. I can not understand how this Idea can
>> > >>>>>> come in
>> > >>>>>> the first place.
>> > >>>>>> (I do understand: IQ in streams, they polluted the kafka streams
>> > >>>>>> codebase
>> > >>>>>> really bad already. It is not funny! I think they are equally
>> flawed
>> > >>>>>> as
>> > >>>>>> A)
>> > >>>>>>
>> > >>>>>> I say, we do what Kafka is good at. We repartition the topic
>> once.
>> > We
>> > >>>>>> switch the consumers.
>> > >>>>>> (Those that need more partitions are going to rebuild their
>> state in
>> > >>>>>> multiple partitions by reading the new topic, those that don't
>> just
>> > >>>>>> assign
>> > >>>>>> the new partitions properly)
>> > >>>>>> We switch producers. Done!
>> > >>>>>>
>> > >>>>>> The best thing! It is trivial, hipster stream processor will
>> have an
>> > >>>>>> easy
>> > >>>>>> time with that as well. It's so super simple. And simple IS good!
>> > >>>>>> It is what kafka was built to do. It is how we do it today. All I
>> am
>> > >>>>>> saying
>> > >>>>>> is that a little broker help doing the producer swap is super
>> > useful.
>> > >>>>>>
>> > >>>>>> For everyone interested in why kafka is so powerful with
>> approach B,
>> > >>>>>> please watch https://youtu.be/bEbeZPVo98c?t=1633
>> > >>>>>> I already looked up a good point in time, I think after 5 minutes
>> > the
>> > >>>>>> "state" topic is handled and you should be able to understand me
>> > >>>>>> an inch better.
>> > >>>>>>
>> > >>>>>> Please do not do A to the project, it deserves better!
>> > >>>>>>
>> > >>>>>> Best Jan
>> > >>>>>>
>> > >>>>>>
>> > >>>>>>
>> > >>>>>> On 13.03.2018 02:40, Jun Rao wrote:
>> > >>>>>>
>> > >>>>>> Hi, Jan,
>> > >>>>>>
>> > >>>>>>> Thanks for the reply. A few more comments below.
>> > >>>>>>>
>> > >>>>>>> 50. Ok, we can think a bit harder for supporting compacted
>> topics.
>> > >>>>>>>
>> > >>>>>>> 51. This is a fundamental design question. In the more common
>> case,
>> > >>>>>>> the
>> > >>>>>>> reason why someone wants to increase the number of partitions is
>> > that
>> > >>>>>>> the
>> > >>>>>>> consumer application is slow and one wants to run more consumer
>> > >>>>>>> instances
>> > >>>>>>> to increase the degree of parallelism. So, fixing the number of
>> > >>>>>>> running
>> > >>>>>>> consumer instances when expanding the partitions won't help this
>> > >>>>>>> case.
>> > >>>>>>> If
>> > >>>>>>> we do need to increase the number of consumer instances, we
>> need to
>> > >>>>>>> somehow
>> > >>>>>>> reshuffle the state of the consumer across instances. What we
>> have
>> > >>>>>>> been
>> > >>>>>>> discussing in this KIP is whether we can do this more
>> effectively
>> > >>>>>>> through
>> > >>>>>>> the KStream library (e.g. through a 2-phase partition
>> expansion).
>> > >>>>>>> This
>> > >>>>>>> will
>> > >>>>>>> add some complexity, but it's probably better than everyone
>> doing
>> > >>>>>>> this
>> > >>>>>>> in
>> > >>>>>>> the application space. The recopying approach that you mentioned
>> > >>>>>>> doesn't
>> > >>>>>>> seem to address the consumer state management issue when the
>> > consumer
>> > >>>>>>> switches from an old to a new topic.
>> > >>>>>>>
>> > >>>>>>> 52. As for your example, it depends on whether the join key is
>> the
>> > >>>>>>> same
>> > >>>>>>> between (A,B) and (B,C). If the join key is the same, we can do
>> a
>> > >>>>>>> 2-phase
>> > >>>>>>> partition expansion of A, B, and C together. If the join keys
>> are
>> > >>>>>>> different, one would need to repartition the data on a different
>> > key
>> > >>>>>>> for
>> > >>>>>>> the second join, then the partition expansion can be done
>> > >>>>>>> independently
>> > >>>>>>> between (A,B) and (B,C).
>> > >>>>>>>
>> > >>>>>>> 53. If you always fix the number of consumer instances, what you
>> > >>>>>>> described
>> > >>>>>>> works. However, as I mentioned in #51, I am not sure how your
>> > >>>>>>> proposal
>> > >>>>>>> deals with consumer states when the number of consumer instances
>> > >>>>>>> grows.
>> > >>>>>>> Also, it just seems that it's better to avoid re-copying the
>> > existing
>> > >>>>>>> data.
>> > >>>>>>>
>> > >>>>>>> 60. "just want to throw in my question from the longer email in
>> the
>> > >>>>>>> other
>> > >>>>>>> Thread here. How will the bloom filter help a new consumer to
>> > decide
>> > >>>>>>> to
>> > >>>>>>> apply the key or not?" Not sure that I fully understood your
>> > >>>>>>> question.
>> > >>>>>>> The
>> > >>>>>>> consumer just reads whatever key is in the log. The bloom filter
>> > just
>> > >>>>>>> helps
>> > >>>>>>> clean up the old keys.
>> > >>>>>>>
>> > >>>>>>> 61. "Why can we afford having a topic where it's apparently not
>> > >>>>>>> possible
>> > >>>>>>> to
>> > >>>>>>> start a new application on? I think this is an overall flaw of
>> the
>> > >>>>>>> discussed idea here. Not paying attention to the overall
>> > >>>>>>> architecture."
>> > >>>>>>> Could you explain a bit more when one can't start a new
>> > application?
>> > >>>>>>>
>> > >>>>>>> Jun
>> > >>>>>>>
>> > >>>>>>>
>> > >>>>>>>
>> > >>>>>>> On Sat, Mar 10, 2018 at 1:40 AM, Jan Filipiak <
>> > >>>>>>> jan.filip...@trivago.com
>> > >>>>>>> wrote:
>> > >>>>>>>
>> > >>>>>>> Hi Jun, thanks for your mail.
>> > >>>>>>>
>> > >>>>>>> Thank you for your questions!
>> > >>>>>>>> I think they are really good and tackle the core of the
>> problem I
>> > >>>>>>>> see.
>> > >>>>>>>>
>> > >>>>>>>> I will answer inline, mostly but still want to set the tone
>> here.
>> > >>>>>>>>
>> > >>>>>>>> The core strength of kafka is what Martin once called the
>> > >>>>>>>> kappa-Architecture. How does this work?
>> > >>>>>>>> You have everything as a log as in kafka. When you need to
>> change
>> > >>>>>>>> something.
>> > >>>>>>>> You create the new version of your application and leave it
>> > running
>> > >>>>>>>> in
>> > >>>>>>>> parallel.
>> > >>>>>>>> Once the new version is good you switch your users to use the
>> new
>> > >>>>>>>> application.
>> > >>>>>>>>
>> > >>>>>>>> The online reshuffling effectively breaks this architecture
>> and I
>> > >>>>>>>> think
>> > >>>>>>>> the switch in thinking here is more harmful
>> > >>>>>>>> than any details about the partitioning function to allow such
>> a
>> > >>>>>>>> change.
>> > >>>>>>>> I
>> > >>>>>>>> feel with my suggestion we are the closest to
>> > >>>>>>>> the original and battle proven architecture and I can only
>> warn to
>> > >>>>>>>> move
>> > >>>>>>>> away from it.
>> > >>>>>>>>
>> > >>>>>>>> I might have forgotten something, sometimes it's hard for me to
>> > >>>>>>>> get
>> > >>>>>>>> all
>> > >>>>>>>> the thoughts captured in a mail, but I hope the comments inline
>> > will
>> > >>>>>>>> further make my concern clear, and put some emphasis on why I
>> > >>>>>>>> prefer my
>> > >>>>>>>> solution ;)
>> > >>>>>>>>
>> > >>>>>>>> One thing we should all be aware of when discussing this, and I
>> > >>>>>>>> think
>> > >>>>>>>> Dong
>> > >>>>>>>> should have mentioned it (maybe he did).
>> > >>>>>>>> We are not discussing all of this out of thin air but there is
>> an
>> > >>>>>>>> effort
>> > >>>>>>>> in the Samza project.
>> > >>>>>>>>
>> > >>>>>>>> https://cwiki.apache.org/confluence/display/SAMZA/SEP-5%3A+
>> > >>>>>>>> Enable+partition+expansion+of+input+streams
>> > >>>>>>>> https://issues.apache.org/jira/browse/SAMZA-1293
>> > >>>>>>>>
>> > >>>>>>>> To be clear. I think SEP-5 (state of last week, don't know if it
>> > >>>>>>>> adapted
>> > >>>>>>>> to
>> > >>>>>>>> this discussion) is on a way better path than KIP-253, and I
>> can't
>> > >>>>>>>> really
>> > >>>>>>>> explain why.
>> > >>>>>>>>
>> > >>>>>>>> Best Jan,
>> > >>>>>>>>
>> > >>>>>>>> nice weekend everyone
>> > >>>>>>>>
>> > >>>>>>>>
>> > >>>>>>>> On 09.03.2018 03:36, Jun Rao wrote:
>> > >>>>>>>>
>> > >>>>>>>> Hi, Jan,
>> > >>>>>>>>
>> > >>>>>>>> Thanks for the feedback. Just some comments on the earlier
>> points
>> > >>>>>>>>> that
>> > >>>>>>>>> you
>> > >>>>>>>>> mentioned.
>> > >>>>>>>>>
>> > >>>>>>>>> 50. You brought up the question of whether existing data
>> needs to
>> > >>>>>>>>> be
>> > >>>>>>>>> copied
>> > >>>>>>>>> during partition expansion. My understanding of your view is that
>> > >>>>>>>>> avoiding
>> > >>>>>>>>> copying existing data will be more efficient, but it doesn't
>> work
>> > >>>>>>>>> well
>> > >>>>>>>>> with
>> > >>>>>>>>> compacted topics since some keys in the original partitions
>> will
>> > >>>>>>>>> never
>> > >>>>>>>>> be
>> > >>>>>>>>> cleaned. It would be useful to understand your use case of
>> > >>>>>>>>> compacted
>> > >>>>>>>>> topics
>> > >>>>>>>>> a bit more. In the common use case, the data volume in a
>> > compacted
>> > >>>>>>>>> topic
>> > >>>>>>>>> may not be large. So, I am not sure if there is a strong need
>> to
>> > >>>>>>>>> expand
>> > >>>>>>>>> partitions in a compacted topic, at least initially.
>> > >>>>>>>>>
>> > >>>>>>>>> I do agree. State is usually smaller. Update rates might be
>> also
>> > >>>>>>>>>
>> > >>>>>>>>> competitively high.
>> > >>>>>>>> Doing log compaction (even being very efficient and
>> configurable)
>> > >>>>>>>> is
>> > >>>>>>>> also
>> > >>>>>>>> a more expensive operation than
>> > >>>>>>>> just discarding old segments. Further if you want to use more
>> > >>>>>>>> consumers
>> > >>>>>>>> processing the events
>> > >>>>>>>> you also have to grow the number of partitions. Especially for
>> > >>>>>>>> use-cases
>> > >>>>>>>> we do (KIP-213) a tiny stateful
>> > >>>>>>>> table might be very expensive to process if it joins against a
>> > huge
>> > >>>>>>>> table.
>> > >>>>>>>>
>> > >>>>>>>> I can just say we have been in the spot of needing to grow log
>> > >>>>>>>> compacted
>> > >>>>>>>> topics. Mainly for processing power we can bring to the table.
>> > >>>>>>>>
>> > >>>>>>>> Further, I am not at all concerned about the extra space used
>> by
>> > >>>>>>>> "garbage
>> > >>>>>>>> keys". I am more concerned about the correctness of innocent
>> > >>>>>>>> consumers.
>> > >>>>>>>> The
>> > >>>>>>>> logic becomes complicated. Say for streams one would need to
>> load
>> > >>>>>>>> the
>> > >>>>>>>> record into state but not forward it to the topology (to have it
>> > >>>>>>>> available
>> > >>>>>>>> for shuffling). I would rather have it simple and the topic clean
>> > >>>>>>>> regardless
>> > >>>>>>>> if
>> > >>>>>>>> it
>> > >>>>>>>> still has its old partition count. Especially with multiple
>> > >>>>>>>> partition
>> > >>>>>>>> growths I think it becomes insanely hard to do this shuffle
>> > >>>>>>>> correctly.
>> > >>>>>>>> Maybe
>> > >>>>>>>> Streams and Samza can do it. Especially if you do "hipster
>> stream
>> > >>>>>>>> processing" <https://www.confluent.io/blog
>> > >>>>>>>> /introducing-kafka-streams-
>> > >>>>>>>> stream-processing-made-simple/>. This makes kafka way too
>> > >>>>>>>> complicated.
>> > >>>>>>>> With my approach I think it's way simpler because the topic has
>> no
>> > >>>>>>>> "history"
>> > >>>>>>>> in terms of partitioning but is always clean.
>> > >>>>>>>>
>> > >>>>>>>>
>> > >>>>>>>> 51. "Growing the topic by an integer factor does not require
>> any
>> > >>>>>>>> state
>> > >>>>>>>>
>> > >>>>>>>> redistribution at all." Could you clarify this a bit more?
>> Let's
>> > say
>> > >>>>>>>>> you
>> > >>>>>>>>> have a consumer app that computes the windowed count per key.
>> If
>> > >>>>>>>>> you
>> > >>>>>>>>> double
>> > >>>>>>>>> the number of partitions from 1 to 2 and grow the consumer
>> > >>>>>>>>> instances
>> > >>>>>>>>> from
>> > >>>>>>>>> 1
>> > >>>>>>>>> to 2, we would need to redistribute some of the counts to the
>> new
>> > >>>>>>>>> consumer
>> > >>>>>>>>> instance. Regarding linear hashing, it's true that it won't
>> > >>>>>>>>> solve
>> > >>>>>>>>> the
>> > >>>>>>>>> problem with compacted topics. The main benefit is that it
>> > >>>>>>>>> redistributes
>> > >>>>>>>>> the keys in one partition to no more than two partitions,
>> which
>> > >>>>>>>>> could
>> > >>>>>>>>> help
>> > >>>>>>>>> redistribute the state.
>> > >>>>>>>>>
>> > >>>>>>>>> You don't need to spin up a new consumer in this case. Every
>> > >>>>>>>>> consumer
>> > >>>>>>>>>
>> > >>>>>>>>> would just read every partition with the (partition %
>> num_task)
>> > >>>>>>>> task.
>> > >>>>>>>> it
>> > >>>>>>>> will still have the previous data right there and can go on.
>> > >>>>>>>>
>> > >>>>>>>> This sounds contradictory to what I said before, but please
>> bear
>> > >>>>>>>> with
>> > >>>>>>>> me.
>> > >>>>>>>>
>> > >>>>>>>> 52. Good point on coordinating the expansion of 2 topics that
>> need
>> > >>>>>>>> to
>> > >>>>>>>> be
>> > >>>>>>>>
>> > >>>>>>>> joined together. This is where the 2-phase partition expansion
>> > could
>> > >>>>>>>>> potentially help. In the first phase, we could add new
>> partitions
>> > >>>>>>>>> to
>> > >>>>>>>>> the 2
>> > >>>>>>>>> topics one at a time but without publishing to the new
>> patitions.
>> > >>>>>>>>> Then,
>> > >>>>>>>>> we
>> > >>>>>>>>> can add new consumer instances to pick up the new partitions.
>> In
>> > >>>>>>>>> this
>> > >>>>>>>>> transition phase, no reshuffling is needed since no data is
>> > coming
>> > >>>>>>>>> from
>> > >>>>>>>>> the
>> > >>>>>>>>> new partitions. Finally, we can enable the publishing to the
>> new
>> > >>>>>>>>> partitions.
>> > >>>>>>>>>
>> > >>>>>>>>> I think it's even worse than you think. I would like to
>> introduce
>> > >>>>>>>>> the
>> > >>>>>>>>>
>> > >>>>>>>>> Term
>> > >>>>>>>> transitive copartitioning. Imagine
>> > >>>>>>>> 2 streams applications. One joins (A,B), the other (B,C); then
>> there
>> > >>>>>>>> is a
>> > >>>>>>>> transitive copartition requirement for
>> > >>>>>>>> (A,C) to be copartitioned as well. This can spread significantly
>> > and
>> > >>>>>>>> require many consumers to adapt at the same time.
>> > >>>>>>>>
>> > >>>>>>>> It is also not entirely clear to me how you do not need
>> reshuffling
>> > in
>> > >>>>>>>> this
>> > >>>>>>>> case. If A has a record that never gets updated after the
>> > expansion
>> > >>>>>>>> and
>> > >>>>>>>> the
>> > >>>>>>>> corresponding B record moves to a new partition. How shall they
>> > meet
>> > >>>>>>>> w/o
>> > >>>>>>>> shuffle?
>> > >>>>>>>>
>> > >>>>>>>> 53. "Migrating consumer is a step that might be made completely
>> > >>>>>>>>
>> > >>>>>>>> unnecessary
>> > >>>>>>>>> if - for example streams - takes the gcd as partitioning
>> scheme
>> > >>>>>>>>> instead
>> > >>>>>>>>> of
>> > >>>>>>>>> enforcing 1 to 1." Not sure that I fully understand this. I
>> think
>> > >>>>>>>>> you
>> > >>>>>>>>> mean
>> > >>>>>>>>> that a consumer application can run more instances than the
>> > number
>> > >>>>>>>>> of
>> > >>>>>>>>> partitions. In that case, the consumer can just repartition
>> > the
>> > >>>>>>>>> input
>> > >>>>>>>>> data according to the number of instances. This is possible,
>> but
>> > >>>>>>>>> just
>> > >>>>>>>>> has
>> > >>>>>>>>> the overhead of reshuffling the data.
>> > >>>>>>>>>
>> > >>>>>>>>> No, what I meant is (that is also your question, I think,
>> Mathias)
>> > >>>>>>>>> that
>> > >>>>>>>>> if
>> > >>>>>>>>>
>> > >>>>>>>>> you grow a topic by a factor.
>> > >>>>>>>> Even if your processor is stateful you can just assign all
>> > the
>> > >>>>>>>> multiples of the previous partition to
>> > >>>>>>>> this consumer and the state to keep processing correctly will
>> be
>> > >>>>>>>> present
>> > >>>>>>>> w/o any shuffling.
>> > >>>>>>>>
>> > >>>>>>>> Say you have an assignment
>> > >>>>>>>> Stateful consumer => partition
>> > >>>>>>>> 0 => 0
>> > >>>>>>>> 1 => 1
>> > >>>>>>>> 2 => 2
>> > >>>>>>>>
>> > >>>>>>>> and you grow your topic by a factor of 4, you get
>> > >>>>>>>>
>> > >>>>>>>> 0 => 0,3,6,9
>> > >>>>>>>> 1 => 1,4,7,10
>> > >>>>>>>> 2 => 2,5,8,11
>> > >>>>>>>>
>> > >>>>>>>> Say your hashcode is 8. 8%3 => 2  before so consumer for
>> > partition 2
>> > >>>>>>>> has
>> > >>>>>>>> it.
>> > >>>>>>>> Now you have 12 partitions, so 8%12 => 8, and it goes into
>> > >>>>>>>> partition
>> > >>>>>>>> 8
>> > >>>>>>>> which is assigned to the same consumer
>> > >>>>>>>> who had 2 before and therefore knows the key.
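A minimal Java sketch of the arithmetic in the example above, assuming a plain `hash % numPartitions` partitioner and the `partition % numConsumers` assignment shown in the table. The class and method names are hypothetical and are not Kafka APIs; the sketch only checks that growing by an integer factor leaves every key with the consumer that already held it.

```java
// Hypothetical sketch: class and method names are illustrative, not Kafka APIs.
class IntegerFactorGrowth {
    // Partition picked by a plain modulo partitioner (floorMod keeps it non-negative).
    static int partitionFor(int hash, int numPartitions) {
        return Math.floorMod(hash, numPartitions);
    }

    // Consumer that owns a partition under the "partition % numConsumers" assignment,
    // i.e. the 0 => 0,3,6,9 / 1 => 1,4,7,10 / 2 => 2,5,8,11 table from the mail.
    static int ownerOf(int partition, int numConsumers) {
        return Math.floorMod(partition, numConsumers);
    }

    public static void main(String[] args) {
        int oldPartitions = 3;          // also the number of stateful consumers
        int factor = 4;                 // assumed integer growth factor
        int newPartitions = oldPartitions * factor;

        // Every hash must land on the same consumer before and after the growth.
        for (int hash = 0; hash < 10_000; hash++) {
            int ownerBefore = ownerOf(partitionFor(hash, oldPartitions), oldPartitions);
            int ownerAfter = ownerOf(partitionFor(hash, newPartitions), oldPartitions);
            if (ownerBefore != ownerAfter) {
                throw new IllegalStateException("key moved to another consumer: " + hash);
            }
        }
        System.out.println("hash 8: partition " + partitionFor(8, oldPartitions)
                + " -> " + partitionFor(8, newPartitions)
                + ", owner stays " + ownerOf(partitionFor(8, newPartitions), oldPartitions));
    }
}
```

The property relied on here is that (h % (k*n)) % n equals h % n for a non-negative h, so it holds only while the hash function itself is unchanged and the growth is by a whole factor.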
>> > >>>>>>>>
>> > >>>>>>>> Userland reshuffling is there as an option. And it does
>> exactly
>> > >>>>>>>> what
>> > >>>>>>>> I
>> > >>>>>>>> suggest. And I think it's the perfect strategy. All I am
>> > suggesting
>> > >>>>>>>> is
>> > >>>>>>>> broker side support to switch the producers to the newly
>> > partitioned
>> > >>>>>>>> topic.
>> > >>>>>>>> Then the old topic (with too few partitions) can go away. Remember
>> the
>> > >>>>>>>> list
>> > >>>>>>>> of
>> > >>>>>>>> steps in the beginning of this thread. If one has broker
>> support
>> > for
>> > >>>>>>>> all
>> > >>>>>>>> where it's required and streams support for those that aren’t
>> > >>>>>>>> necessarily.
>> > >>>>>>>> Then one has solved the problem.
>> > >>>>>>>> I repeat it because I think it's important. I am really happy
>> that
>> > >>>>>>>> you
>> > >>>>>>>> brought that up, because it's 100% what I want, just with the
>> > >>>>>>>> differences
>> > >>>>>>>> to
>> > >>>>>>>> have an option to discard the too-small topic later (after all
>> > >>>>>>>> consumers
>> > >>>>>>>> adapted). And to have order correct there. I need broker
>> support
>> > >>>>>>>> managing
>> > >>>>>>>> the copy process + the producers and fencing them against each
>> > other. I
>> > >>>>>>>> also
>> > >>>>>>>> repeat: the copy process can run for weeks in the worst case.
>> > >>>>>>>> Copying
>> > >>>>>>>> the
>> > >>>>>>>> data is not the longest task; migrating consumers might very
>> well
>> > be.
>> > >>>>>>>> Once all consumers switched and copying is really up to date
>> > (think
>> > >>>>>>>> ISR
>> > >>>>>>>> like up to date) only then we stop the producer, wait for the
>> copy
>> > >>>>>>>> to
>> > >>>>>>>> finish and use the new topic for producing.
>> > >>>>>>>>
>> > >>>>>>>> After this the topic is in perfect shape, and no one needs to
>> > worry
>> > >>>>>>>> about
>> > >>>>>>>> complicated stuff. (old keys hanging around might arrive in
>> some
>> > >>>>>>>> other
>> > >>>>>>>> topic later.....). I can only imagine how many tricky bugs are going to
>> > >>>>>>>> arrive
>> > >>>>>>>> after
>> > >>>>>>>> someone has grown and shrunk their topic 10 times.
>> > >>>>>>>>
>> > >>>>>>>>
>> > >>>>>>>>
>> > >>>>>>>>
>> > >>>>>>>>
>> > >>>>>>>>
>> > >>>>>>>> 54. "The other thing I wanted to mention is that I believe the
>> > >>>>>>>> current
>> > >>>>>>>>
>> > >>>>>>>> suggestion (without copying data over) can be implemented in
>> pure
>> > >>>>>>>>> userland
>> > >>>>>>>>> with a custom partitioner and a small feedback loop from
>> > >>>>>>>>> ProduceResponse
>> > >>>>>>>>> =>
>> > >>>>>>>>> Partitioner in cooperation with a change management
>> system." I
>> > am
>> > >>>>>>>>> not
>> > >>>>>>>>> sure a customized partitioner itself solves the problem. We
>> > >>>>>>>>> probably
>> > >>>>>>>>> need
>> > >>>>>>>>> some broker side support to enforce when the new partitions
>> can
>> > be
>> > >>>>>>>>> used.
>> > >>>>>>>>> We
>> > >>>>>>>>> also need some support on the consumer/kstream side to
>> preserve
>> > the
>> > >>>>>>>>> per
>> > >>>>>>>>> key
>> > >>>>>>>>> ordering and potentially migrate the processing state. This is
>> > not
>> > >>>>>>>>> trivial
>> > >>>>>>>>> and I am not sure if it's ideal to fully push to the
>> application
>> > >>>>>>>>> space.
>> > >>>>>>>>>
>> > >>>>>>>>> Broker support is definitely the preferred way here. I have
>> > nothing
>> > >>>>>>>>>
>> > >>>>>>>>> against
>> > >>>>>>>> broker support.
>> > >>>>>>>> I tried to say that for what I would prefer - copying the data
>> > >>>>>>>> over,
>> > >>>>>>>> at
>> > >>>>>>>> least for log compacted topics -
>> > >>>>>>>> I would require more broker support than the KIP currently
>> offers.
>> > >>>>>>>>
>> > >>>>>>>>
>> > >>>>>>>>
>> > >>>>>>>> Jun
>> > >>>>>>>>
>> > >>>>>>>> On Tue, Mar 6, 2018 at 10:33 PM, Jan Filipiak <
>> > >>>>>>>>> jan.filip...@trivago.com
>> > >>>>>>>>> wrote:
>> > >>>>>>>>>
>> > >>>>>>>>> Hi Dong,
>> > >>>>>>>>>
>> > >>>>>>>>> are you actually reading my emails, or are you just using the
>> > >>>>>>>>> thread I
>> > >>>>>>>>>
>> > >>>>>>>>>> started for general announcements regarding the KIP?
>> > >>>>>>>>>>
>> > >>>>>>>>>> I tried to argue really hard against linear hashing. Growing
>> the
>> > >>>>>>>>>> topic
>> > >>>>>>>>>> by
>> > >>>>>>>>>> an integer factor does not require any state redistribution
>> at
>> > >>>>>>>>>> all. I
>> > >>>>>>>>>> fail
>> > >>>>>>>>>> to see completely where linear hashing helps on log compacted
>> > >>>>>>>>>> topics.
>> > >>>>>>>>>>
>> > >>>>>>>>>> If you are not willing to explain to me what I might be
>> > >>>>>>>>>> overlooking:
>> > >>>>>>>>>> that
>> > >>>>>>>>>> is fine.
>> > >>>>>>>>>> But I ask you to not reply to my emails then. Please
>> understand
>> > my
>> > >>>>>>>>>> frustration with this.
>> > >>>>>>>>>>
>> > >>>>>>>>>> Best Jan
>> > >>>>>>>>>>
>> > >>>>>>>>>>
>> > >>>>>>>>>>
>> > >>>>>>>>>> On 06.03.2018 19:38, Dong Lin wrote:
>> > >>>>>>>>>>
>> > >>>>>>>>>> Hi everyone,
>> > >>>>>>>>>>
>> > >>>>>>>>>> Thanks for all the comments! It appears that everyone prefers
>> > >>>>>>>>>> linear
>> > >>>>>>>>>>
>> > >>>>>>>>>>> hashing because it reduces the amount of state that needs
>> to be
>> > >>>>>>>>>>> moved
>> > >>>>>>>>>>> between consumers (for stream processing). The KIP has been
>> > >>>>>>>>>>> updated
>> > >>>>>>>>>>> to
>> > >>>>>>>>>>> use
>> > >>>>>>>>>>> linear hashing.
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> Regarding the migration endeavor: it seems that migrating
>> > >>>>>>>>>>> producer
>> > >>>>>>>>>>> library
>> > >>>>>>>>>>> to use linear hashing should be pretty straightforward
>> without
>> > >>>>>>>>>>> much operational endeavor. If we don't upgrade client
>> library
>> > to
>> > >>>>>>>>>>> use
>> > >>>>>>>>>>> this
>> > >>>>>>>>>>> KIP, we can not support in-order delivery after partition is
>> > >>>>>>>>>>> changed
>> > >>>>>>>>>>> anyway. Suppose we upgrade client library to use this KIP,
>> if
>> > >>>>>>>>>>> partition
>> > >>>>>>>>>>> number is not changed, the key -> partition mapping will be
>> > >>>>>>>>>>> exactly
>> > >>>>>>>>>>> the
>> > >>>>>>>>>>> same as it is now because it is still determined using
>> > >>>>>>>>>>> murmur_hash(key)
>> > >>>>>>>>>>> %
>> > >>>>>>>>>>> original_partition_num. In other words, this change is
>> backward
>> > >>>>>>>>>>> compatible.
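The backward-compatibility claim above can be illustrated with textbook linear hashing. This is only a sketch under the assumption that the KIP's scheme resembles the textbook algorithm; the class name, method name, and parameters are hypothetical, and a plain int stands in for murmur_hash(key).

```java
// Hypothetical sketch of textbook linear hashing; not the KIP's actual code.
class LinearHashingSketch {
    // Maps a key hash to a partition, given the partition count at topic creation
    // and the current partition count (assumed to be >= the original count).
    static int partitionFor(int hash, int originalPartitions, int currentPartitions) {
        // Largest original * 2^level that does not exceed the current count.
        int base = originalPartitions;
        while (base * 2 <= currentPartitions) {
            base *= 2;
        }
        // Partitions [0, split) have already been split in the current round.
        int split = currentPartitions - base;
        int partition = Math.floorMod(hash, base);
        if (partition < split) {
            // A split partition sends each key either to itself or to partition + base.
            partition = Math.floorMod(hash, base * 2);
        }
        return partition;
    }

    public static void main(String[] args) {
        // Unchanged partition count: identical to hash % original_partition_num.
        for (int hash = 0; hash < 10_000; hash++) {
            if (partitionFor(hash, 3, 3) != hash % 3) {
                throw new IllegalStateException("mapping changed for hash " + hash);
            }
        }
        // Growing 3 -> 4 splits only partition 0; its keys go to partition 0 or 3.
        System.out.println("hash 3 with 4 partitions: " + partitionFor(3, 3, 4));
        System.out.println("hash 1 with 4 partitions: " + partitionFor(1, 3, 4));
    }
}
```

With 3 partitions grown to 4, only partition 0 has been split, which is also where the uneven load discussed in the next paragraph comes from: partitions 1 and 2 each still carry a full, unsplit share of the keys.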
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> Regarding the load distribution: if we use linear hashing,
>> the
>> > >>>>>>>>>>> load
>> > >>>>>>>>>>> may
>> > >>>>>>>>>>> be
>> > >>>>>>>>>>> unevenly distributed because those partitions which are not
>> > split
>> > >>>>>>>>>>> may
>> > >>>>>>>>>>> receive twice as much traffic as other partitions that are
>> > split.
>> > >>>>>>>>>>> This
>> > >>>>>>>>>>> issue can be mitigated by creating topic with partitions
>> that
>> > are
>> > >>>>>>>>>>> several
>> > >>>>>>>>>>> times the number of consumers. And there will be no
>> imbalance
>> > if
>> > >>>>>>>>>>> the
>> > >>>>>>>>>>> partition number is always doubled. So this imbalance seems
>> > >>>>>>>>>>> acceptable.
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> Regarding storing the partition strategy as per-topic
>> config:
>> > It
>> > >>>>>>>>>>> seems
>> > >>>>>>>>>>> not
>> > >>>>>>>>>>> necessary since we can still use murmur_hash as the default
>> > hash
>> > >>>>>>>>>>> function
>> > >>>>>>>>>>> and additionally apply the linear hashing algorithm if the
>> > >>>>>>>>>>> partition
>> > >>>>>>>>>>> number
>> > >>>>>>>>>>> has increased. Not sure if there is any use-case for
>> producer
>> > to
>> > >>>>>>>>>>> use a
>> > >>>>>>>>>>> different hash function. Jason, can you check if there is
>> some
>> > >>>>>>>>>>> use-case
>> > >>>>>>>>>>> that I missed for using the per-topic partition strategy?
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> Regarding how to reduce latency (due to state store/load) in
>> > >>>>>>>>>>> stream
>> > >>>>>>>>>>> processing consumer when partition number changes: I need to
>> > read
>> > >>>>>>>>>>> the
>> > >>>>>>>>>>> Kafka
>> > >>>>>>>>>>> Streams code to understand how Kafka Streams currently migrates
>> > >>>>>>>>>>> state
>> > >>>>>>>>>>> between
>> > >>>>>>>>>>> consumers when the application is added/removed for a given
>> > job.
>> > >>>>>>>>>>> I
>> > >>>>>>>>>>> will
>> > >>>>>>>>>>> reply after I finish reading the documentation and code.
>> > >>>>>>>>>>>
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> Thanks,
>> > >>>>>>>>>>> Dong
>> > >>>>>>>>>>>
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> On Mon, Mar 5, 2018 at 10:43 AM, Jason Gustafson <
>> > >>>>>>>>>>> ja...@confluent.io>
>> > >>>>>>>>>>> wrote:
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> Great discussion. I think I'm wondering whether we can
>> continue
>> > >>>>>>>>>>> to
>> > >>>>>>>>>>> leave
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> Kafka agnostic to the partitioning strategy. The challenge
>> is
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> communicating
>> > >>>>>>>>>>>> the partitioning logic from producers to consumers so that
>> the
>> > >>>>>>>>>>>> dependencies
>> > >>>>>>>>>>>> between each epoch can be determined. For the sake of
>> > >>>>>>>>>>>> discussion,
>> > >>>>>>>>>>>> imagine
>> > >>>>>>>>>>>> you did something like the following:
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> 1. The name (and perhaps version) of a partitioning
>> strategy
>> > is
>> > >>>>>>>>>>>> stored
>> > >>>>>>>>>>>> in
>> > >>>>>>>>>>>> topic configuration when a topic is created.
>> > >>>>>>>>>>>> 2. The producer looks up the partitioning strategy before
>> > >>>>>>>>>>>> writing
>> > >>>>>>>>>>>> to
>> > >>>>>>>>>>>> a
>> > >>>>>>>>>>>> topic and includes it in the produce request (for
>> fencing). If
>> > >>>>>>>>>>>> it
>> > >>>>>>>>>>>> doesn't
>> > >>>>>>>>>>>> have an implementation for the configured strategy, it
>> fails.
>> > >>>>>>>>>>>> 3. The consumer also looks up the partitioning strategy and
>> > >>>>>>>>>>>> uses it
>> > >>>>>>>>>>>> to
>> > >>>>>>>>>>>> determine dependencies when reading a new epoch. It could
>> > either
>> > >>>>>>>>>>>> fail
>> > >>>>>>>>>>>> or
>> > >>>>>>>>>>>> make the most conservative dependency assumptions if it
>> > doesn't
>> > >>>>>>>>>>>> know
>> > >>>>>>>>>>>> how
>> > >>>>>>>>>>>> to
>> > >>>>>>>>>>>> implement the partitioning strategy. For the consumer, the
>> new
>> > >>>>>>>>>>>> interface
>> > >>>>>>>>>>>> might look something like this:
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> // Return the partition dependencies following an epoch
>> bump
>> > >>>>>>>>>>>> Map<Integer, List<Integer>> dependencies(int
>> > >>>>>>>>>>>> numPartitionsBeforeEpochBump,
>> > >>>>>>>>>>>> int numPartitionsAfterEpochBump)
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> The unordered case then is just a particular implementation
>> > >>>>>>>>>>>> which
>> > >>>>>>>>>>>> never
>> > >>>>>>>>>>>> has
>> > >>>>>>>>>>>> any epoch dependencies. To implement this, we would need
>> some
>> > >>>>>>>>>>>> way
>> > >>>>>>>>>>>> for
>> > >>>>>>>>>>>> the
>> > >>>>>>>>>>>> consumer to find out how many partitions there were in each
>> > >>>>>>>>>>>> epoch,
>> > >>>>>>>>>>>> but
>> > >>>>>>>>>>>> maybe that's not too unreasonable.
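One possible implementation of the interface sketched above, assuming a plain `hash % numPartitions` strategy and assuming the map goes from each partition after the epoch bump to the partitions before the bump that may hold earlier records for the same keys. Both the direction of the map and the class name are assumptions made for illustration; the thread does not specify them.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: dependency calculation for a modulo-based partitioning strategy.
class ModuloDependencies {
    static int gcd(int a, int b) {
        return b == 0 ? a : gcd(b, a % b);
    }

    // For each new partition p, list the old partitions a key now in p may have lived in.
    static Map<Integer, List<Integer>> dependencies(int numPartitionsBeforeEpochBump,
                                                    int numPartitionsAfterEpochBump) {
        int before = numPartitionsBeforeEpochBump;
        int after = numPartitionsAfterEpochBump;
        // Hashes landing in p are p, p + after, p + 2*after, ...; their residues
        // modulo "before" repeat after this many steps.
        int period = before / gcd(before, after);
        Map<Integer, List<Integer>> deps = new TreeMap<>();
        for (int p = 0; p < after; p++) {
            List<Integer> oldPartitions = new ArrayList<>();
            for (int k = 0; k < period; k++) {
                oldPartitions.add((int) ((p + (long) k * after) % before));
            }
            Collections.sort(oldPartitions);
            deps.put(p, oldPartitions);
        }
        return deps;
    }

    public static void main(String[] args) {
        // Growth by an integer factor: each new partition depends on one old partition.
        System.out.println(dependencies(3, 12));
        // Growth from 2 to 3: every new partition depends on both old partitions.
        System.out.println(dependencies(2, 3));
    }
}
```

Under the same assumptions, the unordered case would simply return an empty list for every partition.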
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> Thanks,
>> > >>>>>>>>>>>> Jason
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> On Mon, Mar 5, 2018 at 4:51 AM, Jan Filipiak <
>> > >>>>>>>>>>>> jan.filip...@trivago.com
>> > >>>>>>>>>>>> wrote:
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> Hi Dong
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> thank you very much for your questions.
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> Regarding the time spent copying data across:
>> > >>>>>>>>>>>>> It is correct that copying data from a topic with one
>> > partition
>> > >>>>>>>>>>>>> mapping
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> to
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> a topic with a different partition mapping takes way
>> longer
>> > >>>>>>>>>>>>> than
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>> we
>> > >>>>>>>>>>>> can
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> stop producers. Tens of minutes is a very optimistic
>> estimate
>> > >>>>>>>>>>>> here.
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>> Many
>> > >>>>>>>>>>>>> people cannot afford to copy at full steam and therefore will
>> have
>> > >>>>>>>>>>>>> some
>> > >>>>>>>>>>>>> rate
>> > >>>>>>>>>>>>> limiting in place, this can bump the timespan into the
>> days.
>> > >>>>>>>>>>>>> The
>> > >>>>>>>>>>>>> good
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> part
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> is that the vast majority of the data can be copied while
>> the
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>> producers
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> are
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>> still going. One can then piggyback the consumers on top
>> of
>> > >>>>>>>>>>>>> this
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>> timeframe,
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>> by the method mentioned (provide them a mapping from
>> their
>> > old
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>> offsets
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> to
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>> new offsets in their repartitioned topics). In that way we
>> > >>>>>>>>>>>>> separate
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>> migration of consumers from migration of producers
>> (decoupling
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>> these
>> > >>>>>>>>>>>>> is
>> > >>>>>>>>>>>>> what kafka is strongest at). The time to actually swap
>> over
>> > the
>> > >>>>>>>>>>>>> producers
>> > >>>>>>>>>>>>> should be kept minimal by ensuring that when a swap
>> attempt
>> > is
>> > >>>>>>>>>>>>> started
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> the
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> consumer copying over should be very close to the log end
>> and
>> > >>>>>>>>>>>>> is
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>> expected
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>> to finish within the next fetch. The operation should
>> have a
>> > >>>>>>>>>>>>> time-out
>> > >>>>>>>>>>>>> and
>> > >>>>>>>>>>>>> should be "reattemptable".
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> Importance of log compaction:
>> > >>>>>>>>>>>>> If a producer produces key A to partition 0, it's forever
>> > gonna
>> > >>>>>>>>>>>>> be
>> > >>>>>>>>>>>>> there,
>> > >>>>>>>>>>>>> unless it gets deleted. The record might sit in there for
>> > >>>>>>>>>>>>> years. A
>> > >>>>>>>>>>>>> new
>> > >>>>>>>>>>>>> producer started with the new partitions will fail to
>> delete
>> > >>>>>>>>>>>>> the
>> > >>>>>>>>>>>>> record
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> in
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> the correct partition. The record will be there forever and
>> > one
>> > >>>>>>>>>>>>> can
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>> not
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> reliably bootstrap new consumers. I cannot see how linear
>> > >>>>>>>>>>>> hashing
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>> can
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> solve
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> this.
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>> Regarding your skipping of userland copying:
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>> 100%, copying the data across in userland is, as far as I
>> can
>> > >>>>>>>>>>>>> see,
>> > >>>>>>>>>>>>> only
>> > >>>>>>>>>>>>> a
>> > >>>>>>>>>>>>> use case for log compacted topics. Even for log compaction +
>> > >>>>>>>>>>>>> retention
>> > >>>>>>>>>>>>> it
>> > >>>>>>>>>>>>> should only be opt-in. Why did I bring it up? I think log
>> > >>>>>>>>>>>>> compaction
>> > >>>>>>>>>>>>> is
>> > >>>>>>>>>>>>> a
>> > >>>>>>>>>>>>> very important feature to really embrace kafka as a "data
>> > >>>>>>>>>>>>> platform".
>> > >>>>>>>>>>>>> The
>> > >>>>>>>>>>>>> point I also want to make is that copying data this way is
>> > >>>>>>>>>>>>> completely
>> > >>>>>>>>>>>>> in line with the kafka architecture. It only consists of
>> > reading
>> > >>>>>>>>>>>>> and
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> writing
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> to topics.
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>> I hope it clarifies more why I think we should aim for more
>> > than
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>> the
>> > >>>>>>>>>>>>> current KIP. I fear that once the KIP is done not much
>> more
>> > >>>>>>>>>>>>> effort
>> > >>>>>>>>>>>>> will
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> be
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> taken.
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> On 04.03.2018 02:28, Dong Lin wrote:
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> Hey Jan,
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> In the current proposal, the consumer will be blocked on
>> > >>>>>>>>>>>>> waiting
>> > >>>>>>>>>>>>> for
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> other
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> consumers of the group to consume up to a given offset.
>> In
>> > >>>>>>>>>>>>>> most
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>> cases,
>> > >>>>>>>>>>>>> all
>> > >>>>>>>>>>>>> consumers should be close to the LEO of the partitions
>> when
>> > the
>> > >>>>>>>>>>>>> partition
>> > >>>>>>>>>>>>> expansion happens. Thus the time waiting should not be
>> long
>> > >>>>>>>>>>>>> e.g.
>> > >>>>>>>>>>>>> on
>> > >>>>>>>>>>>>> the
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> order of seconds. On the other hand, it may take a long
>> time
>> > to
>> > >>>>>>>>>>>>> wait
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> for
>> > >>>>>>>>>>>>>> the entire partition to be copied -- the amount of time
>> is
>> > >>>>>>>>>>>>>> proportional
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> to
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> the amount of existing data in the partition, which can
>> take
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>> tens of
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> minutes. So the amount of time that we stop consumers may
>> not
>> > >>>>>>>>>>>>> be
>> > >>>>>>>>>>>>> on
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> the
>> > >>>>>>>>>>>>>> same order of magnitude.
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> If we can implement this suggestion without copying data
>> > over
>> > >>>>>>>>>>>>>> in
>> > >>>>>>>>>>>>>> pure
>> > >>>>>>>>>>>>>> userland, it will be much more valuable. Do you have
>> ideas
>> > on
>> > >>>>>>>>>>>>>> how
>> > >>>>>>>>>>>>>> this
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> can
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> be done?
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>> Not sure why the current KIP does not help people who depend on
>> > log
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> compaction.
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> Could you elaborate more on this point?
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>> Thanks,
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> Dong
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> On Wed, Feb 28, 2018 at 10:55 PM, Jan
>> > >>>>>>>>>>>>>> Filipiak<Jan.Filipiak@trivago.
>> > >>>>>>>>>>>>>> com
>> > >>>>>>>>>>>>>> wrote:
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> Hi Dong,
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> I tried to focus on what the steps are one can currently
>> > >>>>>>>>>>>>>> perform
>> > >>>>>>>>>>>>>> to
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> expand
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> or shrink a keyed topic while maintaining a top notch
>> > >>>>>>>>>>>>>>> semantics.
>> > >>>>>>>>>>>>>>> I can understand that there might be confusion about
>> > >>>>>>>>>>>>>>> "stopping
>> > >>>>>>>>>>>>>>> the
>> > >>>>>>>>>>>>>>> consumer". It is exactly the same as proposed in the
>> KIP.
>> > >>>>>>>>>>>>>>> There
>> > >>>>>>>>>>>>>>> needs
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> to
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> be
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> a time the producers agree on the new partitioning. The
>> > extra
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> semantics I
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> want to put in there is that we have a possibility to
>> wait
>> > >>>>>>>>>>>>>>> until
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> all
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> the
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> existing data
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> is copied over into the new partitioning scheme. When I
>> say
>> > >>>>>>>>>>>>> stopping
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> I
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> think more of having a memory barrier that ensures the
>> > >>>>>>>>>>>>>>> ordering. I
>> > >>>>>>>>>>>>>>> am
>> > >>>>>>>>>>>>>>> still
>> > >>>>>>>>>>>>>>> aiming for latencies on the scale of leader failovers.
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> Consumers have to explicitly adopt the new partitioning
>> > >>>>>>>>>>>>>>> scheme
>> > >>>>>>>>>>>>>>> in
>> > >>>>>>>>>>>>>>> the
>> > >>>>>>>>>>>>>>> above scenario. The reason is that in these cases where
>> you
>> > >>>>>>>>>>>>>>> are
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> dependent
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> on a particular partitioning scheme, you also have other
>> > >>>>>>>>>>>>>>> topics
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> that
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> have
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> co-partition enforcements or the like, frequently.
>> Therefore
>> > >>>>>>>>>>>>>> all
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> your
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> other
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> input topics might need to grow accordingly.
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> What I was suggesting was to streamline all these
>> > operations
>> > >>>>>>>>>>>>>>> as
>> > >>>>>>>>>>>>>>> best
>> > >>>>>>>>>>>>>>> as
>> > >>>>>>>>>>>>>>> possible to have "real" partition growth and shrinkage
>> going
>> > >>>>>>>>>>>>>>> on.
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> Migrating
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> the producers to a new partitioning scheme can be much
>> more
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> streamlined
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> with proper broker support for this. Migrating consumer
>> is a
>> > >>>>>>>>>>>>> step
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> that
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> might be made completely unnecessary if - for example
>> > streams
>> > >>>>>>>>>>>>>>> -
>> > >>>>>>>>>>>>>>> takes
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> the
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> gcd as partitioning scheme instead of enforcing 1 to 1.
>> > >>>>>>>>>>>>>>> Connect
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> consumers
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> and other consumers should be fine anyways.
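
A minimal sketch of the gcd idea above, under stated assumptions: both input topics are partitioned as hash(key) modulo their partition count, and the consumer groups partitions into gcd(counts) tasks. The names partition_for and task_for are hypothetical, and CRC32 is only a stand-in for the real hash function. It illustrates why unequal partition counts can still be co-partitioned at the task level; it is not how Streams assigns tasks today.

```python
from functools import reduce
from math import gcd
import zlib


def partition_for(key, num_partitions):
    # Hypothetical stand-in for the producer's hash partitioner
    # (CRC32 modulo the partition count; Kafka's default uses murmur2).
    return zlib.crc32(key) % num_partitions


def task_for(partition, partition_counts):
    # Group partitions into gcd(counts) tasks instead of requiring equal counts.
    return partition % reduce(gcd, partition_counts)


counts = (6, 4)  # two co-partitioned input topics with different sizes
key = b"user-42"
left_task = task_for(partition_for(key, 6), counts)
right_task = task_for(partition_for(key, 4), counts)
print(left_task == right_task)
```

Because gcd(6, 4) = 2 divides both counts, a key's partition in either topic reduces to the same value modulo 2, so records for that key from both topics land in the same task.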
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> I hope this makes clearer what I was aiming at. The
>> rest
>> > >>>>>>>>>>>>> needs
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> to
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> be
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> figured out. The only danger I see is that when we are
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> introducing
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> this
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> feature as proposed in the KIP, it won't help any people
>> > >>>>>>>>>>>>> depending
>> > >>>>>>>>>>>>> on
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> log
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> compaction.
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> The other thing I wanted to mention is that I believe the
>> > >>>>>>>>>>>>> current
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> suggestion (without copying data over) can be implemented
>> in
>> > >>>>>>>>>>>>>> pure
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> userland
>> > >>>>>>>>>>>>>>> with a custom partitioner and a small feedback loop from
>> > >>>>>>>>>>>>>>> ProduceResponse
>> > >>>>>>>>>>>>>>> =>
>> > >>>>>>>>>>>>>>> Partitioner in cooperation with a change management
>> > system.
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> Best Jan
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> On 28.02.2018 07:13, Dong Lin wrote:
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> Hey Jan,
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> I am not sure if it is acceptable for producer to be
>> > stopped
>> > >>>>>>>>>>>>>>> for a
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> while,
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>> particularly for online application which requires low
>> > >>>>>>>>>>>>>>>> latency. I
>> > >>>>>>>>>>>>>>>> am
>> > >>>>>>>>>>>>>>>> also
>> > >>>>>>>>>>>>>>>> not sure how consumers can switch to a new topic. Does
>> > user
>> > >>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>> application
>> > >>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>> need to explicitly specify a different topic for
>> > >>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> producer/consumer
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> to
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>> subscribe to? It will be helpful for discussion if you can
>> > >>>>>>>>>>>>> provide
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> more
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> detail on the interface change for this solution.
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> Thanks,
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> Dong
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> On Mon, Feb 26, 2018 at 12:48 AM, Jan
>> > >>>>>>>>>>>>>>>> Filipiak<Jan.Filipiak@trivago.
>> > >>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>> com
>> > >>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>> wrote:
>> > >>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> Hi,
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> just want to throw my thoughts in. In general the
>> > functionality
>> > >>>>>>>>>>>>>> is
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> very
>> > >>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>> useful, though we should not try to find the
>> architecture
>> > >>>>>>>>>>>>>>>> too
>> > >>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>> hard
>> > >>>>>>>>>>>>>>>>> while
>> > >>>>>>>>>>>>>>>>> implementing.
>> > >>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>> The manual steps would be to
>> > >>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>> create a new topic
>> > >>>>>>>>>>>>>>>>> then mirrormake from the old topic to the new topic
>> > >>>>>>>>>>>>>>>>> wait for mirror making to catch up.
>> > >>>>>>>>>>>>>>>>> then put the consumers onto the new topic
>> > >>>>>>>>>>>>>>>>>            (having mirrormaker spit out a mapping from
>> > old
>> > >>>>>>>>>>>>>>>>> offsets to
>> > >>>>>>>>>>>>>>>>> new
>> > >>>>>>>>>>>>>>>>> offsets:
>> > >>>>>>>>>>>>>>>>>                if topic is increased by factor X
>> there is
>> > >>>>>>>>>>>>>>>>> gonna
>> > >>>>>>>>>>>>>>>>> be a
>> > >>>>>>>>>>>>>>>>> clean
>> > >>>>>>>>>>>>>>>>> mapping from 1 offset in the old topic to X offsets in
>> > the
>> > >>>>>>>>>>>>>>>>> new
>> > >>>>>>>>>>>>>>>>> topic,
>> > >>>>>>>>>>>>>>>>>                if there is no factor then there is no
>> > >>>>>>>>>>>>>>>>> chance to
>> > >>>>>>>>>>>>>>>>> generate a
>> > >>>>>>>>>>>>>>>>> mapping that can be reasonably used for continuing)
>> > >>>>>>>>>>>>>>>>>            make consumers stop at appropriate points
>> and
>> > >>>>>>>>>>>>>>>>> continue
>> > >>>>>>>>>>>>>>>>> consumption
>> > >>>>>>>>>>>>>>>>> with offsets from the mapping.
>> > >>>>>>>>>>>>>>>>> have the producers stop for a minimal time.
>> > >>>>>>>>>>>>>>>>> wait for mirrormaker to finish
>> > >>>>>>>>>>>>>>>>> let producer produce with the new metadata.
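
One way to read the "clean mapping" in the steps above, as a rough sketch: if keys are placed by hash(key) modulo the partition count and the count grows by an integer factor X, every key from old partition p can only land in X specific new partitions. The function names are hypothetical and CRC32 stands in for the real hash; a non-integer growth ratio gives no such fan-out.

```python
import zlib


def partition_for(key, num_partitions):
    # Hypothetical stand-in for the producer's hash partitioner
    # (CRC32 modulo the partition count; Kafka's default uses murmur2).
    return zlib.crc32(key) % num_partitions


def candidate_new_partitions(old_partition, old_count, factor):
    # New partitions that can receive keys previously stored in old_partition
    # once the topic has grown from old_count to old_count * factor partitions.
    return [old_partition + i * old_count for i in range(factor)]


old_count, factor = 4, 3
new_count = old_count * factor
for n in range(1000):
    key = ("key-%d" % n).encode()
    old_p = partition_for(key, old_count)
    new_p = partition_for(key, new_count)
    # The new partition is always congruent to the old one modulo old_count.
    assert new_p in candidate_new_partitions(old_p, old_count, factor)
```

With that fan-out known, a mirroring tool could record, per old partition and offset, the corresponding positions in the X new partitions, which is the mapping consumers would need in order to resume.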
>> > >>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>> Instead of implementing the approach suggested in the
>> KIP
>> > >>>>>>>>>>>>>>>>> which
>> > >>>>>>>>>>>>>>>>> will
>> > >>>>>>>>>>>>>>>>> leave
>> > >>>>>>>>>>>>>>>>> log compacted topics completely crumbled and unusable,
>> > >>>>>>>>>>>>>>>>> I would much rather try to build infrastructure to
>> > support
>> > >>>>>>>>>>>>>>>>> the
>> > >>>>>>>>>>>>>>>>> mentioned
>> > >>>>>>>>>>>>>>>>> above operations more smoothly.
>> > >>>>>>>>>>>>>>>>> Especially having producers stop and use another
>> topic is
>> > >>>>>>>>>>>>>>>>> difficult
>> > >>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>> and
>> > >>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>> it would be nice if one can trigger "invalid metadata"
>> > >>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>> exceptions
>> > >>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>> for
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> them
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> and
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> if one could give topics aliases so that their produce requests
>> > with
>> > >>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>> the
>> > >>>>>>>>>>>>>>>>> old
>> > >>>>>>>>>>>>>>>>> topic
>> > >>>>>>>>>>>>>>>>> will arrive in the new topic.
>> > >>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>> The downsides are obvious I guess ( having the same
>> data
>> > >>>>>>>>>>>>>>>>> twice
>> > >>>>>>>>>>>>>>>>> for
>> > >>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>> the
>> > >>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>> transition period, but kafka tends to scale well with
>> > >>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>> datasize).
>> > >>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>> So
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> it's a
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> nicer fit into the architecture.
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> I further want to argue that the functionality of the
>> > KIP
>> > >>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>> can
>> > >>>>>>>>>>>>>>>>> completely be implemented in "userland" with a custom
>> > >>>>>>>>>>>>>>>>> partitioner
>> > >>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>> that
>> > >>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>> handles the transition as needed. I would appreciate
>> if
>> > >>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>> someone
>> > >>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>> could
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> point
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> out what a custom partitioner couldn't handle in this
>> case?
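
For illustration only, a userland partitioner that "handles the transition" might look roughly like the sketch below. The class TransitionPartitioner and its switch() method are hypothetical, CRC32 stands in for the real hash, and the signal that triggers the switch (for example from a change management system) is assumed rather than shown. It covers only the producer side and does not by itself give consumers any ordering guarantee across the switch.

```python
import zlib


class TransitionPartitioner:
    """Hypothetical producer-side partitioner that flips to a new partition count."""

    def __init__(self, old_count, new_count):
        self.old_count = old_count
        self.new_count = new_count
        self.switched = False

    def switch(self):
        # Called once all producers have agreed to use the new partitioning.
        self.switched = True

    def partition(self, key):
        # Use the old count until the switch, the new count afterwards.
        count = self.new_count if self.switched else self.old_count
        return zlib.crc32(key) % count


partitioner = TransitionPartitioner(old_count=4, new_count=8)
before = partitioner.partition(b"user-42")
partitioner.switch()
after = partitioner.partition(b"user-42")
```

Since 8 is a multiple of 4, the partition chosen after the switch is either the old partition or the old partition plus 4.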
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> With the above approach, shrinking a topic becomes the
>> same
>> > >>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>> steps.
>> > >>>>>>>>>>>>>>>>> Without
>> > >>>>>>>>>>>>>>>>> losing keys in the discontinued partitions.
>> > >>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>> Would love to hear what everyone thinks.
>> > >>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>> Best Jan
>> > >>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>> On 11.02.2018 00:35, Dong Lin wrote:
>> > >>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>> Hi all,
>> > >>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>> I have created KIP-253: Support in-order message
>> delivery
>> > >>>>>>>>>>>>>>>>> with
>> > >>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>> partition
>> > >>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>> expansion. See
>> > >>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-253%3A+Support+in-order+message+delivery+with+partition+expansion
>> > >>>>>>>>>>>>>>>>>> .
>> > >>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>> This KIP provides a way to allow messages of the same
>> > key
>> > >>>>>>>>>>>>>>>>>> from
>> > >>>>>>>>>>>>>>>>>> the
>> > >>>>>>>>>>>>>>>>>> same
>> > >>>>>>>>>>>>>>>>>> producer to be consumed in the same order they are
>> > >>>>>>>>>>>>>>>>>> produced
>> > >>>>>>>>>>>>>>>>>> even
>> > >>>>>>>>>>>>>>>>>> if
>> > >>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>> we
>> > >>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>> expand the partitions of the topic.
>> > >>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>> Thanks,
>> > >>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> Dong
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>>
>> > >>>>
>> > >>
>> > >>
>> > >
>> >
>>
>
>
