Thanks Jun, that time also works for me.
-john

On Wed, Apr 4, 2018 at 6:28 PM, Guozhang Wang <wangg...@gmail.com> wrote:
@Jun, yeah that works for me too.

@Jan, just to clarify my previous email: assuming we do the reshuffling out of the user input topics, Streams has the advantage that the repartition topic is purely owned by itself: the only producer writing to this repartition topic is Streams itself, so we can then split / change the number of partitions of the repartition topic without requiring the input topics to be expanded. And since the reshuffling task itself is stateless, it does not require key partitioning preservation. So as I mentioned, it is indeed not dependent on KIP-253 itself, as KIP-253 is primarily about expanding an input topic for consumer applications.

Guozhang

On Wed, Apr 4, 2018 at 4:01 PM, Dong Lin <lindon...@gmail.com> wrote:

Thanks Jun! The time works for me.

On Thu, 5 Apr 2018 at 4:34 AM Jun Rao <j...@confluent.io> wrote:

Hi, Jan, Dong, John, Guozhang,

Perhaps it will be useful to have a KIP meeting to discuss this together as a group. Would Apr. 9 (Monday) at 9:00am PDT work? If so, I will send out an invite to the mailing list.

Thanks,
Jun

On Wed, Apr 4, 2018 at 1:25 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:

I want to quickly step in here again because this is going places again. The last part of the discussion is just a pain to read and has completely diverged from what I suggested, without the reasons being made clear to me. I don't know why this happens... here are my comments anyway.

@Guozhang: That Streams is working on automatically creating co-partition-usable topics is great for Streams, but it has literally nothing to do with this KIP, because we want to grow the input topic. Everyone can reshuffle relatively easily, but that is not what we need to do; we need to grow the topic in question. After Streams automatically reshuffles, the input topic still has the same size, so it didn't help a bit. I fail to see why this is relevant. What am I missing here?

@Dong: I am still of the position that the current proposal takes us in the wrong direction, especially introducing PartitionKeyRebalanceListener. From that point on we can never move to proper stateful handling without completely deprecating this creature from hell again. Linear hashing is not the optimising step we have to do here. An interface where a topic stays the same topic, even after it has grown or shrunk, is important. So from my POV I have major concerns that this KIP is beneficial in its current state.

What is it that makes everyone so addicted to the idea of linear hashing? It is not attractive at all to me. And with stateful consumers it is still a complete mess. Why not stick with the Kappa architecture?

On 03.04.2018 17:38, Dong Lin wrote:

Hey John,

Thanks much for your comments!!

I have yet to go through the emails of John/Jun/Guozhang in detail, but let me present my idea for how to minimize the delay of state loading for the stream use-case.
For ease of understanding, let's assume that the initial partition number of the input topics and the changelog topic are both 10, and the initial number of stream processors is also 10. If we only increase the partition number of the input topics to 15 without changing the number of stream processors, the current KIP already guarantees in-order delivery, and no state needs to be moved between consumers for the stream use-case. Next, let's say we want to increase the number of processors to expand the processing capacity of the stream use-case. This requires us to move state between processors, which will take time. Our goal is to minimize the impact (i.e. delay) on processing while we increase the number of processors.

Note that a stream processor generally includes both a consumer and a producer. In addition to consuming from the input topic, the consumer may also need to consume from the changelog topic on startup for recovery, and the producer may produce state to the changelog topic.

The solution includes the following steps:

1) Increase the partition number of the input topic from 10 to 15. Since messages with the same key will still go to the same consumer before and after the partition expansion, this step can be done without having to move state between processors.

2) Increase the partition number of the changelog topic from 10 to 15. Note that this step can also be done without impacting the existing workflow. After we increase the partition number of the changelog topic, the key space may split and some keys will be produced to the newly-added partitions. But the same key will still go to the same processor (i.e. consumer) before and after the expansion. Thus this step can also be done without having to move state between processors.

3) Now, let's add 5 new consumers whose groupId is different from the existing processors' groupId, so that these new consumers will not impact the existing workflow. Each of these new consumers should consume two partitions from the earliest offset, where these two partitions are the same partitions that would be consumed if the consumers had the same groupId as the existing processors. For example, the first of the five consumers will consume partition 0 and partition 10. The purpose of these consumers is to rebuild the state (e.g. RocksDB) for the processors in advance. Also note that, by design of the current KIP, each consumer will consume the existing partition of the changelog topic up to the offset before the partition expansion; after that it only needs to consume the state from the new partition of the changelog topic.
4) After the consumers have caught up in step 3), we stop these consumers and add 5 new processors to the stream processing job. These 5 new processors should run in the same location as the previous 5 consumers to re-use the state (e.g. RocksDB), and their consumers should consume the partitions of the changelog topic from the offsets committed by the previous 5 consumers so that no state is missed.

One important trick to note here is that the mapping from partition to consumer should also use linear hashing, and we need to remember the initial number of processors in the job (10 in this example) and use this number in the linear hashing algorithm. This is pretty much the same as how we use linear hashing to map a key to a partition. In this case we get an identity map from partition to processor, for both the input topic and the changelog topic. For example, processor 12 will consume partition 12 of the input topic and produce state to partition 12 of the changelog topic.

There are a few important properties of this solution to note:

- We can increase the number of partitions of the input topic and the changelog topic in any order, asynchronously.
- The expansion of the processors of a given job in step 4) only requires step 3) for the same job. It does not require coordination across different jobs for steps 3) and 4). Thus different jobs can independently expand their capacity without waiting for each other.
- The logic for 1) and 2) is already supported in the current KIP. The logic for 3) and 4) appears to be independent of the core Kafka logic and can be implemented separately outside core Kafka. Thus the current KIP is probably sufficient once we agree on the efficiency and correctness of the solution. We can have a separate KIP for Kafka Streams to support 3) and 4).

Cheers,
Dong
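To make the linear hashing behaviour that the steps above rely on concrete, here is a small illustrative sketch (not code from the KIP); it assumes a non-negative key hash and that the partition count has grown by at most a factor of two from the initial count:

```java
public final class LinearHashing {

    /**
     * Maps a non-negative key hash to a partition under linear hashing,
     * assuming initialCount <= currentCount <= 2 * initialCount.
     */
    public static int partition(int keyHash, int initialCount, int currentCount) {
        int candidate = keyHash % (2 * initialCount);
        // Only partitions [0, currentCount - initialCount) have been split, and their
        // keys may move to [initialCount, currentCount); every other key falls back
        // to the original modulus and stays where it was.
        return candidate < currentCount ? candidate : keyHash % initialCount;
    }

    public static void main(String[] args) {
        int initialPartitions = 10;  // partition count the topics started with
        int expandedPartitions = 15; // partition count after the expansion

        for (int keyHash = 0; keyHash < 40; keyHash++) {
            int oldPartition = partition(keyHash, initialPartitions, initialPartitions);
            int newPartition = partition(keyHash, initialPartitions, expandedPartitions);
            // The partition-to-processor mapping uses the same rule, remembering the
            // initial processor count of 10 as described above.
            int processorBefore = partition(newPartition, initialPartitions, 10);
            int processorAfter = partition(newPartition, initialPartitions, 15);
            System.out.printf("hash=%2d oldPartition=%2d newPartition=%2d "
                            + "processor(10)=%2d processor(15)=%2d%n",
                    keyHash, oldPartition, newPartition, processorBefore, processorAfter);
        }
    }

    private LinearHashing() {
    }
}
```

Running the sketch shows the two properties the plan depends on: with the original 10 processors, a key that moves from partition 2 to partition 12 is still handled by processor 2, so steps 1) and 2) move no state; and once the job is expanded to 15 processors, the mapping becomes the identity, e.g. processor 12 owns partition 12 of both the input topic and the changelog topic.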
On Mon, Apr 2, 2018 at 3:25 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Hey guys, just sharing my two cents here (I promise it will be shorter than John's article :).

0. Just to quickly recap, the main discussion point now is how to support "key partitioning preservation" (John's #4 in the topic characteristics above) beyond the "single-key ordering preservation" that KIP-253 was originally proposed to maintain (John's #6 above).

1. In the Streams project we are actively working on improving the elastic scalability of the library. One of the key features is to decouple the input topics from the parallelism model of Streams: i.e. not enforcing the topic to be partitioned by the key, not enforcing joined topics to be co-partitioned, and not tying the number of parallel tasks to the number of input topic partitions. This can be achieved by re-shuffling the input topics to ensure key-partitioning / co-partitioning on the internal topics. Note that the re-shuffling task is purely stateless and hence does not require "key partitioning preservation". Operationally it is similar to the "create a new topic with the new number of partitions, pipe the data to the new topic and cut consumers over from the old topic" idea, just that users can optionally let Streams handle this rather than doing it manually themselves. There are a few more details in that regard, but I will skip them since they are not directly related to this discussion.

2. Assuming that 1) above is done, the only topics involved in scaling events are the input topics. For these topics, the only producers / consumers are controlled by the Streams clients themselves, and hence achieving "key partitioning preservation" is simpler than in non-Streams scenarios: consumers know the partitioning scheme that producers are using, so for their stateful operations it is doable to split the local state stores accordingly or to execute backfilling on their own. Of course, if we decide to do server-side backfilling, it can still help Streams to rely directly on that functionality.

3. As John mentioned, another option inside Streams is to over-partition all internal topics; then, with 1), Streams would not rely on KIP-253 at all. But personally I'd like to avoid that if possible to reduce the Kafka-side footprint: say we over-partition each input topic up to 1k partitions; with a reasonably sized stateful topology that can still contribute tens of thousands of partitions toward the topic-partition capacity of a single cluster.

4. Summing up 1/2/3, I think we should focus more on non-Streams users writing their stateful computations with local state, and think about whether / how we could enable "key partitioning preservation" for them easily, rather than thinking heavily about the Streams library. People may have different opinions on how common such a usage pattern is (I think Jun might be suggesting that for DIY apps people are more likely to use remote state, so that it is not a problem for them). My opinion is that for non-Streams users this usage pattern could still be large (think: if you are piping data from Kafka to an external data store that has single-writer requirements for each shard, then even though it is not a stateful computational application, it may still require "key partitioning preservation"), so I prefer to have backfilling in our KIP rather than only exposing the API for expansion and requiring consumers to have pre-knowledge of the producer's partitioning scheme.

Guozhang
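A minimal sketch of the re-shuffling described in point 1 above, assuming hypothetical topic names and key-extraction logic (an illustration, not code from the Streams project): re-keying the stream forces Streams to route the data through an internal repartition topic that only the application itself produces to and consumes from, so the stateful step downstream no longer depends on how the external input topic is partitioned.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;

import java.util.Properties;

public class ReshuffleExample {

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Changing the key marks the stream for repartitioning; the groupByKey/count
        // below is then fed from the internal repartition topic owned by this
        // application, not directly from the external "orders" topic.
        builder.stream("orders", Consumed.with(Serdes.String(), Serdes.String()))
               .selectKey((key, value) -> extractCustomerId(value))
               .groupByKey()
               .count();

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "reshuffle-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        new KafkaStreams(builder.build(), props).start();
    }

    // Placeholder: however the new key is derived from the record value.
    private static String extractCustomerId(String value) {
        return value.split(",")[0];
    }
}
```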
On Thu, Mar 29, 2018 at 2:33 PM, John Roesler <j...@confluent.io> wrote:

Hey Dong,

Congrats on becoming a committer!!!

Since I just sent a novel-length email, I'll try to keep this one brief ;)

Regarding producer coordination, I'll grant that in that case producers may coordinate among themselves to produce into the same topic or to produce co-partitioned topics. Nothing in KStreams, or the Kafka ecosystem in general, requires such coordination for correctness or in fact for any optional features, though, so I would not say that we require producer coordination of partition logic. If producers currently coordinate, it's completely optional and their own choice.

Regarding the portability of partition algorithms, my observation is that systems requiring independent implementations of the same algorithm with 100% correctness are a large source of risk and also a burden on those who have to maintain them. If people could flawlessly implement algorithms in actual software, the world would be a wonderful place indeed! For a system as important and widespread as Kafka, I would recommend limiting such requirements as aggressively as possible.

I'd agree that we can always revisit decisions like allowing arbitrary partition functions, but of course we shouldn't do that in a vacuum. That feels like the kind of thing we'd need to proactively seek guidance from the users list about. I do think that the general approach of saying "if you use a custom partitioner, you cannot do partition expansion" is very reasonable (but I don't think we need to go that far with the current proposal). It's similar to my statement in my email to Jun that in principle KStreams doesn't *need* backfill; we only need it if we want to employ partition expansion.

I reckon that the main motivation for backfill is to support KStreams use cases and also any other use cases involving stateful consumers.

Thanks for your response, and congrats again!
-John

On Wed, Mar 28, 2018 at 1:34 AM, Dong Lin <lindon...@gmail.com> wrote:

Hey John,

Great! Thanks for all the comments. It seems that we agree that the current KIP is in good shape for core Kafka. IMO, what we have been discussing in the recent email exchanges is mostly about the second step, i.e. how to address the problem for the stream use-case (or stateful processing in general).

I will comment inline.

On Tue, Mar 27, 2018 at 4:38 PM, John Roesler <j...@confluent.io> wrote:

> Thanks for the response, Dong.
>
> Here are my answers to your questions:
>
>> - "Asking producers and consumers, or even two different producers, to share code like the partition function is a pretty huge ask. What if they are using different languages?" It seems that today we already require different producers to use the same hash function -- otherwise messages with the same key will go to different partitions of the same topic, which may cause problems for downstream consumption. So I am not sure it adds any more constraint to assume that consumers know the hash function of the producer. Could you explain more why a user would want to use a custom partition function? Maybe we can check if this is something that can be supported in the default Kafka hash function. Also, can you explain more why it is difficult to implement the same hash function in different languages?
>
> Sorry, I meant two different producers as in producers to two different topics. This was in response to the suggestion that we already require coordination among producers to different topics in order to achieve co-partitioning. I was saying that we do not (and should not).

It is probably common for producers of different teams to produce messages to the same topic. In order to ensure that messages with the same key go to the same partition, we need producers of different teams to share the same partition algorithm, which by definition requires coordination among producers of different teams in an organization. Even for producers of different topics, it may be common to require producers to use the same partition algorithm in order to join two topics for stream processing. Does this make it reasonable to say we already require coordination across producers?

> By design, consumers are currently ignorant of the partitioning scheme. It suffices to trust that the producer has partitioned the topic by key, if they claim to have done so. If you don't trust that, or even if you just need some other partitioning scheme, then you must re-partition it yourself. Nothing we're discussing can or should change that.
> The value of backfill is that it preserves the ability for consumers to avoid re-partitioning before consuming, in the case where they don't need to today.
>
> Regarding shared "hash functions", note that it's a bit inaccurate to talk about the "hash function" of the producer. Properly speaking, the producer has only a "partition function". We do not know that it is a hash. The producer can use any method at its disposal to assign a partition to a record. The partition function may obviously be written in any programming language, so in general it's not something that can be shared around without a formal spec or the ability to execute arbitrary executables in arbitrary runtime environments.

Yeah, it is probably better to say partition algorithm. I guess it should not be difficult to implement the same partition algorithm in different languages, right? Yes, we would need a formal specification of the default partition algorithm in the producer. I think that can be documented as part of the producer interface.

> Why would a producer want a custom partition function? I don't know... why did we design the interface so that our users can provide one? In general, such systems provide custom partitioners because some data sets may be unbalanced under the default, or because they can provide some interesting functionality built on top of the partitioning scheme, etc. Having provided this ability, I don't know why we would remove it.

Yeah, it is reasonable to assume that there was a reason to support a custom partition function in the producer. On the other hand, it may also be reasonable to revisit this interface and discuss whether we actually need to support custom partition functions. If we don't have a good reason, we can choose not to support custom partition functions in this KIP in a backward compatible manner, i.e. users could still use a custom partition function, but they would not get the benefit of in-order delivery when there is a partition expansion. What do you think?
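As a concrete reference point for the "formal specification" mentioned above: for keyed records, Kafka's default producer partitioner hashes the serialized key with murmur2, masks the result to a non-negative value, and takes it modulo the partition count. A consumer-side re-implementation would look roughly like the sketch below (the class and method names are made up for illustration, and the result is only meaningful if the producer did not configure a custom partitioner):

```java
import org.apache.kafka.common.utils.Utils;

public final class DefaultPartitioning {

    /**
     * Computes the partition that Kafka's default producer partitioner would choose
     * for a record with the given serialized key and the given partition count.
     */
    public static int partitionForKey(byte[] serializedKey, int numPartitions) {
        // murmur2 over the key bytes, masked to a non-negative int,
        // reduced modulo the topic's partition count.
        return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
    }

    private DefaultPartitioning() {
    }
}
```

A consumer that knows this rule, plus the old and new partition counts, can work out where a key lived before an expansion; that pre-knowledge of the producer's partitioning scheme is exactly what the discussion above weighs against a server-side backfill.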
> > > >>>>> > > > >>>>> > > > >>>>> - Besides the assumption that consumer needs to share the hash > > > >>>>>> > > > >>>>> function > > > >>> > > > >>>> of > > > >>>>> > > > >>>>>> producer, is there other organization overhead of the proposal > in > > > >>>>>>> > > > >>>>>> the > > > >>> > > > >>>> current KIP? > > > >>>>>>> > > > >>>>>>> It wasn't clear to me that KIP-253 currently required the > > producer > > > >>>>>> > > > >>>>> and > > > >>> > > > >>>> consumer to share the partition function, or in fact that it had a > > > >>>>>> > > > >>>>> hard > > > >>> > > > >>>> requirement to abandon the general partition function and use a > > > >>>>>> > > > >>>>> linear > > > >>> > > > >>>> hash > > > >>>>> > > > >>>>>> function instead. > > > >>>>>> > > > >>>>> > > > >>>>> In my reading, there is a requirement to track the metadata about > > > >>>>>> > > > >>>>> what > > > >>> > > > >>>> partitions split into what other partitions during an expansion > > > >>>>>> > > > >>>>> operation. > > > >>>>> > > > >>>>>> If the partition function is linear, this is easy. If not, you > can > > > >>>>>> > > > >>>>> always > > > >>>> > > > >>>>> just record that all old partitions split into all new > partitions. > > > >>>>>> > > > >>>>> This > > > >>> > > > >>>> has > > > >>>>> > > > >>>>>> the effect of forcing all consumers to wait until the old epoch > is > > > >>>>>> completely consumed before starting on the new epoch. But this > may > > > >>>>>> > > > >>>>> be a > > > >>> > > > >>>> reasonable tradeoff, and it doesn't otherwise alter your design. > > > >>>>>> > > > >>>>>> You only mention the consumer needing to know that the partition > > > >>>>>> > > > >>>>> function > > > >>>> > > > >>>>> is linear, not what the actual function is, so I don't think your > > > >>>>>> > > > >>>>> design > > > >>>> > > > >>>>> actually calls for sharing the function. Plus, really all the > > > >>>>>> > > > >>>>> consumer > > > >>> > > > >>>> needs is the metadata about what old-epoch partitions to wait for > > > >>>>>> > > > >>>>> before > > > >>>> > > > >>>>> consuming a new-epoch partition. This information is directly > > > >>>>>> > > > >>>>> captured > > > >>> > > > >>>> in > > > >>>> > > > >>>>> metadata, so I don't think it actually even cares whether the > > > >>>>>> > > > >>>>> partition > > > >>> > > > >>>> function is linear or not. > > > >>>>>> > > > >>>>>> You are right that the current KIP does not mention it. My > comment > > > >>>>> > > > >>>> related > > > >>>> > > > >>>>> to the partition function coordination was related to support the > > > >>>>> stream-use case which we have been discussing so far. > > > >>>>> > > > >>>>> > > > >>>>> So, no, I really think KIP-253 is in good shape. I was really > more > > > >>>>>> > > > >>>>> talking > > > >>>>> > > > >>>>>> about the part of this thread that's outside of KIP-253's scope, > > > >>>>>> > > > >>>>> namely, > > > >>>> > > > >>>>> creating the possibility of backfilling partitions after > expansion. > > > >>>>>> > > > >>>>>> Great! Can you also confirm that the main motivation for > > backfilling > > > >>>>> partitions after expansion is to support the stream use-case? > > > >>>>> > > > >>>>> > > > >>>>> - Currently producer can forget about the message that has been > > > >>>>>> > > > >>>>>>> acknowledged by the broker. 
Thus the producer probably does not > > > >>>>>>> > > > >>>>>> know > > > >>> > > > >>>> most > > > >>>>> > > > >>>>>> of the exiting messages in topic, including those messages > > produced > > > >>>>>>> > > > >>>>>> by > > > >>>> > > > >>>>> other producers. We can have the owner of the producer to > > > >>>>>>> > > > >>>>>> split+backfill. > > > >>>>> > > > >>>>>> In my opion it will be a new program that wraps around the > > existing > > > >>>>>>> producer and consumer classes. > > > >>>>>>> > > > >>>>>>> This sounds fine by me! > > > >>>>>> > > > >>>>>> Really, I was just emphasizing that the part of the organization > > > that > > > >>>>>> produces a topic shouldn't have to export their partition > function > > > to > > > >>>>>> > > > >>>>> the > > > >>>> > > > >>>>> part(s) of the organization (or other organizations) that consume > > the > > > >>>>>> topic. Whether the backfill operation goes into the Producer > > > >>>>>> > > > >>>>> interface > > > >>> > > > >>>> is > > > >>>> > > > >>>>> secondary, I think. > > > >>>>>> > > > >>>>>> - Regarding point 5. The argument is in favor of the > > split+backfill > > > >>>>>> > > > >>>>> but > > > >>> > > > >>>> for > > > >>>>> > > > >>>>>> changelog topic. And it intends to address the problem for > stream > > > >>>>>>> > > > >>>>>> use-case > > > >>>>>> > > > >>>>>>> in general. In this KIP we will provide interface (i.e. > > > >>>>>>> PartitionKeyRebalanceListener in the KIP) to be used by sream > > > >>>>>>> > > > >>>>>> use-case > > > >>>> > > > >>>>> and > > > >>>>>> > > > >>>>>>> the goal is that user can flush/re-consume the state as part of > > the > > > >>>>>>> interface implementation regardless of whether there is change > > log > > > >>>>>>> > > > >>>>>> topic. > > > >>>>> > > > >>>>>> Maybe you are suggesting that the main reason to do > split+backfill > > > >>>>>>> > > > >>>>>> of > > > >>> > > > >>>> input > > > >>>>>> > > > >>>>>>> topic is to support log compacted topics? You mentioned in > Point > > 1 > > > >>>>>>> > > > >>>>>> that > > > >>>> > > > >>>>> log > > > >>>>>> > > > >>>>>>> compacted topics is out of the scope of this KIP. Maybe I could > > > >>>>>>> > > > >>>>>> understand > > > >>>>>> > > > >>>>>>> your position better. Regarding Jan's proposal to split > > partitions > > > >>>>>>> > > > >>>>>> with > > > >>>> > > > >>>>> backfill, do you think this should replace the proposal in the > > > >>>>>>> > > > >>>>>> existing > > > >>>> > > > >>>>> KIP, or do you think this is something that we should do in > > > >>>>>>> > > > >>>>>> addition > > > >>> > > > >>>> to > > > >>>> > > > >>>>> the > > > >>>>>> > > > >>>>>>> existing KIP? > > > >>>>>>> > > > >>>>>>> I think that interface is a good/necessary component of > KIP-253. > > > >>>>>> > > > >>>>>> I personally (FWIW) feel that KIP-253 is appropriately scoped, > > but I > > > >>>>>> > > > >>>>> do > > > >>> > > > >>>> think its utility will be limited unless there is a later KIP > > > >>>>>> > > > >>>>> offering > > > >>> > > > >>>> backfill. But, maybe unlike Jan, I think it makes sense to try and > > > >>>>>> > > > >>>>> tackle > > > >>>> > > > >>>>> the ordering problem independently of backfill, so I'm in support > > of > > > >>>>>> > > > >>>>> the > > > >>>> > > > >>>>> current KIP. > > > >>>>>> > > > >>>>>> - Regarding point 6. I guess we can agree that it is better not > to > > > >>>>>> > > > >>>>> have > > > >>> > > > >>>> the > > > >>>>> > > > >>>>>> performance overhread of copying the input data. 
Before we > discuss > > > >>>>>>> > > > >>>>>> more > > > >>>> > > > >>>>> on > > > >>>>>> > > > >>>>>>> whether the performance overhead is acceptable or not, I am > > trying > > > >>>>>>> > > > >>>>>> to > > > >>> > > > >>>> figure out what is the benefit of introducing this overhread. You > > > >>>>>>> > > > >>>>>> mentioned > > > >>>>>> > > > >>>>>>> that the benefit is the loose organizational coupling. By > > > >>>>>>> > > > >>>>>> "organizational > > > >>>>> > > > >>>>>> coupling", are you referring to the requirement that consumer > > needs > > > >>>>>>> > > > >>>>>> to > > > >>>> > > > >>>>> know > > > >>>>>> > > > >>>>>>> the hash function of producer? If so, maybe we can discuss the > > > >>>>>>> > > > >>>>>> use-case > > > >>>> > > > >>>>> of > > > >>>>>> > > > >>>>>>> custom partiton function and see whether we can find a way to > > > >>>>>>> > > > >>>>>> support > > > >>> > > > >>>> such > > > >>>>>> > > > >>>>>>> use-case without having to copy the input data. > > > >>>>>>> > > > >>>>>>> I'm not too sure about what an "input" is in this sense, since > we > > > are > > > >>>>>> > > > >>>>> just > > > >>>>> > > > >>>>>> talking about topics. Actually the point I was making there is > > that > > > >>>>>> > > > >>>>> AKAICT > > > >>>>> > > > >>>>>> the performance overhead of a backfill is less than any other > > > option, > > > >>>>>> assuming you split partitions rarely. > > > >>>>>> > > > >>>>>> By "input" I was referring to source Kafka topic of a stream > > > >>>>> processing > > > >>>>> job. > > > >>>>> > > > >>>>> > > > >>>>> Separately, yes, "organizational coupling" increases if producers > > and > > > >>>>>> consumers have to share code, such as the partition function. > This > > > >>>>>> > > > >>>>> would > > > >>>> > > > >>>>> not be the case if producers could only pick from a menu of a few > > > >>>>>> well-known partition functions, but I think this is a poor > > tradeoff. > > > >>>>>> > > > >>>>>> Maybe we can revisit the custom partition function and see > whether > > > we > > > >>>>> actually need it? Otherwise, I am concerned that every user will > > pay > > > >>>>> > > > >>>> the > > > >>> > > > >>>> overhead of data movement to support something that was not really > > > >>>>> > > > >>>> needed > > > >>> > > > >>>> for most users. > > > >>>>> > > > >>>>> > > > >>>>> To me, this is two strong arguments in favor of backfill being > less > > > >>>>>> expensive than no backfill, but again, I think that particular > > > debate > > > >>>>>> > > > >>>>> comes > > > >>>>> > > > >>>>>> after KIP-253, so I don't want to create the impression of > > > opposition > > > >>>>>> > > > >>>>> to > > > >>>> > > > >>>>> your proposal. > > > >>>>>> > > > >>>>>> > > > >>>>>> Finally, to respond to a new email I just noticed: > > > >>>>>> > > > >>>>>> BTW, here is my understanding of the scope of this KIP. We want > to > > > >>>>>>> > > > >>>>>> allow > > > >>>>> > > > >>>>>> consumers to always consume messages with the same key from the > > > >>>>>>> > > > >>>>>> same > > > >>> > > > >>>> producer in the order they are produced. And we need to provide a > > > >>>>>>> > > > >>>>>> way > > > >>> > > > >>>> for > > > >>>>> > > > >>>>>> stream use-case to be able to flush/load state when messages > with > > > >>>>>>> > > > >>>>>> the > > > >>> > > > >>>> same > > > >>>>>> > > > >>>>>>> key are migrated between consumers. 
In addition to ensuring > that > > > >>>>>>> > > > >>>>>> this > > > >>> > > > >>>> goal > > > >>>>>> > > > >>>>>>> is correctly supported, we should do our best to keep the > > > >>>>>>> > > > >>>>>> performance > > > >>> > > > >>>> and > > > >>>>> > > > >>>>>> organization overhead of this KIP as low as possible. > > > >>>>>>> > > > >>>>>>> I think we're on the same page there! In fact, I would > > generalize a > > > >>>>>> > > > >>>>> little > > > >>>>> > > > >>>>>> more and say that the mechanism you've designed provides *all > > > >>>>>> > > > >>>>> consumers* > > > >>>> > > > >>>>> the ability "to flush/load state when messages with the same key > > are > > > >>>>>> migrated between consumers", not just Streams. > > > >>>>>> > > > >>>>>> Thanks for all the comment! > > > >>>>> > > > >>>>> > > > >>>>> Thanks for the discussion, > > > >>>>>> -John > > > >>>>>> > > > >>>>>> > > > >>>>>> > > > >>>>>> On Tue, Mar 27, 2018 at 3:14 PM, Dong Lin <lindon...@gmail.com> > > > >>>>>> > > > >>>>> wrote: > > > >>> > > > >>>> Hey John, > > > >>>>>>> > > > >>>>>>> Thanks much for the detailed comments. Here are my thoughts: > > > >>>>>>> > > > >>>>>>> - The need to delete messages from log compacted topics is > mainly > > > >>>>>>> > > > >>>>>> for > > > >>> > > > >>>> performance (e.g. storage space) optimization than for correctness > > > >>>>>>> > > > >>>>>> for > > > >>>> > > > >>>>> this > > > >>>>>> > > > >>>>>>> KIP. I agree that we probably don't need to focus on this in > our > > > >>>>>>> > > > >>>>>> discussion > > > >>>>>> > > > >>>>>>> since it is mostly for performance optimization. > > > >>>>>>> > > > >>>>>>> - "Asking producers and consumers, or even two different > > producers, > > > >>>>>>> > > > >>>>>> to > > > >>>> > > > >>>>> share code like the partition function is a pretty huge ask. What > > > >>>>>>> > > > >>>>>> if > > > >>> > > > >>>> they > > > >>>>> > > > >>>>>> are using different languages?". It seems that today we already > > > >>>>>>> > > > >>>>>> require > > > >>>> > > > >>>>> different producer's to use the same hash function -- otherwise > > > >>>>>>> > > > >>>>>> messages > > > >>>>> > > > >>>>>> with the same key will go to different partitions of the same > > topic > > > >>>>>>> > > > >>>>>> which > > > >>>>> > > > >>>>>> may cause problem for downstream consumption. So not sure if it > > > >>>>>>> > > > >>>>>> adds > > > >>> > > > >>>> any > > > >>>>> > > > >>>>>> more constraint by assuming consumers know the hash function of > > > >>>>>>> > > > >>>>>> producer. > > > >>>>> > > > >>>>>> Could you explain more why user would want to use a cusmtom > > > >>>>>>> > > > >>>>>> partition > > > >>> > > > >>>> function? Maybe we can check if this is something that can be > > > >>>>>>> > > > >>>>>> supported > > > >>>> > > > >>>>> in > > > >>>>>> > > > >>>>>>> the default Kafka hash function. Also, can you explain more why > > it > > > >>>>>>> > > > >>>>>> is > > > >>> > > > >>>> difficuilt to implement the same hash function in different > > > >>>>>>> > > > >>>>>> languages? > > > >>>> > > > >>>>> - Besides the assumption that consumer needs to share the hash > > > >>>>>>> > > > >>>>>> function > > > >>>> > > > >>>>> of > > > >>>>>> > > > >>>>>>> producer, is there other organization overhead of the proposal > in > > > >>>>>>> > > > >>>>>> the > > > >>> > > > >>>> current KIP? > > > >>>>>>> > > > >>>>>>> - Currently producer can forget about the message that has been > > > >>>>>>> acknowledged by the broker. 
Thus the producer probably does not > > > >>>>>>> > > > >>>>>> know > > > >>> > > > >>>> most > > > >>>>> > > > >>>>>> of the exiting messages in topic, including those messages > > produced > > > >>>>>>> > > > >>>>>> by > > > >>>> > > > >>>>> other producers. We can have the owner of the producer to > > > >>>>>>> > > > >>>>>> split+backfill. > > > >>>>> > > > >>>>>> In my opion it will be a new program that wraps around the > > existing > > > >>>>>>> producer and consumer classes. > > > >>>>>>> > > > >>>>>>> - Regarding point 5. The argument is in favor of the > > split+backfill > > > >>>>>>> > > > >>>>>> but > > > >>>> > > > >>>>> for > > > >>>>>> > > > >>>>>>> changelog topic. And it intends to address the problem for > stream > > > >>>>>>> > > > >>>>>> use-case > > > >>>>>> > > > >>>>>>> in general. In this KIP we will provide interface (i.e. > > > >>>>>>> PartitionKeyRebalanceListener in the KIP) to be used by sream > > > >>>>>>> > > > >>>>>> use-case > > > >>>> > > > >>>>> and > > > >>>>>> > > > >>>>>>> the goal is that user can flush/re-consume the state as part of > > the > > > >>>>>>> interface implementation regardless of whether there is change > > log > > > >>>>>>> > > > >>>>>> topic. > > > >>>>> > > > >>>>>> Maybe you are suggesting that the main reason to do > split+backfill > > > >>>>>>> > > > >>>>>> of > > > >>> > > > >>>> input > > > >>>>>> > > > >>>>>>> topic is to support log compacted topics? You mentioned in > Point > > 1 > > > >>>>>>> > > > >>>>>> that > > > >>>> > > > >>>>> log > > > >>>>>> > > > >>>>>>> compacted topics is out of the scope of this KIP. Maybe I could > > > >>>>>>> > > > >>>>>> understand > > > >>>>>> > > > >>>>>>> your position better. Regarding Jan's proposal to split > > partitions > > > >>>>>>> > > > >>>>>> with > > > >>>> > > > >>>>> backfill, do you think this should replace the proposal in the > > > >>>>>>> > > > >>>>>> existing > > > >>>> > > > >>>>> KIP, or do you think this is something that we should do in > > > >>>>>>> > > > >>>>>> addition > > > >>> > > > >>>> to > > > >>>> > > > >>>>> the > > > >>>>>> > > > >>>>>>> existing KIP? > > > >>>>>>> > > > >>>>>>> - Regarding point 6. I guess we can agree that it is better not > > to > > > >>>>>>> > > > >>>>>> have > > > >>>> > > > >>>>> the > > > >>>>>> > > > >>>>>>> performance overhread of copying the input data. Before we > > discuss > > > >>>>>>> > > > >>>>>> more > > > >>>> > > > >>>>> on > > > >>>>>> > > > >>>>>>> whether the performance overhead is acceptable or not, I am > > trying > > > >>>>>>> > > > >>>>>> to > > > >>> > > > >>>> figure out what is the benefit of introducing this overhread. You > > > >>>>>>> > > > >>>>>> mentioned > > > >>>>>> > > > >>>>>>> that the benefit is the loose organizational coupling. By > > > >>>>>>> > > > >>>>>> "organizational > > > >>>>> > > > >>>>>> coupling", are you referring to the requirement that consumer > > needs > > > >>>>>>> > > > >>>>>> to > > > >>>> > > > >>>>> know > > > >>>>>> > > > >>>>>>> the hash function of producer? If so, maybe we can discuss the > > > >>>>>>> > > > >>>>>> use-case > > > >>>> > > > >>>>> of > > > >>>>>> > > > >>>>>>> custom partiton function and see whether we can find a way to > > > >>>>>>> > > > >>>>>> support > > > >>> > > > >>>> such > > > >>>>>> > > > >>>>>>> use-case without having to copy the input data. 
> > > >>>>>>> > > > >>>>>>> Thanks, > > > >>>>>>> Dong > > > >>>>>>> > > > >>>>>>> > > > >>>>>>> On Tue, Mar 27, 2018 at 11:34 AM, John Roesler < > > j...@confluent.io> > > > >>>>>>> > > > >>>>>> wrote: > > > >>>>>> > > > >>>>>>> Hey Dong and Jun, > > > >>>>>>>> > > > >>>>>>>> Thanks for the thoughtful responses. If you don't mind, I'll > mix > > > >>>>>>>> > > > >>>>>>> my > > > >>> > > > >>>> replies > > > >>>>>>> > > > >>>>>>>> together to try for a coherent response. I'm not too familiar > > > >>>>>>>> > > > >>>>>>> with > > > >>> > > > >>>> mailing-list etiquette, though. > > > >>>>>>>> > > > >>>>>>>> I'm going to keep numbering my points because it makes it easy > > > >>>>>>>> > > > >>>>>>> for > > > >>> > > > >>>> you > > > >>>>> > > > >>>>>> all > > > >>>>>>> > > > >>>>>>>> to respond. > > > >>>>>>>> > > > >>>>>>>> Point 1: > > > >>>>>>>> As I read it, KIP-253 is *just* about properly fencing the > > > >>>>>>>> > > > >>>>>>> producers > > > >>>> > > > >>>>> and > > > >>>>>> > > > >>>>>>> consumers so that you preserve the correct ordering of records > > > >>>>>>>> > > > >>>>>>> during > > > >>>> > > > >>>>> partition expansion. This is clearly necessary regardless of > > > >>>>>>>> > > > >>>>>>> anything > > > >>>> > > > >>>>> else > > > >>>>>>> > > > >>>>>>>> we discuss. I think this whole discussion about backfill, > > > >>>>>>>> > > > >>>>>>> consumers, > > > >>>> > > > >>>>> streams, etc., is beyond the scope of KIP-253. But it would be > > > >>>>>>>> > > > >>>>>>> cumbersome > > > >>>>>> > > > >>>>>>> to start a new thread at this point. > > > >>>>>>>> > > > >>>>>>>> I had missed KIP-253's Proposed Change #9 among all the > > > >>>>>>>> > > > >>>>>>> details... > > > >>> > > > >>>> I > > > >>>> > > > >>>>> think > > > >>>>>>> > > > >>>>>>>> this is a nice addition to the proposal. One thought is that > > it's > > > >>>>>>>> > > > >>>>>>> actually > > > >>>>>>> > > > >>>>>>>> irrelevant whether the hash function is linear. This is simply > > an > > > >>>>>>>> > > > >>>>>>> algorithm > > > >>>>>>> > > > >>>>>>>> for moving a key from one partition to another, so the type of > > > >>>>>>>> > > > >>>>>>> hash > > > >>> > > > >>>> function need not be a precondition. In fact, it also doesn't > > > >>>>>>>> > > > >>>>>>> matter > > > >>>> > > > >>>>> whether the topic is compacted or not, the algorithm works > > > >>>>>>>> > > > >>>>>>> regardless. > > > >>>>> > > > >>>>>> I think this is a good algorithm to keep in mind, as it might > > > >>>>>>>> > > > >>>>>>> solve a > > > >>>> > > > >>>>> variety of problems, but it does have a downside: that the > > > >>>>>>>> > > > >>>>>>> producer > > > >>> > > > >>>> won't > > > >>>>>> > > > >>>>>>> know whether or not K1 was actually in P1, it just knows that > K1 > > > >>>>>>>> > > > >>>>>>> was > > > >>>> > > > >>>>> in > > > >>>>> > > > >>>>>> P1's keyspace before the new epoch. Therefore, it will have to > > > >>>>>>>> pessimistically send (K1,null) to P1 just in case. But the > next > > > >>>>>>>> > > > >>>>>>> time > > > >>>> > > > >>>>> K1 > > > >>>>> > > > >>>>>> comes along, the producer *also* won't remember that it already > > > >>>>>>>> > > > >>>>>>> retracted > > > >>>>>> > > > >>>>>>> K1 from P1, so it will have to send (K1,null) *again*. 
By > > > >>>>>>>> > > > >>>>>>> extension, > > > >>>> > > > >>>>> every > > > >>>>>>> > > > >>>>>>>> time the producer sends to P2, it will also have to send a > > > >>>>>>>> > > > >>>>>>> tombstone > > > >>>> > > > >>>>> to > > > >>>>> > > > >>>>>> P1, > > > >>>>>>> > > > >>>>>>>> which is a pretty big burden. To make the situation worse, if > > > >>>>>>>> > > > >>>>>>> there > > > >>> > > > >>>> is > > > >>>>> > > > >>>>>> a > > > >>>>>> > > > >>>>>>> second split, say P2 becomes P2 and P3, then any key Kx > belonging > > > >>>>>>>> > > > >>>>>>> to > > > >>>> > > > >>>>> P3 > > > >>>>> > > > >>>>>> will also have to be retracted from P2 *and* P1, since the > > > >>>>>>>> > > > >>>>>>> producer > > > >>> > > > >>>> can't > > > >>>>>> > > > >>>>>>> know whether Kx had been last written to P2 or P1. Over a long > > > >>>>>>>> > > > >>>>>>> period > > > >>>> > > > >>>>> of > > > >>>>>> > > > >>>>>>> time, this clearly becomes a issue, as the producer must send > an > > > >>>>>>>> > > > >>>>>>> arbitrary > > > >>>>>>> > > > >>>>>>>> number of retractions along with every update. > > > >>>>>>>> > > > >>>>>>>> In contrast, the proposed backfill operation has an end, and > > > >>>>>>>> > > > >>>>>>> after > > > >>> > > > >>>> it > > > >>>> > > > >>>>> ends, > > > >>>>>>> > > > >>>>>>>> everyone can afford to forget that there ever was a different > > > >>>>>>>> > > > >>>>>>> partition > > > >>>>> > > > >>>>>> layout. > > > >>>>>>>> > > > >>>>>>>> Really, though, figuring out how to split compacted topics is > > > >>>>>>>> > > > >>>>>>> beyond > > > >>>> > > > >>>>> the > > > >>>>>> > > > >>>>>>> scope of KIP-253, so I'm not sure #9 really even needs to be in > > > >>>>>>>> > > > >>>>>>> this > > > >>>> > > > >>>>> KIP... > > > >>>>>>> > > > >>>>>>>> We do need in-order delivery during partition expansion. It > > would > > > >>>>>>>> > > > >>>>>>> be > > > >>>> > > > >>>>> fine > > > >>>>>> > > > >>>>>>> by me to say that you *cannot* expand partitions of a > > > >>>>>>>> > > > >>>>>>> log-compacted > > > >>> > > > >>>> topic > > > >>>>>> > > > >>>>>>> and call it a day. I think it would be better to tackle that in > > > >>>>>>>> > > > >>>>>>> another > > > >>>>> > > > >>>>>> KIP. > > > >>>>>>>> > > > >>>>>>>> > > > >>>>>>>> Point 2: > > > >>>>>>>> Regarding whether the consumer re-shuffles its inputs, this is > > > >>>>>>>> > > > >>>>>>> always > > > >>>> > > > >>>>> on > > > >>>>>> > > > >>>>>>> the table; any consumer who wants to re-shuffle its input is > free > > > >>>>>>>> > > > >>>>>>> to > > > >>>> > > > >>>>> do > > > >>>>> > > > >>>>>> so. > > > >>>>>>> > > > >>>>>>>> But this is currently not required. It's just that the current > > > >>>>>>>> > > > >>>>>>> high-level > > > >>>>>> > > > >>>>>>> story with Kafka encourages the use of partitions as a unit of > > > >>>>>>>> > > > >>>>>>> concurrency. > > > >>>>>>> > > > >>>>>>>> As long as consumers are single-threaded, they can happily > > > >>>>>>>> > > > >>>>>>> consume > > > >>> > > > >>>> a > > > >>>> > > > >>>>> single > > > >>>>>>> > > > >>>>>>>> partition without concurrency control of any kind. This is a > key > > > >>>>>>>> > > > >>>>>>> aspect > > > >>>>> > > > >>>>>> to > > > >>>>>>> > > > >>>>>>>> this system that lets folks design high-throughput systems on > > top > > > >>>>>>>> > > > >>>>>>> of > > > >>>> > > > >>>>> it > > > >>>>> > > > >>>>>> surprisingly easily. 
If all consumers were instead > > > >>>>>>>> > > > >>>>>>> encouraged/required > > > >>>>> > > > >>>>>> to > > > >>>>>> > > > >>>>>>> implement a repartition of their own, then the consumer becomes > > > >>>>>>>> significantly more complex, requiring either the consumer to > > > >>>>>>>> > > > >>>>>>> first > > > >>> > > > >>>> produce > > > >>>>>>> > > > >>>>>>>> to its own intermediate repartition topic or to ensure that > > > >>>>>>>> > > > >>>>>>> consumer > > > >>>> > > > >>>>> threads have a reliable, high-bandwith channel of communication > > > >>>>>>>> > > > >>>>>>> with > > > >>>> > > > >>>>> every > > > >>>>>>> > > > >>>>>>>> other consumer thread. > > > >>>>>>>> > > > >>>>>>>> Either of those tradeoffs may be reasonable for a particular > > user > > > >>>>>>>> > > > >>>>>>> of > > > >>>> > > > >>>>> Kafka, > > > >>>>>>> > > > >>>>>>>> but I don't know if we're in a position to say that they are > > > >>>>>>>> > > > >>>>>>> reasonable > > > >>>>> > > > >>>>>> for > > > >>>>>>> > > > >>>>>>>> *every* user of Kafka. > > > >>>>>>>> > > > >>>>>>>> > > > >>>>>>>> Point 3: > > > >>>>>>>> Regarding Jun's point about this use case, "(3) stateful and > > > >>>>>>>> > > > >>>>>>> maintaining > > > >>>>>> > > > >>>>>>> the > > > >>>>>>>> states in a local store", I agree that they may use a > framework > > > >>>>>>>> > > > >>>>>>> *like* > > > >>>>> > > > >>>>>> Kafka Streams, but that is not the same as using Kafka Streams. > > > >>>>>>>> > > > >>>>>>> This > > > >>>> > > > >>>>> is > > > >>>>> > > > >>>>>> why > > > >>>>>>> > > > >>>>>>>> I think it's better to solve it in Core: because it is then > > > >>>>>>>> > > > >>>>>>> solved > > > >>> > > > >>>> for > > > >>>>> > > > >>>>>> KStreams and also for everything else that facilitates local > > > >>>>>>>> > > > >>>>>>> state > > > >>> > > > >>>> maintenance. To me, Streams is a member of the category of > > > >>>>>>>> > > > >>>>>>> "stream > > > >>> > > > >>>> processing frameworks", which is itself a subcategory of "things > > > >>>>>>>> > > > >>>>>>> requiring > > > >>>>>>> > > > >>>>>>>> local state maintenence". I'm not sure if it makes sense to > > > >>>>>>> > > > >>>>>>> > > > > > > > > > -- > -- Guozhang >