Thanks, Jun. That time also works for me.
-john

On Wed, Apr 4, 2018 at 6:28 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> @Jun, yeah that works for me too.
>
> @Jan, just to clarify my previous email: assuming we do the reshuffling
> out of the user input topics, Streams has the advantage that the
> repartition topic is purely owned by itself: the only producer writing to
> this repartition topic is Streams itself, so we can change the number of
> partitions of the repartition topic without requiring the input topics to
> be expanded. And since the reshuffling task itself is stateless, it does
> not require key-partitioning preservation. So as I mentioned, it is indeed
> not dependent on KIP-253 itself, as KIP-253 is primarily about expanding
> an input topic for consumer applications.
>
>
> Guozhang
>
> On Wed, Apr 4, 2018 at 4:01 PM, Dong Lin <lindon...@gmail.com> wrote:
>
> > Thanks Jun! The time works for me.
> >
> >
> > On Thu, 5 Apr 2018 at 4:34 AM Jun Rao <j...@confluent.io> wrote:
> >
> > > Hi, Jan, Dong, John, Guozhang,
> > >
> > > Perhaps it will be useful to have a KIP meeting to discuss this
> > > together as a group. Would Apr. 9 (Monday) at 9:00am PDT work? If so,
> > > I will send out an invite to the mailing list.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Wed, Apr 4, 2018 at 1:25 AM, Jan Filipiak <jan.filip...@trivago.com>
> > > wrote:
> > >
> > > > Want to quickly step in here again because it is going places again.
> > > >
> > > > The last part of the discussion is just a pain to read and has
> > > > completely diverged from what I suggested, without making the reasons
> > > > clear to me.
> > > >
> > > > I don't know why this happens.... here are my comments anyway.
> > > >
> > > > @Guozhang: That Streams is working on automatically creating
> > > > copartition-usable topics: great for Streams, but it has literally
> > > > nothing to do with the KIP, as we want to grow the input topic.
> > > > Everyone can reshuffle relatively easily, but that is not what we
> > > > need to do; we need to grow the topic in question. After Streams
> > > > automatically reshuffles, the input topic still has the same size
> > > > and it didn't help a bit. I fail to see why this is relevant. What
> > > > am I missing here?
> > > >
> > > > @Dong
> > > > I am still of the position that the current proposal takes us in
> > > > the wrong direction, especially by introducing
> > > > PartitionKeyRebalanceListener. From this point we can never move to
> > > > proper stateful handling without completely deprecating this
> > > > creature from hell again. Linear hashing is not the optimising step
> > > > we have to do here. An interface where a topic is always the same
> > > > topic, even after it has grown or shrunk, is what is important. So
> > > > from my POV I have major concerns that this KIP is beneficial in
> > > > its current state.
> > > >
> > > > What is it that makes everyone so addicted to the idea of linear
> > > > hashing? It is not attractive at all for me. And with stateful
> > > > consumers it is still a complete mess. Why not stick with the Kappa
> > > > architecture?
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On 03.04.2018 17:38, Dong Lin wrote:
> > > >
> > > >> Hey John,
> > > >>
> > > >> Thanks much for your comments!!
> > > >>
> > > >> I have yet to go through the emails of John/Jun/Guozhang in detail,
> > > >> but let me present my idea for how to minimize the delay of state
> > > >> loading for the stream use-case.
> > > >>
> > > >> For ease of understanding, let's assume that the initial partition
> > > >> numbers of the input topics and the change log topic are both 10,
> > > >> and the initial number of stream processors is also 10. If we only
> > > >> increase the partition number of the input topics to 15 without
> > > >> changing the number of stream processors, the current KIP already
> > > >> guarantees in-order delivery, and no state needs to be moved
> > > >> between consumers for the stream use-case. Next, let's say we want
> > > >> to increase the number of processors to expand the processing
> > > >> capacity. This requires us to move state between processors, which
> > > >> will take time. Our goal is to minimize the impact (i.e. delay) on
> > > >> processing while we increase the number of processors.
> > > >>
> > > >> Note that a stream processor generally includes both a consumer
> > > >> and a producer. In addition to consuming from the input topic, the
> > > >> consumer may also need to consume from the change log topic on
> > > >> startup for recovery. And the producer may produce state to the
> > > >> change log topic.
> > > >>
> > > >>
> > > >> The solution will include the following steps:
> > > >>
> > > >> 1) Increase the partition number of the input topic from 10 to 15.
> > > >> Since messages with the same key will still go to the same consumer
> > > >> before and after the partition expansion, this step can be done
> > > >> without having to move state between processors.
> > > >>
> > > >> 2) Increase the partition number of the change log topic from 10
> > > >> to 15. Note that this step can also be done without impacting the
> > > >> existing workflow. After we increase the partition number of the
> > > >> change log topic, the key space may split and some keys will be
> > > >> produced to the newly-added partitions. But the same key will
> > > >> still go to the same processor (i.e. consumer) before and after
> > > >> the partition expansion. Thus this step can also be done without
> > > >> having to move state between processors.
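> > > >>
> > > >> (To make steps 1) and 2) concrete, here is a minimal sketch using
> > > >> the Java AdminClient; the topic names are placeholders and error
> > > >> handling is omitted:)
> > > >>
> > > >>   import org.apache.kafka.clients.admin.AdminClient;
> > > >>   import org.apache.kafka.clients.admin.NewPartitions;
> > > >>   import java.util.HashMap;
> > > >>   import java.util.Map;
> > > >>   import java.util.Properties;
> > > >>
> > > >>   Properties props = new Properties();
> > > >>   props.put("bootstrap.servers", "localhost:9092");
> > > >>   try (AdminClient admin = AdminClient.create(props)) {
> > > >>       Map<String, NewPartitions> expansion = new HashMap<>();
> > > >>       // Step 1): grow the input topic from 10 to 15 partitions.
> > > >>       expansion.put("input-topic", NewPartitions.increaseTo(15));
> > > >>       // Step 2): grow the change log topic from 10 to 15 partitions.
> > > >>       expansion.put("changelog-topic", NewPartitions.increaseTo(15));
> > > >>       admin.createPartitions(expansion).all().get();
> > > >>   }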
> > > >>
> > > >> 3) Now, let's add 5 new consumers whose groupId is different from
> > > >> the existing processors' groupId. These new consumers will thus
> > > >> not impact the existing workflow. Each of these new consumers
> > > >> should consume two partitions from the earliest offset, where
> > > >> these two partitions are the same partitions that would be
> > > >> consumed if the consumers had the same groupId as the existing
> > > >> processors. For example, the first of the five consumers will
> > > >> consume partition 0 and partition 10. The purpose of these
> > > >> consumers is to rebuild the state (e.g. RocksDB) for the
> > > >> processors in advance. Also note that, by design of the current
> > > >> KIP, each consumer will consume the existing partition of the
> > > >> change log topic only up to the offset before the partition
> > > >> expansion. Then it will only need to consume the state of the new
> > > >> partition of the change log topic.
> > > >>
> > > >> 4) After the consumers have caught up in step 3), we should stop
> > > >> these consumers and add 5 new processors to the stream processing
> > > >> job. These 5 new processors should run in the same location as the
> > > >> previous 5 consumers to re-use the state (e.g. RocksDB). And these
> > > >> processors' consumers should consume partitions of the change log
> > > >> topic from the committed offsets of the previous 5 consumers so
> > > >> that no state is missed.
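> > > >>
> > > >> (A minimal sketch of one warm-up consumer from step 3), assuming
> > > >> byte-array state records; caughtUp() and restoreIntoLocalStore()
> > > >> are hypothetical helpers standing in for the end-offset check and
> > > >> the RocksDB writes:)
> > > >>
> > > >>   // imports: org.apache.kafka.clients.consumer.*,
> > > >>   //          org.apache.kafka.common.TopicPartition, java.util.*
> > > >>   Properties props = new Properties();
> > > >>   props.put("bootstrap.servers", "localhost:9092");
> > > >>   props.put("group.id", "state-warmup");  // distinct from the processors' groupId
> > > >>   props.put("key.deserializer",
> > > >>       "org.apache.kafka.common.serialization.ByteArrayDeserializer");
> > > >>   props.put("value.deserializer",
> > > >>       "org.apache.kafka.common.serialization.ByteArrayDeserializer");
> > > >>   KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
> > > >>
> > > >>   // First of the five warm-up consumers: partitions 0 and 10 of
> > > >>   // the change log topic, from the earliest offset.
> > > >>   List<TopicPartition> parts = Arrays.asList(
> > > >>       new TopicPartition("changelog-topic", 0),
> > > >>       new TopicPartition("changelog-topic", 10));
> > > >>   consumer.assign(parts);
> > > >>   consumer.seekToBeginning(parts);
> > > >>   while (!caughtUp(consumer, parts)) {
> > > >>       for (ConsumerRecord<byte[], byte[]> rec : consumer.poll(100))
> > > >>           restoreIntoLocalStore(rec);  // rebuild RocksDB in advance
> > > >>       consumer.commitSync();  // step 4) resumes from these offsets
> > > >>   }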
> > > >>
> > > >> One important trick to note here: the mapping from partition to
> > > >> consumer should also use linear hashing. And we need to remember
> > > >> the initial number of processors in the job, 10 in this example,
> > > >> and use this number in the linear hashing algorithm. This is
> > > >> pretty much the same as how we use linear hashing to map a key to
> > > >> a partition. In this case, we get an identity map from partition
> > > >> -> processor, for both the input topic and the change log topic.
> > > >> For example, processor 12 will consume partition 12 of the input
> > > >> topic and produce state to partition 12 of the change log topic.
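> > > >>
> > > >> (A minimal sketch of the linear-hashing mapping as I understand it
> > > >> -- illustrative only, not code from the KIP. With initialCount =
> > > >> 10 and currentCount = 15, buckets 0-4 have been split, so their
> > > >> keys re-hash over the doubled range and land either back in place
> > > >> or in the new bucket (old id + 10); buckets 5-9 are untouched:)
> > > >>
> > > >>   static int linearHash(int hash, int initialCount, int currentCount) {
> > > >>       int h = hash & 0x7fffffff;  // force non-negative
> > > >>       int bucket = h % initialCount;
> > > >>       if (bucket < currentCount - initialCount) {
> > > >>           // Already-split bucket: re-hash modulo the doubled range,
> > > >>           // which yields either bucket or bucket + initialCount.
> > > >>           bucket = h % (2 * initialCount);
> > > >>       }
> > > >>       return bucket;
> > > >>   }
> > > >>
> > > >> The same function can map a partition id to a processor, which is
> > > >> how the identity map falls out: with 15 processors and an initial
> > > >> count of 10, partition 12 maps straight to processor 12.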
> > > >>
> > > >> There are a few important properties of this solution to note:
> > > >>
> > > >> - We can increase the number of partitions for the input topic and
> > > >> the change log topic in any order, asynchronously.
> > > >> - The expansion of the processors in a given job in step 4) only
> > > >> requires step 3) for the same job. It does not require
> > > >> coordination across different jobs for steps 3) and 4). Thus
> > > >> different jobs can independently expand their capacity without
> > > >> waiting for each other.
> > > >> - The logic for 1) and 2) is already supported in the current KIP.
> > > >> The logic for 3) and 4) appears to be independent of the core
> > > >> Kafka logic and can be implemented separately outside core Kafka.
> > > >> Thus the current KIP is probably sufficient once we agree on the
> > > >> efficiency and correctness of the solution. We can have a separate
> > > >> KIP for Kafka Streams to support 3) and 4).
> > > >>
> > > >>
> > > >> Cheers,
> > > >> Dong
> > > >>
> > > >>
> > > >> On Mon, Apr 2, 2018 at 3:25 PM, Guozhang Wang <wangg...@gmail.com>
> > > >> wrote:
> > > >>
> > > >>> Hey guys, just sharing my two cents here (I promise it will be
> > > >>> shorter than John's article :).
> > > >>>
> > > >>> 0. Just to quickly recap, the main discussion point now is how to
> > > >>> support "key partitioning preservation" (John's #4 in topic
> > > >>> characteristics above) beyond the "single-key ordering
> > > >>> preservation" that KIP-253 was originally proposed to maintain
> > > >>> (John's #6 above).
> > > >>>
> > > >>> 1. In the Streams project, we are actively working on improving
> > > >>> the elastic scalability of the library. One of the key features
> > > >>> is to decouple the input topics from the parallelism model of
> > > >>> Streams: i.e. not enforcing the topic to be partitioned by the
> > > >>> key, not enforcing joining topics to be co-partitioned, and not
> > > >>> tying the number of parallel tasks to the number of input topic
> > > >>> partitions. This can be achieved by re-shuffling the input topics
> > > >>> to ensure key-partitioning / co-partitioning on the internal
> > > >>> topics. Note that the re-shuffling task is purely stateless and
> > > >>> hence does not require "key partitioning preservation".
> > > >>> Operationally it is similar to the "create a new topic with a new
> > > >>> number of partitions, pipe the data to the new topic and cut
> > > >>> consumers over from the old topic" idea, just that users can
> > > >>> optionally let Streams handle this rather than doing it manually
> > > >>> themselves. There are a few more details in that regard, but I
> > > >>> will skip them since they are not directly related to this
> > > >>> discussion.
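> > > >>>
> > > >>> (For illustration, a minimal sketch of the re-shuffle idea in the
> > > >>> Streams DSL -- the topic names and extractKey() are made up for
> > > >>> this example, not Streams internals:)
> > > >>>
> > > >>>   StreamsBuilder builder = new StreamsBuilder();
> > > >>>   builder.stream("user-input-topic")          // arbitrary partitioning
> > > >>>          .selectKey((k, v) -> extractKey(v))  // stateless re-key
> > > >>>          .through("app-internal-repartition") // Streams-owned topic,
> > > >>>                                               // keyed / co-partitioned
> > > >>>          .groupByKey()
> > > >>>          .count();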
> > > >>>
> > > >>> 2. Assuming that 1) above is done, the only topics involved in
> > > >>> scaling events are input topics. For these topics the only
> > > >>> producers / consumers are controlled by Streams clients
> > > >>> themselves, and hence achieving "key partitioning preservation"
> > > >>> is simpler than in non-Streams scenarios: consumers know the
> > > >>> partitioning scheme that producers are using, so for their
> > > >>> stateful operations it is doable to split the local state stores
> > > >>> accordingly or execute backfilling on their own. Of course, if we
> > > >>> decide to do server-side backfilling, it can still help Streams
> > > >>> to rely directly on that functionality.
> > > >>>
> > > >>> 3. As John mentioned, another way inside Streams is to
> > > >>> over-partition all internal topics; then, with 1), Streams would
> > > >>> not rely on KIP-253 at all. But personally I'd like to avoid that
> > > >>> if possible, to reduce the Kafka-side footprint: say we
> > > >>> over-partition each input topic up to 1k; with a reasonably sized
> > > >>> stateful topology this can still contribute tens of thousands of
> > > >>> partitions toward the topic-partition capacity of a single
> > > >>> cluster.
> > > >>>
> > > >>> 4. Summing up 1/2/3, I think we should focus more on non-Streams
> > > >>> users writing their stateful computations with local state, and
> > > >>> think about whether / how we could enable "key partitioning
> > > >>> preservation" for them easily, rather than thinking heavily about
> > > >>> the Streams library. People may have different opinions on how
> > > >>> common a usage pattern this is (I think Jun might be suggesting
> > > >>> that for DIY apps people are more likely to use remote state, so
> > > >>> that it is not a problem for them). My opinion is that for
> > > >>> non-Streams users such a usage pattern could still be common
> > > >>> (think: if you are piping data from Kafka to an external data
> > > >>> store which has single-writer requirements for each shard, then
> > > >>> even though it is not a stateful computational application it may
> > > >>> still require "key partitioning preservation"). So I prefer to
> > > >>> have backfilling in our KIP rather than only exposing the API for
> > > >>> expansion and requiring consumers to have pre-knowledge of the
> > > >>> producer's partitioning scheme.
> > > >>>
> > > >>>
> > > >>>
> > > >>> Guozhang
> > > >>>
> > > >>>
> > > >>>
> > > >>> On Thu, Mar 29, 2018 at 2:33 PM, John Roesler <j...@confluent.io>
> > > >>> wrote:
> > > >>>
> > > >>>> Hey Dong,
> > > >>>>
> > > >>>> Congrats on becoming a committer!!!
> > > >>>>
> > > >>>> Since I just sent a novel-length email, I'll try and keep this
> > > >>>> one brief ;)
> > > >>>
> > > >>>> Regarding producer coordination, I'll grant that in that case,
> > > >>>> producers may coordinate among themselves to produce into the
> > > >>>> same topic or to produce co-partitioned topics. Nothing in
> > > >>>> KStreams or the Kafka ecosystem in general requires such
> > > >>>> coordination for correctness, or in fact for any optional
> > > >>>> features, though, so I would not say that we require producer
> > > >>>> coordination of partition logic. If producers currently
> > > >>>> coordinate, it's completely optional and their own choice.
> > > >>>>
> > > >>>> Regarding the portability of partition algorithms, my
> > > >>>> observation is that systems requiring independent
> > > >>>> implementations of the same algorithm with 100% correctness are
> > > >>>> a large source of risk and also a burden on those who have to
> > > >>>> maintain them. If people could flawlessly implement algorithms
> > > >>>> in actual software, the world would be a wonderful place indeed!
> > > >>>> For a system as important and widespread as Kafka, I would
> > > >>>> recommend limiting such requirements as aggressively as
> > > >>>> possible.
> > > >>>>
> > > >>>> I'd agree that we can always revisit decisions like allowing
> > > >>>> arbitrary partition functions, but of course, we shouldn't do
> > > >>>> that in a vacuum. That feels like the kind of thing we'd need to
> > > >>>> proactively seek guidance from the users list about. I do think
> > > >>>> that the general approach of saying "if you use a custom
> > > >>>> partitioner, you cannot do partition expansion" is very
> > > >>>> reasonable (but I don't think we need to go that far with the
> > > >>>> current proposal). It's similar to my statement in my email to
> > > >>>> Jun that in principle KStreams doesn't *need* backfill; we only
> > > >>>> need it if we want to employ partition expansion.
> > > >>>>
> > > >>>> I reckon that the main motivation for backfill is to support
> > > >>>> KStreams use cases, and also any other use cases involving
> > > >>>> stateful consumers.
> > > >>>>
> > > >>>> Thanks for your response, and congrats again!
> > > >>>> -John
> > > >>>>
> > > >>>>
> > > >>>> On Wed, Mar 28, 2018 at 1:34 AM, Dong Lin <lindon...@gmail.com>
> > > >>>> wrote:
> > > >>>>
> > > >>>>> Hey John,
> > > >>>>>
> > > >>>>> Great! Thanks for all the comments. It seems that we agree that
> > > >>>>> the current KIP is in good shape for core Kafka. IMO, what we
> > > >>>>> have been discussing in the recent email exchanges is mostly
> > > >>>>> about the second step, i.e. how to address the problem for the
> > > >>>>> stream use-case (or stateful processing in general).
> > > >>>>>
> > > >>>>> I will comment inline.
> > > >>>>>
> > > >>>>>
> > > >>>>> On Tue, Mar 27, 2018 at 4:38 PM, John Roesler <j...@confluent.io>
> > > >>>>> wrote:
> > > >>>>>> Thanks for the response, Dong.
> > > >>>>>>
> > > >>>>>> Here are my answers to your questions:
> > > >>>>>>
> > > >>>>>>> - "Asking producers and consumers, or even two different
> > > >>>>>>> producers, to share code like the partition function is a
> > > >>>>>>> pretty huge ask. What if they are using different
> > > >>>>>>> languages?". It seems that today we already require different
> > > >>>>>>> producers to use the same hash function -- otherwise messages
> > > >>>>>>> with the same key will go to different partitions of the same
> > > >>>>>>> topic, which may cause problems for downstream consumption.
> > > >>>>>>> So I am not sure it adds any more constraint to assume
> > > >>>>>>> consumers know the hash function of the producer. Could you
> > > >>>>>>> explain more why a user would want to use a custom partition
> > > >>>>>>> function? Maybe we can check if this is something that can be
> > > >>>>>>> supported in the default Kafka hash function. Also, can you
> > > >>>>>>> explain more why it is difficult to implement the same hash
> > > >>>>>>> function in different languages?
> > > >>>>
> > > >>>>>
> > > >>>>>> Sorry, I meant two different producers as in producers to two
> > > >>>>>> different topics. This was in response to the suggestion that
> > > >>>>>> we already require coordination among producers to different
> > > >>>>>> topics in order to achieve co-partitioning. I was saying that
> > > >>>>>> we do not (and should not).
> > > >>>>>> co-partitioning. I was saying that we do not (and should not).
> > > >>>>>>
> > > >>>>>
> > > >>>>> It is probably common for producers of different teams to
> > > >>>>> produce messages to the same topic. In order to ensure that
> > > >>>>> messages with the same key go to the same partition, we need
> > > >>>>> producers of different teams to share the same partition
> > > >>>>> algorithm, which by definition requires coordination among
> > > >>>>> producers of different teams in an organization. Even for
> > > >>>>> producers of different topics, it may be common to require
> > > >>>>> producers to use the same partition algorithm in order to join
> > > >>>>> two topics for stream processing. Does this make it reasonable
> > > >>>>> to say we already require coordination across producers?
> > > >>>>>
> > > >>>>>
> > > >>>>>> By design, consumers are currently ignorant of the
> > > >>>>>> partitioning scheme. It suffices to trust that the producer
> > > >>>>>> has partitioned the topic by key, if they claim to have done
> > > >>>>>> so. If you don't trust that, or even if you just need some
> > > >>>>>> other partitioning scheme, then you must re-partition it
> > > >>>>>> yourself. Nothing we're discussing can or should change that.
> > > >>>>>> The value of backfill is that it preserves the ability for
> > > >>>>>> consumers to avoid re-partitioning before consuming, in the
> > > >>>>>> case where they don't need to today.
> > > >>>>>>
> > > >>>>>
> > > >>>>>> Regarding shared "hash functions", note that it's a bit
> > > >>>>>> inaccurate to talk about the "hash function" of the producer.
> > > >>>>>> Properly speaking, the producer has only a "partition
> > > >>>>>> function". We do not know that it is a hash. The producer can
> > > >>>>>> use any method at its disposal to assign a partition to a
> > > >>>>>> record. The partition function may obviously be written in any
> > > >>>>>> programming language, so in general it's not something that
> > > >>>>>> can be shared around without a formal spec or the ability to
> > > >>>>>> execute arbitrary executables in arbitrary runtime
> > > >>>>>> environments.
> > > >>>>>>
> > > >>>>> Yeah, it is probably better to say partition algorithm. I guess
> > > >>>>> it should not be difficult to implement the same partition
> > > >>>>> algorithm in different languages, right? Yes, we would need a
> > > >>>>> formal specification of the default partition algorithm in the
> > > >>>>> producer. I think that can be documented as part of the
> > > >>>>> producer interface.
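> > > >>>>>
> > > >>>>> (For reference, the Java producer's default rule for keyed
> > > >>>>> records is small enough to specify in a few lines -- a sketch
> > > >>>>> using the murmur2 helpers that ship with the Java client:)
> > > >>>>>
> > > >>>>>   import org.apache.kafka.common.utils.Utils;
> > > >>>>>
> > > >>>>>   // murmur2 over the serialized key bytes, masked to
> > > >>>>>   // non-negative, modulo the partition count.
> > > >>>>>   static int defaultPartition(byte[] keyBytes, int numPartitions) {
> > > >>>>>       return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
> > > >>>>>   }
> > > >>>>>
> > > >>>>> Any other client would need to reproduce exactly this
> > > >>>>> (including the murmur2 seed and the masking) to stay
> > > >>>>> co-partitioned with the Java client.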
> > > >>>>>
> > > >>>>>
> > > >>>>>> Why would a producer want a custom partition function? I don't
> > > >>>>>> know... why did we design the interface so that our users can
> > > >>>>>> provide one? In general, such systems provide custom
> > > >>>>>> partitioners because some data sets may be unbalanced under
> > > >>>>>> the default, or because they can provide some interesting
> > > >>>>>> functionality built on top of the partitioning scheme, etc.
> > > >>>>>> Having provided this ability, I don't know why we would remove
> > > >>>>>> it.
> > > >>>>>>
> > > >>>>> Yeah, it is reasonable to assume that there was a reason to
> > > >>>>> support custom partition functions in the producer. On the
> > > >>>>> other hand, it may also be reasonable to revisit this interface
> > > >>>>> and discuss whether we actually need to support custom
> > > >>>>> partition functions. If we don't have a good reason, we can
> > > >>>>> choose not to support custom partition functions in this KIP,
> > > >>>>> in a backward compatible manner: users can still use a custom
> > > >>>>> partition function, but they would not get the benefit of
> > > >>>>> in-order delivery when there is partition expansion. What do
> > > >>>>> you think?
> > > >>>>>
> > > >>>>>
> > > >>>>>>> - Besides the assumption that the consumer needs to share the
> > > >>>>>>> hash function of the producer, is there other organizational
> > > >>>>>>> overhead in the proposal in the current KIP?
> > > >>>>>>
> > > >>>>>> It wasn't clear to me that KIP-253 currently required the
> > > >>>>>> producer and consumer to share the partition function, or in
> > > >>>>>> fact that it had a hard requirement to abandon the general
> > > >>>>>> partition function and use a linear hash function instead.
> > > >>>>>>
> > > >>>>>
> > > >>>>>> In my reading, there is a requirement to track metadata about
> > > >>>>>> which partitions split into which other partitions during an
> > > >>>>>> expansion operation. If the partition function is linear, this
> > > >>>>>> is easy. If not, you can always just record that all old
> > > >>>>>> partitions split into all new partitions. This has the effect
> > > >>>>>> of forcing all consumers to wait until the old epoch is
> > > >>>>>> completely consumed before starting on the new epoch. But this
> > > >>>>>> may be a reasonable tradeoff, and it doesn't otherwise alter
> > > >>>>>> your design.
> > > >>>>>>
> > > >>>>>> You only mention the consumer needing to know that the
> > > >>>>>> partition function is linear, not what the actual function is,
> > > >>>>>> so I don't think your design actually calls for sharing the
> > > >>>>>> function. Plus, really all the consumer needs is the metadata
> > > >>>>>> about which old-epoch partitions to wait for before consuming
> > > >>>>>> a new-epoch partition. This information is directly captured
> > > >>>>>> in metadata, so I don't think it actually even cares whether
> > > >>>>>> the partition function is linear or not.
> > > >>>>>>
> > > >>>>> You are right that the current KIP does not mention it. My
> > > >>>>> comment about partition function coordination was related to
> > > >>>>> supporting the stream use-case which we have been discussing so
> > > >>>>> far.
> > > >>>>>
> > > >>>>>
> > > >>>>>> So, no, I really think KIP-253 is in good shape. I was really
> > > >>>>>> more talking about the part of this thread that's outside of
> > > >>>>>> KIP-253's scope, namely, creating the possibility of
> > > >>>>>> backfilling partitions after expansion.
> > > >>>>>>
> > > >>>>> Great! Can you also confirm that the main motivation for
> > > >>>>> backfilling partitions after expansion is to support the stream
> > > >>>>> use-case?
> > > >>>>>
> > > >>>>>
> > > >>>>>>> - Currently the producer can forget about a message once it
> > > >>>>>>> has been acknowledged by the broker. Thus the producer
> > > >>>>>>> probably does not know most of the existing messages in the
> > > >>>>>>> topic, including those messages produced by other producers.
> > > >>>>>>> We can have the owner of the producer do the split+backfill.
> > > >>>>>>> In my opinion it will be a new program that wraps around the
> > > >>>>>>> existing producer and consumer classes.
> > > >>>>>> This sounds fine by me!
> > > >>>>>>
> > > >>>>>> Really, I was just emphasizing that the part of the
> > > >>>>>> organization that produces a topic shouldn't have to export
> > > >>>>>> their partition function to the part(s) of the organization
> > > >>>>>> (or other organizations) that consume the topic. Whether the
> > > >>>>>> backfill operation goes into the Producer interface is
> > > >>>>>> secondary, I think.
> > > >>>>>>
> > > >>>>>>> - Regarding point 5. The argument is in favor of the
> > > >>>>>>> split+backfill but for the changelog topic. And it intends to
> > > >>>>>>> address the problem for the stream use-case in general. In
> > > >>>>>>> this KIP we will provide an interface (i.e.
> > > >>>>>>> PartitionKeyRebalanceListener in the KIP) to be used by the
> > > >>>>>>> stream use-case, and the goal is that the user can
> > > >>>>>>> flush/re-consume the state as part of the interface
> > > >>>>>>> implementation, regardless of whether there is a change log
> > > >>>>>>> topic. Maybe you are suggesting that the main reason to do
> > > >>>>>>> split+backfill of the input topic is to support log compacted
> > > >>>>>>> topics? You mentioned in Point 1 that log compacted topics
> > > >>>>>>> are out of the scope of this KIP. Maybe I could understand
> > > >>>>>>> your position better. Regarding Jan's proposal to split
> > > >>>>>>> partitions with backfill, do you think this should replace
> > > >>>>>>> the proposal in the existing KIP, or do you think this is
> > > >>>>>>> something that we should do in addition to the existing KIP?
> > > >>>>>>>
> > > >>>>>> I think that interface is a good/necessary component of
> > > >>>>>> KIP-253.
> > > >>>>>>
> > > >>>>>> I personally (FWIW) feel that KIP-253 is appropriately scoped,
> > > >>>>>> but I do think its utility will be limited unless there is a
> > > >>>>>> later KIP offering backfill. But, maybe unlike Jan, I think it
> > > >>>>>> makes sense to try and tackle the ordering problem
> > > >>>>>> independently of backfill, so I'm in support of the current
> > > >>>>>> KIP.
> > > >>>>>>
> > > >>>>>>> - Regarding point 6. I guess we can agree that it is better
> > > >>>>>>> not to have the performance overhead of copying the input
> > > >>>>>>> data. Before we discuss more on whether the performance
> > > >>>>>>> overhead is acceptable or not, I am trying to figure out what
> > > >>>>>>> the benefit of introducing this overhead is. You mentioned
> > > >>>>>>> that the benefit is the loose organizational coupling. By
> > > >>>>>>> "organizational coupling", are you referring to the
> > > >>>>>>> requirement that the consumer needs to know the hash function
> > > >>>>>>> of the producer? If so, maybe we can discuss the use-case of
> > > >>>>>>> a custom partition function and see whether we can find a way
> > > >>>>>>> to support such use-cases without having to copy the input
> > > >>>>>>> data.
> > > >>>>>>>
> > > >>>>>> I'm not too sure about what an "input" is in this sense, since
> > > >>>>>> we are just talking about topics. Actually, the point I was
> > > >>>>>> making there is that AFAICT the performance overhead of a
> > > >>>>>> backfill is less than any other option, assuming you split
> > > >>>>>> partitions rarely.
> > > >>>>>>
> > > >>>>> By "input" I was referring to the source Kafka topic of a
> > > >>>>> stream processing job.
> > > >>>>>
> > > >>>>>
> > > >>>>>> Separately, yes, "organizational coupling" increases if
> > > >>>>>> producers and consumers have to share code, such as the
> > > >>>>>> partition function. This would not be the case if producers
> > > >>>>>> could only pick from a menu of a few well-known partition
> > > >>>>>> functions, but I think this is a poor tradeoff.
> > > >>>>>>
> > > >>>>> Maybe we can revisit the custom partition function and see
> > > >>>>> whether we actually need it? Otherwise, I am concerned that
> > > >>>>> every user will pay the overhead of data movement to support
> > > >>>>> something that is not really needed by most users.
> > > >>>>>
> > > >>>>>
> > > >>>>>> To me, this is two strong arguments in favor of backfill being
> > > >>>>>> less expensive than no backfill, but again, I think that
> > > >>>>>> particular debate comes after KIP-253, so I don't want to
> > > >>>>>> create the impression of opposition to your proposal.
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> Finally, to respond to a new email I just noticed:
> > > >>>>>>
> > > >>>>>>> BTW, here is my understanding of the scope of this KIP. We
> > > >>>>>>> want to allow consumers to always consume messages with the
> > > >>>>>>> same key from the same producer in the order they are
> > > >>>>>>> produced. And we need to provide a way for the stream
> > > >>>>>>> use-case to be able to flush/load state when messages with
> > > >>>>>>> the same key are migrated between consumers. In addition to
> > > >>>>>>> ensuring that this goal is correctly supported, we should do
> > > >>>>>>> our best to keep the performance and organizational overhead
> > > >>>>>>> of this KIP as low as possible.
> > > >>>>>>
> > > >>>>>> I think we're on the same page there! In fact, I would
> > > >>>>>> generalize a little more and say that the mechanism you've
> > > >>>>>> designed provides *all consumers* the ability "to flush/load
> > > >>>>>> state when messages with the same key are migrated between
> > > >>>>>> consumers", not just Streams.
> > > >>>>>>
> > > >>>>> Thanks for all the comments!
> > > >>>>>
> > > >>>>>
> > > >>>>>> Thanks for the discussion,
> > > >>>>>> -John
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> On Tue, Mar 27, 2018 at 3:14 PM, Dong Lin <lindon...@gmail.com>
> > > >>>>>> wrote:
> > > >>>
> > > >>>>>>> Hey John,
> > > >>>>>>>
> > > >>>>>>> Thanks much for the detailed comments. Here are my thoughts:
> > > >>>>>>>
> > > >>>>>>> - The need to delete messages from log compacted topics is
> > > >>>>>>> mainly for performance (e.g. storage space) optimization
> > > >>>>>>> rather than for the correctness of this KIP. I agree that we
> > > >>>>>>> probably don't need to focus on this in our discussion since
> > > >>>>>>> it is mostly a performance optimization.
> > > >>>>>>>
> > > >>>>>>> - "Asking producers and consumers, or even two different
> > > >>>>>>> producers, to share code like the partition function is a
> > > >>>>>>> pretty huge ask. What if they are using different
> > > >>>>>>> languages?". It seems that today we already require different
> > > >>>>>>> producers to use the same hash function -- otherwise messages
> > > >>>>>>> with the same key will go to different partitions of the same
> > > >>>>>>> topic, which may cause problems for downstream consumption.
> > > >>>>>>> So I am not sure it adds any more constraint to assume
> > > >>>>>>> consumers know the hash function of the producer. Could you
> > > >>>>>>> explain more why a user would want to use a custom partition
> > > >>>>>>> function? Maybe we can check if this is something that can be
> > > >>>>>>> supported in the default Kafka hash function. Also, can you
> > > >>>>>>> explain more why it is difficult to implement the same hash
> > > >>>>>>> function in different languages?
> > > >>>>>>>
> > > >>>>>>> - Besides the assumption that the consumer needs to share the
> > > >>>>>>> hash function of the producer, is there other organizational
> > > >>>>>>> overhead in the proposal in the current KIP?
> > > >>>>>>>
> > > >>>>>>> - Currently the producer can forget about a message once it
> > > >>>>>>> has been acknowledged by the broker. Thus the producer
> > > >>>>>>> probably does not know most of the existing messages in the
> > > >>>>>>> topic, including those messages produced by other producers.
> > > >>>>>>> We can have the owner of the producer do the split+backfill.
> > > >>>>>>> In my opinion it will be a new program that wraps around the
> > > >>>>>>> existing producer and consumer classes.
> > > >>>>>>>
> > > >>>>>>> - Regarding point 5. The argument is in favor of the
> > > >>>>>>> split+backfill but for the changelog topic. And it intends to
> > > >>>>>>> address the problem for the stream use-case in general. In
> > > >>>>>>> this KIP we will provide an interface (i.e.
> > > >>>>>>> PartitionKeyRebalanceListener in the KIP) to be used by the
> > > >>>>>>> stream use-case, and the goal is that the user can
> > > >>>>>>> flush/re-consume the state as part of the interface
> > > >>>>>>> implementation, regardless of whether there is a change log
> > > >>>>>>> topic. Maybe you are suggesting that the main reason to do
> > > >>>>>>> split+backfill of the input topic is to support log compacted
> > > >>>>>>> topics? You mentioned in Point 1 that log compacted topics
> > > >>>>>>> are out of the scope of this KIP. Maybe I could understand
> > > >>>>>>> your position better. Regarding Jan's proposal to split
> > > >>>>>>> partitions with backfill, do you think this should replace
> > > >>>>>>> the proposal in the existing KIP, or do you think this is
> > > >>>>>>> something that we should do in addition to the existing KIP?
> > > >>>>>>>
> > > >>>>>>> - Regarding point 6. I guess we can agree that it is better
> > > >>>>>>> not to have the performance overhead of copying the input
> > > >>>>>>> data. Before we discuss more on whether the performance
> > > >>>>>>> overhead is acceptable or not, I am trying to figure out what
> > > >>>>>>> the benefit of introducing this overhead is. You mentioned
> > > >>>>>>> that the benefit is the loose organizational coupling. By
> > > >>>>>>> "organizational coupling", are you referring to the
> > > >>>>>>> requirement that the consumer needs to know the hash function
> > > >>>>>>> of the producer? If so, maybe we can discuss the use-case of
> > > >>>>>>> a custom partition function and see whether we can find a way
> > > >>>>>>> to support such use-cases without having to copy the input
> > > >>>>>>> data.
> > > >>>>>>>
> > > >>>>>>> Thanks,
> > > >>>>>>> Dong
> > > >>>>>>>
> > > >>>>>>> On Tue, Mar 27, 2018 at 11:34 AM, John Roesler <j...@confluent.io>
> > > >>>>>>> wrote:
> > > >>>>>>
> > > >>>>>>>> Hey Dong and Jun,
> > > >>>>>>>>
> > > >>>>>>>> Thanks for the thoughtful responses. If you don't mind, I'll
> > > >>>>>>>> mix my replies together to try for a coherent response. I'm
> > > >>>>>>>> not too familiar with mailing-list etiquette, though.
> > > >>>>>>>>
> > > >>>>>>>> I'm going to keep numbering my points because it makes it
> > > >>>>>>>> easy for you all to respond.
> > > >>>>>>>>
> > > >>>>>>>> Point 1:
> > > >>>>>>>> As I read it, KIP-253 is *just* about properly fencing the
> > > >>>>>>>> producers and consumers so that you preserve the correct
> > > >>>>>>>> ordering of records during partition expansion. This is
> > > >>>>>>>> clearly necessary regardless of anything else we discuss. I
> > > >>>>>>>> think this whole discussion about backfill, consumers,
> > > >>>>>>>> streams, etc., is beyond the scope of KIP-253. But it would
> > > >>>>>>>> be cumbersome to start a new thread at this point.
> > > >>>>>>>>
> > > >>>>>>>> I had missed KIP-253's Proposed Change #9 among all the
> > > >>>>>>>> details... I think this is a nice addition to the proposal.
> > > >>>>>>>> One thought is that it's actually irrelevant whether the
> > > >>>>>>>> hash function is linear. This is simply an algorithm for
> > > >>>>>>>> moving a key from one partition to another, so the type of
> > > >>>>>>>> hash function need not be a precondition. In fact, it also
> > > >>>>>>>> doesn't matter whether the topic is compacted or not; the
> > > >>>>>>>> algorithm works regardless.
> > > >>>>>
> > > >>>>>>>> I think this is a good algorithm to keep in mind, as it
> > > >>>>>>>> might solve a variety of problems, but it does have a
> > > >>>>>>>> downside: the producer won't know whether or not K1 was
> > > >>>>>>>> actually in P1; it just knows that K1 was in P1's keyspace
> > > >>>>>>>> before the new epoch. Therefore, it will have to
> > > >>>>>>>> pessimistically send (K1,null) to P1 just in case. But the
> > > >>>>>>>> next time K1 comes along, the producer *also* won't remember
> > > >>>>>>>> that it already retracted K1 from P1, so it will have to
> > > >>>>>>>> send (K1,null) *again*. By extension, every time the
> > > >>>>>>>> producer sends to P2, it will also have to send a tombstone
> > > >>>>>>>> to P1, which is a pretty big burden. To make the situation
> > > >>>>>>>> worse, if there is a second split, say P2 becomes P2 and P3,
> > > >>>>>>>> then any key Kx belonging to P3 will also have to be
> > > >>>>>>>> retracted from P2 *and* P1, since the producer can't know
> > > >>>>>>>> whether Kx was last written to P2 or P1. Over a long period
> > > >>>>>>>> of time this clearly becomes an issue, as the producer must
> > > >>>>>>>> send an arbitrary number of retractions along with every
> > > >>>>>>>> update.
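> > > >>>>>>>>
> > > >>>>>>>> (A minimal sketch of the retraction pattern just described
> > > >>>>>>>> -- the topic name, partition numbers, and surrounding
> > > >>>>>>>> producer are placeholders:)
> > > >>>>>>>>
> > > >>>>>>>>   // Every send of K1 to its new partition P2 must be paired
> > > >>>>>>>>   // with a pessimistic tombstone to the old partition P1,
> > > >>>>>>>>   // since the producer cannot know whether K1 was ever
> > > >>>>>>>>   // actually written to P1.
> > > >>>>>>>>   producer.send(new ProducerRecord<>("topic", 1, "K1", null));    // retract from P1
> > > >>>>>>>>   producer.send(new ProducerRecord<>("topic", 2, "K1", "value")); // update in P2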
> > > >>>>>>>>
> > > >>>>>>>> In contrast, the proposed backfill operation has an end, and
> > > >>>>>>>> after it ends, everyone can afford to forget that there ever
> > > >>>>>>>> was a different partition layout.
> > > >>>>>>>>
> > > >>>>>>>> Really, though, figuring out how to split compacted topics
> > > >>>>>>>> is beyond the scope of KIP-253, so I'm not sure #9 really
> > > >>>>>>>> even needs to be in this KIP... We do need in-order delivery
> > > >>>>>>>> during partition expansion. It would be fine by me to say
> > > >>>>>>>> that you *cannot* expand partitions of a log-compacted topic
> > > >>>>>>>> and call it a day. I think it would be better to tackle that
> > > >>>>>>>> in another KIP.
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> Point 2:
> > > >>>>>>>> Regarding whether the consumer re-shuffles its inputs, this
> > > >>>>>>>> is always on the table; any consumer who wants to re-shuffle
> > > >>>>>>>> its input is free to do so. But this is currently not
> > > >>>>>>>> required. It's just that the current high-level story with
> > > >>>>>>>> Kafka encourages the use of partitions as a unit of
> > > >>>>>>>> concurrency. As long as consumers are single-threaded, they
> > > >>>>>>>> can happily consume a single partition without concurrency
> > > >>>>>>>> control of any kind. This is a key aspect of the system that
> > > >>>>>>>> lets folks design high-throughput systems on top of it
> > > >>>>>>>> surprisingly easily. If all consumers were instead
> > > >>>>>>>> encouraged/required to implement a repartition of their own,
> > > >>>>>>>> then the consumer becomes significantly more complex,
> > > >>>>>>>> requiring either the consumer to first produce to its own
> > > >>>>>>>> intermediate repartition topic, or to ensure that consumer
> > > >>>>>>>> threads have a reliable, high-bandwidth channel of
> > > >>>>>>>> communication with every other consumer thread.
> > > >>>>>>>>
> > > >>>>>>>> Either of those tradeoffs may be reasonable for a particular
> > > >>>>>>>> user of Kafka, but I don't know if we're in a position to
> > > >>>>>>>> say that they are reasonable for *every* user of Kafka.
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> Point 3:
> > > >>>>>>>> Regarding Jun's point about this use case, "(3) stateful and
> > > >>>>>>>> maintaining the states in a local store", I agree that they
> > > >>>>>>>> may use a framework *like* Kafka Streams, but that is not
> > > >>>>>>>> the same as using Kafka Streams. This is why I think it's
> > > >>>>>>>> better to solve it in Core: because it is then solved for
> > > >>>>>>>> KStreams and also for everything else that facilitates local
> > > >>>>>>>> state maintenance. To me, Streams is a member of the
> > > >>>>>>>> category of "stream processing frameworks", which is itself
> > > >>>>>>>> a subcategory of "things requiring local state maintenance".
> > > >>>>>>>> I'm not sure if it makes sense to
> > > >>>>>>>
> > > >>>>>>>
> > >
> >
>
>
>
> --
> -- Guozhang
>
