Matthias,

Yes, I agree. The KIP is updated: https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+DSL+with+Connecting+Topic+Creation+and+Repartition+Hint and a follow-up JIRA ticket is linked in the “Rejected Alternatives” section.
Thank you all for an interesting discussion. Kind Regards, Levani > On Nov 16, 2019, at 10:11 AM, Matthias J. Sax <matth...@confluent.io> wrote: > > Levani, > > do you agree to the current proposal? It's basically a de-scoping of the > already voted KIP. If you agree, could you update the KIP wiki page > accordingly, including the "Rejected Alternative" section (and mabye a > link to a follow up Jira ticket). > > Because it's a descope, and John and myself support it, there seems to > be no need to re-vote. > > @Sophie,John: thanks a lot for your thoughtful input! > > > -Matthias > > On 11/15/19 12:47 PM, John Roesler wrote: >> Thanks Sophie, >> >> I think your concern is valid, and also that your idea to make a >> ticket is a good idea. >> >> Creating a ticket has some very positive effects: >> * It allows us to record the thinking at this point in time so we >> don't have to dig through the mail archives later >> * It demonstrates that we did consider the use case, and do want to >> address it, but just don't feel confident to implement it right now. >> Then, if/when people do have a problem with the gap, the ticket it >> already there for them to consider, request, or even pick up. >> >> Since one aspect of the deferral is a desire to wait for real use >> experience, we should explicitly mention that in the ticket. This is >> just good information for people browsing the Jira looking for >> interesting tickets to pick up. They could still pick it up, but they >> can ask themselves if they really understand the real-world use cases >> any better than we do right now. >> >> Thanks, likewise, to you for the good discussion! 
>> -John >> >> On Fri, Nov 15, 2019 at 2:37 PM Sophie Blee-Goldman <sop...@confluent.io> >> wrote: >>> >>> While I'm concerned that "not augmenting groupBy as part of this KIP" >>> really translates to "will not get around to augmenting groupBy for a long >>> time if not as part of this KIP", like I said I don't want to hold up the >>> new >>> .repartition operator that it seems we do, at least, all agree on. It's a >>> fair >>> point that we can always add this in later, but undoing it is far more >>> problematic. >>> >>> Anyways, I would be happy if we at least make a ticket to consider adding a >>> "number of partitions" option/suggestion to groupBy, so that we don't lose >>> all the thought put in to this decision so far and can avoid rehashing the >>> same >>> argument word for word and have something to point to when someone >>> asks "why didn't we add this numPartitions option to groupBy". >>> >>> Beyond that, if the community isn't pushing for it at this moment then it >>> seems very >>> reasonable to shelve the idea for now so that the rest of this KIP can >>> proceed. >>> Without input one way or another it's hard to say what the right thing to >>> do is, >>> which makes the right thing to do "wait to add this feature" >>> >>> Thanks for the good discussion everyone, >>> >>> Sophie >>> >>> On Fri, Nov 15, 2019 at 12:41 PM John Roesler <j...@confluent.io> wrote: >>> >>>> Hi all, >>>> >>>> I think that Sophie is asking a good question, and I do think that >>>> such "blanket configurations" are plausible. For example, we currently >>>> support (and I would encourage) "I don't know if this is going to >>>> create a repartition topic, but if it does, then use this name instead >>>> of generating one". >>>> >>>> I'm not sure I'm convinced that specifying max parallelism falls into >>>> this category. After all, the groupByKey+aggregate will be executed >>>> with _some_ max parallelism. 
It's either the same as the inputs' >>>> partition count or overridden with the proposed config. It seems >>>> counterintuitive to override the specified option with the default >>>> value. >>>> >>>> I'm not sure if I can put my finger on it, but "maybe use this name" >>>> seems way more reasonable to me than "maybe execute with this degree >>>> of parallelism". >>>> >>>> I do think (and I appreciate that this is where Sophie's example is >>>> coming from) that Streams should strive to be absolutely as simple and >>>> intuitive as possible (while still maintaining correctness). Optimal >>>> performance can be at odds with API simplicity. For example, the >>>> simplest behavior is, if you ask for 5 partitions, you get 5 >>>> partitions. Maybe a repartition is technically not necessary (if you >>>> didn't change the key), but at least there's no mystery to this >>>> behavior. >>>> >>>> Clearly, an (opposing) tenent of simplicity is trying to prevent >>>> people from making mistakes, which I think is what the example boils >>>> down to. Sometimes, we can prevent clear mistakes, like equi-joining >>>> two topics with different partition counts. But for this case, it >>>> doesn't seem as clear-cut to be able to assume that they _said_ 5 >>>> partitions, but they didn't really _want_ 5 partitions. Maybe we can >>>> just try to be clear in the documentation, and also even log a warning >>>> when we parse the topology, "hey, I've been asked to repartition this >>>> stream, but it's not necessary". >>>> >>>> If anything, this discussion really supports to me the value in just >>>> sticking with `repartition()` for now, and deferring >>>> `groupBy[Key](partitions)` to the future. >>>> >>>>> Users should not have to choose between allowing Streams to optimize the >>>> repartition placement, and allowing to specify a number of partitions. 
>>>>
>>>> This is a very fair point, and it may be something that we rapidly
>>>> return to, but it seems safe for now to introduce the non-optimizable
>>>> `repartition()` only, and then consider optimization options later.
>>>> Skipping available optimizations will never break correctness, but
>>>> adding optimizations can, so it makes sense to treat them with
>>>> caution.
>>>>
>>>> In conclusion, I do think that a user _could_ want to "maybe specify"
>>>> the partition count, but I also think we can afford to pass on
>>>> supporting this right now.
>>>>
>>>> I'm open to continuing the discussion, but just to avoid ambiguity, I
>>>> still feel we should _not_ change the groupBy[Key] operation at all,
>>>> and we should only add `repartition()` as a non-optimizable operation.
>>>>
>>>> Thanks all,
>>>> -John
>>>>
>>>> On Fri, Nov 15, 2019 at 11:26 AM Levani Kokhreidze
>>>> <levani.co...@gmail.com> wrote:
>>>>>
>>>>> Hello,
>>>>>
>>>>> Just FYI, the PR was updated and now incorporates the latest suggestions about joins.
>>>>> `CopartitionedTopicsEnforcer` will throw an exception if the number of partitions isn’t the same when using the `repartition` operation along with `join`.
>>>>>
>>>>> For more details please take a look at the PR: https://github.com/apache/kafka/pull/7170/files
>>>>>
>>>>> Regards,
>>>>> Levani
>>>>>
>>>>>
>>>>>> On Nov 15, 2019, at 11:01 AM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>
>>>>>> Thanks a lot for the input Sophie.
>>>>>>
>>>>>> Your example is quite useful, and I would use it to support my claim
>>>>>> that a "partition hint" for `Grouped` seems "useless" and does not
>>>>>> improve the user experience.
>>>>>>
>>>>>> 1) You argue that a new user would be worried about repartition topics
>>>>>> with too many partitions.
This would imply that a user is already >>>>>> advanced enough to understand the implication of repartitioning -- for >>>>>> this case, I would argue that a user also understand _when_ a >>>>>> auto-repartitioning would happen and thus the users understands where >>>> to >>>>>> insert a `repartition()` operation. >>>>>> >>>>>> 2) For specifying Serdes: if a `groupByKey()` does not trigger >>>>>> auto-repartitioning it's not required to specify the serdes and if they >>>>>> are specified they would be ignored/unused (note, that `groupBy()` >>>> would >>>>>> always trigger a repartitioning). Of course, if the default Serdes from >>>>>> the config match (eg, all data types are Json anyway), a user does not >>>>>> need to worry about specifying serdes. -- For new user that play >>>> around, >>>>>> I would assume that they work a lot with primitive types and thus would >>>>>> need to specify the serdes -- hence, they would learn about >>>>>> auto-repartitioning the hard way anyhow, because each time a >>>>>> `groupByKey()` does trigger auto-repartioning, they would need to pass >>>>>> in the correct Serdes -- this way, they would also be educated where to >>>>>> insert a `repartition()` operator if needed. >>>>>> >>>>>> 3) If a new user really just "plays around", I don't think they use an >>>>>> input topic with 100 partitions but most likely have a local single >>>> node >>>>>> broker with most likely single partitions topics. >>>>>> >>>>>> >>>>>> My main argument for my current proposal is however, that---based on >>>>>> past experience---it's better to roll out a new feature more carefully >>>>>> and see how it goes. Last, as John pointed out, we can still extend the >>>>>> feature in the future. Instead of making a judgment call up-front, >>>> being >>>>>> more conservative and less fancy, and revisit the design based on >>>>>> actuall user feedback after the first version is rolled out, seems to >>>> be >>>>>> the better option. 
Undoing a feature is much harder than extending it.
>>>>>>
>>>>>>
>>>>>> While I advocate strongly for a simple first version of this feature, it's
>>>>>> a community decision in the end, and I would not block this KIP if
>>>>>> there is a broad preference to add `Grouped#withNumberOfPartitions()`
>>>>>> either.
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 11/14/19 11:35 PM, Sophie Blee-Goldman wrote:
>>>>>>> It seems like we all agree at this point (please correct me if wrong!) that
>>>>>>> we should NOT change the existing repartitioning behavior, ie we should allow
>>>>>>> Streams to continue to determine when and where to repartition -- *unless*
>>>>>>> explicitly informed to by the use of a .through or the new .repartition operator.
>>>>>>>
>>>>>>> Regarding groupBy, the existing behavior we should not disrupt is
>>>>>>> a) repartition *only* when required due to an upstream key-changing operation
>>>>>>> (ie don't force repartitioning based on the presence of an optional config parameter), and
>>>>>>> b) allow optimization of required repartitions, if any
>>>>>>>
>>>>>>> Within the constraint of not breaking the existing behavior, this still
>>>>>>> leaves open the question of whether we want to improve the user experience
>>>>>>> by allowing users to provide groupBy with a *suggestion* for numPartitions (or to
>>>>>>> put it more fairly, whether that *will* improve the experience). I agree
>>>>>>> with many of the arguments outlined above but let me just push back on this
>>>>>>> one issue one final time, and if we can't come to a consensus then I am happy
>>>>>>> to drop it for now so that the KIP can proceed.
>>>>>>> >>>>>>> Specifically, my proposal would be to simply augment Grouped with an >>>>>>> optional numPartitions, understood to >>>>>>> indicate the user's desired number of partitions *if Streams decides >>>> to >>>>>>> repartition due to that groupBy* >>>>>>> >>>>>>>> if a user cares about the number of partition, the user wants to >>>> enforce >>>>>>> a repartitioning >>>>>>> First, I think we should take a step back and examine this claim. I >>>> agree >>>>>>> 100% that *if this is true,* >>>>>>> *then we should not give groupBy an optional numPartitions.* As far >>>> as I >>>>>>> see it, there's no argument >>>>>>> to be had there if we *presuppose that claim.* But I'm not convinced >>>> in >>>>>>> that as an axiom of the user >>>>>>> experience and think we should be examining that claim itself, not the >>>>>>> consequences of it. >>>>>>> >>>>>>> To give a simple example, let's say some new user is trying out >>>> Streams and >>>>>>> wants to just play around >>>>>>> with it to see if it might be worth looking into. They want to just >>>> write >>>>>>> up a simple app and test it out on the >>>>>>> data in some existing topics they have with a large number of >>>> partitions, >>>>>>> and a lot of data. They're just messing >>>>>>> around, trying new topologies and don't want to go through each new >>>> one >>>>>>> step by step to determine if (or where) >>>>>>> a repartition might be required. They also don't want to force a >>>>>>> repartition if it turns out to not be required, so they'd >>>>>>> like to avoid the nice new .repartition operator they saw. But given >>>> the >>>>>>> huge number of input partitions, they'd like >>>>>>> to rest assured that if a repartition does end up being required >>>> somewhere >>>>>>> during dev, it will not be created with >>>>>>> the same huge number of partitions that their input topic has -- so >>>> they >>>>>>> just pass groupBy a small numPartitions >>>>>>> suggestion. 
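A minimal sketch of the "suggestion" semantics Sophie describes above, written in plain Java without the Kafka Streams library. Everything here is hypothetical: the class `HintSemantics`, the method `repartitionTopicPartitions`, and its parameters are illustrative names, not part of any Kafka API, and the thread ultimately deferred this option rather than adopting it.

```java
import java.util.OptionalInt;

/** Hypothetical model of a numPartitions "suggestion" on groupBy; not Kafka Streams API. */
final class HintSemantics {

    /**
     * Returns the partition count of the repartition topic that a groupBy
     * would create, or empty if no repartition topic is created at all.
     *
     * @param repartitionRequired whether an upstream key change forces a repartition
     * @param upstreamPartitions  partition count inherited from the input topic
     * @param hint                the user's optional suggestion
     */
    static OptionalInt repartitionTopicPartitions(boolean repartitionRequired,
                                                  int upstreamPartitions,
                                                  OptionalInt hint) {
        if (!repartitionRequired) {
            // The hint never forces a repartition; it is simply unused here.
            return OptionalInt.empty();
        }
        // A repartition happens anyway, so the hint (if any) sizes the topic.
        return OptionalInt.of(hint.orElse(upstreamPartitions));
    }

    public static void main(String[] args) {
        System.out.println(repartitionTopicPartitions(false, 100, OptionalInt.of(4)));
        System.out.println(repartitionTopicPartitions(true, 100, OptionalInt.of(4)));
        System.out.println(repartitionTopicPartitions(true, 100, OptionalInt.empty()));
    }
}
```

The point of contention in the thread is the first branch: whether a user who supplies a count would accept it being silently unused.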
>>>>>>> >>>>>>> I know that's a bit of a contrived example but I think it does >>>> highlight >>>>>>> how and when this might be a considerable >>>>>>> quality of life improvement, in particular for new users to Streams >>>> and/or >>>>>>> during the dev cycle. *You don't want to* >>>>>>> *force a repartition if it wasn't necessary, but you don't want to >>>> create a >>>>>>> topic with a huge partition count either.* >>>>>>> >>>>>>> Also, while the optimization discussion took us down an interesting >>>> but >>>>>>> ultimately more distracting road, it's worth >>>>>>> pointing out that it is clearly a major win to have as few >>>>>>> repartition topics/steps as possible. Given that we >>>>>>> don't want to change existing behavior, the optimization framework >>>> can only >>>>>>> help out when the placement of >>>>>>> repartition steps is flexible, which means only those from .groupBy >>>> (and >>>>>>> not .repartition). *Users should not* >>>>>>> *have to choose between allowing Streams to optimize the repartition >>>>>>> placement, and allowing to specify a * >>>>>>> *number of partitions.* >>>>>>> >>>>>>> Lastly, I have what may be a stupid question but for my own >>>> edification of >>>>>>> how groupBy works: >>>>>>> if you do a .groupBy and a repartition is NOT required, does it ever >>>> need >>>>>>> to serialize/deserialize >>>>>>> any of the data? In other words, if you pass a key/value serde to >>>> groupBy >>>>>>> and it doesn't trigger >>>>>>> a repartition, is the serde(s) just ignored and thus more like a >>>> suggestion >>>>>>> than a requirement? 
>>>>>>> >>>>>>> So again, I don't want to hold up this KIP forever but I feel we've >>>> spent >>>>>>> some time getting slightly >>>>>>> off track (although certainly into very interesting discussions) yet >>>> never >>>>>>> really addressed or questioned >>>>>>> the basic premise: *could a user want to specify a number of >>>> partitions but >>>>>>> not enforce a repartition (at that* >>>>>>> *specific point in the topology)?* >>>>>>> >>>>>>> >>>>>>> >>>>>>> On Fri, Nov 15, 2019 at 12:18 AM Matthias J. Sax < >>>> matth...@confluent.io> >>>>>>> wrote: >>>>>>> >>>>>>>> Side remark: >>>>>>>> >>>>>>>> If the user specifies `repartition()` on both side of the join, we >>>> can >>>>>>>> actually throw the execption earlier, ie, when we build the topology. >>>>>>>> >>>>>>>> Current, we can do this check only after Kafka Streams was started, >>>>>>>> within `StreamPartitionAssignor#assign()` -- we still need to keep >>>> this >>>>>>>> check for the case that none or only one side has a user specified >>>>>>>> number of partitions though. >>>>>>>> >>>>>>>> >>>>>>>> -Matthias >>>>>>>> >>>>>>>> On 11/14/19 8:15 AM, John Roesler wrote: >>>>>>>>> Thanks, all, >>>>>>>>> >>>>>>>>> I can get behind just totally leaving out reparation-via-groupBy. If >>>>>>>>> we only introduce `repartition()` for now, we're making the minimal >>>>>>>>> change to gain the desired capability. >>>>>>>>> >>>>>>>>> Plus, since we agree that `repartition()` should never be >>>> optimizable, >>>>>>>>> it's a future-compatible proposal. I.e., if we were to add a >>>>>>>>> non-optimizable groupBy(partitions) operation now, and want to make >>>> it >>>>>>>>> optimizable in the future, we have to worry about topology >>>>>>>>> compatibility. Better to just do non-optimizable `repartition()` >>>> now, >>>>>>>>> and add an optimizable `groupBy(partitions)` in the future (maybe). >>>>>>>>> >>>>>>>>> About joins, yes, it's a concern, and IMO we should just do the same >>>>>>>>> thing we do now... 
check at runtime that the partition counts on >>>> both >>>>>>>>> sides match and throw an exception otherwise. What this means as a >>>>>>>>> user is that if you explicitly repartition the left side to 100 >>>>>>>>> partitions, and then join with the right side at 10 partitions, you >>>>>>>>> get an exception, since this operation is not possible. You'd either >>>>>>>>> have to "step down" the left side again, back to 10 partitions, or >>>> you >>>>>>>>> could repartition the right side to 100 partitions before the join. >>>>>>>>> The choice has to be the user's, since it depends on their desired >>>>>>>>> execution parallelism. >>>>>>>>> >>>>>>>>> Thanks, >>>>>>>>> -John >>>>>>>>> >>>>>>>>> On Thu, Nov 14, 2019 at 12:55 AM Matthias J. Sax < >>>> matth...@confluent.io> >>>>>>>> wrote: >>>>>>>>>> >>>>>>>>>> Thanks a lot John. I think the way you decompose the operators is >>>> super >>>>>>>>>> helpful for this discussion. >>>>>>>>>> >>>>>>>>>> What you suggest with regard to using `Grouped` and enforcing >>>>>>>>>> repartitioning if the number of partitions is specified is >>>> certainly >>>>>>>>>> possible. However, I am not sure if we _should_ do this. My >>>> reasoning is >>>>>>>>>> that an enforce repartitioning as introduced via `repartition()` >>>> is an >>>>>>>>>> expensive operations, and it seems better to demand an more >>>> explicit >>>>>>>>>> user opt-in to trigger it. Just setting an optional parameter >>>> might be >>>>>>>>>> too subtle to trigger such a heavy "side effect". >>>>>>>>>> >>>>>>>>>> While I agree about "usability" in general, I would prefer a more >>>>>>>>>> conservative appraoch to introduce this feature, see how it goes, >>>> and >>>>>>>>>> maybe make it more advance later on. This also applies to what >>>>>>>>>> optimzation we may or may not allow (or are able to perform at >>>> all). >>>>>>>>>> >>>>>>>>>> @Levani: Reflecting about my suggestion about `Repartioned extends >>>>>>>>>> Grouped`, I agree that it might not be a good idea. 
>>>>>>>>>>
>>>>>>>>>> Atm, I see an enforced repartitioning as non-optimizable and as a good
>>>>>>>>>> first step, and I would suggest not introducing anything else for now.
>>>>>>>>>> Introducing optimizable enforced repartitioning via `groupBy(...,
>>>>>>>>>> Grouped)` is something we could add later.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Therefore, I would not change `Grouped` but only introduce
>>>>>>>>>> `repartition()`. Users that use `groupBy()` atm, and want to opt in to
>>>>>>>>>> setting the number of partitions, would need to rewrite their code to
>>>>>>>>>> `selectKey(...).repartition(...).groupByKey()`. It's less convenient but
>>>>>>>>>> also less risky from an API and optimization point of view.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> @Levani: about joins -> yes, we will need to check the specified number
>>>>>>>>>> of partitions (if any) and if they don't match, throw an exception. We
>>>>>>>>>> can discuss this on the PR -- I am just trying to get the PR for KIP-466
>>>>>>>>>> merged -- yours is next on the list :)
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thoughts?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 11/12/19 4:51 PM, Levani Kokhreidze wrote:
>>>>>>>>>>> Thank you all for an interesting discussion. This is very enlightening.
>>>>>>>>>>>
>>>>>>>>>>> Thank you Matthias for your explanation. Your arguments are very true. It makes sense that if a user specifies the number of partitions, he/she really cares that those specifications are applied to internal topics.
>>>>>>>>>>> Unfortunately, in the current implementation this is not true during the `join` operation. As I’ve written in the PR comment, currently, when `Stream#join` is used, `CopartitionedTopicsEnforcer` chooses the max number of partitions from the two source topics.
>>>>>>>>>>> I’m not really sure what would be the other way around this situation.
>>>>>>>> Maybe fail the stream altogether and inform the user to specify same >>>> number >>>>>>>> of partitions? >>>>>>>>>>> Or we should treat join operations in a same way as it is right >>>> now >>>>>>>> and basically choose max number of partitions even when `repartition` >>>>>>>> operation is specified, because Kafka Streams “knows the best” how to >>>>>>>> handle joins? >>>>>>>>>>> You can check integration tests how it’s being handled currently. >>>> Open >>>>>>>> to suggestions on that part. >>>>>>>>>>> >>>>>>>>>>> As for groupBy, I agree and John raised very interesting points. >>>> My >>>>>>>> arguments for allowing users to specify number of partitions during >>>> groupBy >>>>>>>> operations mainly was coming from the usability perspective. >>>>>>>>>>> So building on top of what John said, maybe it makes sense to make >>>>>>>> `groupBy` operations smarter and whenever user specifies >>>>>>>> `numberOfPartitions` configuration, repartitioning will be enforced, >>>> wdyt? >>>>>>>>>>> I’m not going into optimization part yet :) I think it will be >>>> part of >>>>>>>> separate PR and task, but overall it makes sense to apply >>>> optimizations >>>>>>>> where number of partitions are the same. >>>>>>>>>>> >>>>>>>>>>> As for Repartitioned extending Grouped, I kinda feel that it >>>> won’t fit >>>>>>>> nicely in current API design. >>>>>>>>>>> In addition, in the PR review, John mentioned that there were a >>>> lot of >>>>>>>> troubles in the past trying to use one operation's configuration >>>> objects on >>>>>>>> other operations. >>>>>>>>>>> Also it makes sense to keep them separate in terms of >>>> compatibility. >>>>>>>>>>> In that case, we don’t have to worry every time Grouped is >>>> changed, >>>>>>>> what would be the implications on `repartition` operations. 
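The two join options Levani raises above (fail on a mismatch, or keep choosing the max) can be contrasted in a small plain-Java sketch. The class and method names are hypothetical and only model the decision; they are not the actual `CopartitionedTopicsEnforcer` code.

```java
/** Hypothetical model of the two join strategies discussed; not Kafka Streams code. */
final class JoinPartitionOptions {

    /** Option 1: refuse to join when the two sides have different partition counts. */
    static int failOnMismatch(int leftPartitions, int rightPartitions) {
        if (leftPartitions != rightPartitions) {
            throw new IllegalStateException(
                "Cannot join: left has " + leftPartitions
                    + " partitions, right has " + rightPartitions);
        }
        return leftPartitions;
    }

    /** Option 2: silently use the larger count for both sides. */
    static int chooseMax(int leftPartitions, int rightPartitions) {
        return Math.max(leftPartitions, rightPartitions);
    }

    public static void main(String[] args) {
        System.out.println(chooseMax(100, 10));
        try {
            failOnMismatch(100, 10);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Option 2 would override a count the user set explicitly via `repartition`, which is why the later replies in the thread settle on throwing an exception instead.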
>>>>>>>>>>> >>>>>>>>>>> Kind regards, >>>>>>>>>>> Levani >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>>> On Nov 11, 2019, at 9:13 PM, John Roesler <j...@confluent.io> >>>> wrote: >>>>>>>>>>>> >>>>>>>>>>>> Ah, thanks for the clarification. I missed your point. >>>>>>>>>>>> >>>>>>>>>>>> I like the framework you've presented. It does seem simpler to >>>> assume >>>>>>>>>>>> that they either care about the partition count and want to >>>>>>>>>>>> repartition to realize it, or they don't care about the number. >>>>>>>>>>>> Returning to this discussion, it does seem unlikely that they >>>> care >>>>>>>>>>>> about the number and _don't_ care if it actually gets realized. >>>>>>>>>>>> >>>>>>>>>>>> But then, it still seems like we can just keep the option as >>>> part of >>>>>>>>>>>> Grouped. As in: >>>>>>>>>>>> >>>>>>>>>>>> // user does not care >>>>>>>>>>>> stream.groupByKey(Grouped /*not specifying partition count*/) >>>>>>>>>>>> stream.groupBy(Grouped /*not specifying partition count*/) >>>>>>>>>>>> >>>>>>>>>>>> // user does care >>>>>>>>>>>> stream.repartition(Repartitioned) >>>>>>>>>>>> stream.groupByKey(Grouped.numberOfPartitions(...)) >>>>>>>>>>>> stream.groupBy(Grouped.numberOfPartitions(...)) >>>>>>>>>>>> >>>>>>>>>>>> ---- >>>>>>>>>>>> >>>>>>>>>>>> The above discussion got me thinking about algebra. Matthias is >>>>>>>>>>>> absolutely right that `groupByKey(numPartitions)` is equivalent >>>> to >>>>>>>>>>>> `repartition(numPartitions).groupByKey()`. I'm just not >>>> convinced that >>>>>>>>>>>> we should force people to apply that expansion themselves vs. >>>> having a >>>>>>>>>>>> more compact way to express it if they don't care where exactly >>>> the >>>>>>>>>>>> repartition occurs. However, thinking about these operators >>>>>>>>>>>> algebraically can really help *us* narrow down the number of >>>> different >>>>>>>>>>>> expressions we have to consider. 
>>>>>>>>>>>> >>>>>>>>>>>> Let's consider some identities: >>>>>>>>>>>> >>>>>>>>>>>> A: groupBy(mapper) + agg = mapKey(mapper) + groupByKey + agg >>>>>>>>>>>> B: src + ... + groupByKey + agg = src + ... + passthough + agg >>>>>>>>>>>> C: mapKey(mapper) + ... + groupByKey + agg >>>>>>>>>>>> = mapKey(mapper) + ... + repartition + groupByKey + agg >>>>>>>>>>>> D: repartition = sink(managed) + src >>>>>>>>>>>> >>>>>>>>>>>> In these identities, I used one special identifier (...), which >>>> means >>>>>>>>>>>> any number (0+) of operations that are not src, mapKey, >>>> groupBy[Key], >>>>>>>>>>>> repartition, or agg. >>>>>>>>>>>> >>>>>>>>>>>> For mental clarity, I'm just going to make up a rule that groupBy >>>>>>>>>>>> operations are not executable. In other words, you have to get >>>> to a >>>>>>>>>>>> point where you can apply B to convert a groupByKey into a >>>> passthough >>>>>>>>>>>> in order to execute the program. This is just a formal way of >>>> stating >>>>>>>>>>>> what already happens in Kafka Streams. >>>>>>>>>>>> >>>>>>>>>>>> By applying A, we can just completely leave `groupBy` out of our >>>>>>>>>>>> analysis. It trivially decomposes into a mapKey followed by a >>>>>>>>>>>> groupByKey. >>>>>>>>>>>> >>>>>>>>>>>> Then, we can eliminate the "repartition required" case of >>>> `groupByKey` >>>>>>>>>>>> by applying C followed by D to get to the "no repartition >>>> required" >>>>>>>>>>>> version of groupByKey, which in turn sets us up to apply B to >>>> get an >>>>>>>>>>>> executable topology. 
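As a rough illustration of how identities A through D compose, the sketch below rewrites a linear chain of operator names into the "executable" form John describes. It is a toy model under stated assumptions: operators are plain strings, the chain is linear, and `TopologyAlgebra` and `toExecutable` are hypothetical names rather than anything in Kafka Streams.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy rewriter for identities A-D over a linear operator chain; not Kafka Streams code. */
final class TopologyAlgebra {

    static List<String> toExecutable(List<String> ops) {
        List<String> out = new ArrayList<>();
        boolean keyChanged = false; // true if a mapKey occurred since the last src
        for (String op : ops) {
            String current = op;
            if (current.equals("groupBy")) {
                // A: groupBy(mapper) = mapKey(mapper) + groupByKey
                out.add("mapKey");
                keyChanged = true;
                current = "groupByKey";
            }
            switch (current) {
                case "src":
                    out.add("src");
                    keyChanged = false;
                    break;
                case "mapKey":
                    out.add("mapKey");
                    keyChanged = true;
                    break;
                case "repartition":
                    // D: repartition = sink(managed) + src
                    out.add("sink");
                    out.add("src");
                    keyChanged = false;
                    break;
                case "groupByKey":
                    if (keyChanged) {
                        // C inserts a repartition, which D expands to sink + src
                        out.add("sink");
                        out.add("src");
                        keyChanged = false;
                    }
                    // B: with no key change since the last src, groupByKey is a passthrough
                    out.add("passthrough");
                    break;
                default:
                    out.add(current);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(toExecutable(Arrays.asList("src", "groupBy", "agg")));
        System.out.println(toExecutable(Arrays.asList("src", "filter", "groupByKey", "agg")));
    }
}
```

In this model a `groupByKey` with no upstream key change never produces a `sink`/`src` pair, which matches the "no repartition required" case in the message.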
>>>>>>>>>>>> >>>>>>>>>>>> Fundamentally, you can think about KIP-221 is as proposing a >>>> modified >>>>>>>>>>>> D identity in which you can specify the partition count of the >>>> managed >>>>>>>>>>>> sink topic: >>>>>>>>>>>> D': repartition(pc) = sink(managed w/ pc) + src >>>>>>>>>>>> >>>>>>>>>>>> Since users _could_ apply the identities above, we don't >>>> actually have >>>>>>>>>>>> to add any partition count to groupBy[Key], but we decided early >>>> on in >>>>>>>>>>>> the KIP discussion that it's more ergonomic to add it. In that >>>> case, >>>>>>>>>>>> we also have to modify A and C: >>>>>>>>>>>> A': groupBy(mapper, pc) + agg >>>>>>>>>>>> = mapKey(mapper) + groupByKey(pc) + agg >>>>>>>>>>>> C': mapKey(mapper) + ... + groupByKey(pc) + agg >>>>>>>>>>>> = mapKey(mapper) + ... + repartition(pc) + groupByKey + agg >>>>>>>>>>>> >>>>>>>>>>>> Which sets us up still to always be able to get back to a plain >>>>>>>>>>>> `groupByKey` operation (with no `(pc)`) and then apply D' and >>>>>>>>>>>> ultimately B to get an executable topology. >>>>>>>>>>>> >>>>>>>>>>>> What about the optimizer? >>>>>>>>>>>> The optimizer applies another set of graph-algebraic identities >>>> to >>>>>>>>>>>> minimize the number of repartition topics in a topology. >>>>>>>>>>>> >>>>>>>>>>>> (forgive my ascii art) >>>>>>>>>>>> >>>>>>>>>>>> E: (merging repartition nodes) >>>>>>>>>>>> (...) -> repartition -> X >>>>>>>>>>>> \-> repartition -> Y >>>>>>>>>>>> = >>>>>>>>>>>> (... + repartition) -> X >>>>>>>>>>>> \-> Y >>>>>>>>>>>> F: (reordering around repartition) >>>>>>>>>>>> Where SVO is any non-key-changing, stateless, operation: >>>>>>>>>>>> repartition -> SVO = SVO -> repartition >>>>>>>>>>>> >>>>>>>>>>>> In terms of these identities, what the optimizer does is apply F >>>>>>>>>>>> repeatedly in either direction to a topology to factor out >>>> common in >>>>>>>>>>>> branches so that it can apply E to merge repartition nodes. 
This >>>> was >>>>>>>>>>>> especially necessary before KIP-221 because you couldn't directly >>>>>>>>>>>> express `repartition` in the DSL, only indirectly via >>>> `groupBy[Key]`, >>>>>>>>>>>> so there was no way to do the factoring manually. >>>>>>>>>>>> >>>>>>>>>>>> We can now state very clearly that in KIP-221, explicit >>>>>>>>>>>> `repartition()` operators should create a "reordering barrier". >>>> So, F >>>>>>>>>>>> cannot be applied to an explicit `repartition()`. Also, I think >>>> we >>>>>>>>>>>> decided earlier that explicit `repartition()` operations would >>>> also be >>>>>>>>>>>> ineligible for merging, so E can't be applied to explicit >>>>>>>>>>>> `repartition()` operations either. I think we feel we _could_ >>>> apply E >>>>>>>>>>>> without harm, but we want to be conservative for now. >>>>>>>>>>>> >>>>>>>>>>>> I think the salient point from the latter discussion has been >>>> that >>>>>>>>>>>> when you use `Grouped.numberOfPartitions`, this does _not_ >>>> constitute >>>>>>>>>>>> an explicit `repartition()` operator, and therefore, the >>>> resulting >>>>>>>>>>>> repartition node remains eligible for optimization. >>>>>>>>>>>> >>>>>>>>>>>> To be clear, I agree with Matthias that the provided partition >>>> count >>>>>>>>>>>> _must_ be used in the resulting implicit repartition. This has >>>> some >>>>>>>>>>>> implications for E. Namely, E could only be applied to two >>>> repartition >>>>>>>>>>>> nodes that have the same partition count. This has always been >>>>>>>>>>>> trivially true before KIP-221 because the partition count has >>>> always >>>>>>>>>>>> been "unspecified", i.e., it would be determined at runtime by >>>> the >>>>>>>>>>>> user-managed-topics' partition counts. Now, it could be >>>> specified or >>>>>>>>>>>> unspecified. We can simply augment E to allow merging only >>>> repartition >>>>>>>>>>>> nodes where the partition count is EITHER "specified and the >>>> same on >>>>>>>>>>>> both sides", OR "unspecified on both sides". 
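The augmented merge rule for E stated above can be written down as a small predicate. This is a sketch in plain Java; `MergeRule`, `RepartitionNode`, and `canMerge` are hypothetical names, and an empty `OptionalInt` stands for an "unspecified" partition count. The `explicit` flag models the separate decision, mentioned in the same message, that explicit `repartition()` operators are not eligible for merging.

```java
import java.util.OptionalInt;

/** Hypothetical model of the augmented merge rule E; not the Kafka Streams optimizer. */
final class MergeRule {

    static final class RepartitionNode {
        final boolean explicit;           // created by an explicit repartition() call
        final OptionalInt partitionCount; // empty = unspecified, decided at runtime

        RepartitionNode(boolean explicit, OptionalInt partitionCount) {
            this.explicit = explicit;
            this.partitionCount = partitionCount;
        }
    }

    static boolean canMerge(RepartitionNode a, RepartitionNode b) {
        if (a.explicit || b.explicit) {
            return false; // explicit repartition() acts as a barrier
        }
        if (a.partitionCount.isPresent() && b.partitionCount.isPresent()) {
            // specified on both sides: counts must be the same
            return a.partitionCount.getAsInt() == b.partitionCount.getAsInt();
        }
        // otherwise mergeable only if unspecified on both sides
        return !a.partitionCount.isPresent() && !b.partitionCount.isPresent();
    }

    public static void main(String[] args) {
        RepartitionNode unspecified = new RepartitionNode(false, OptionalInt.empty());
        RepartitionNode five = new RepartitionNode(false, OptionalInt.of(5));
        System.out.println(canMerge(unspecified, unspecified));
        System.out.println(canMerge(five, unspecified));
    }
}
```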
>>>>>>>>>>>> >>>>>>>>>>>> Sorry for the long email, but I have a hope that it builds a >>>> solid >>>>>>>>>>>> theoretical foundation for our decisions in KIP-221, so we can >>>> have >>>>>>>>>>>> confidence that there are no edge cases for design flaws to hide. >>>>>>>>>>>> >>>>>>>>>>>> Thanks, >>>>>>>>>>>> -John >>>>>>>>>>>> >>>>>>>>>>>> On Sat, Nov 9, 2019 at 10:37 PM Matthias J. Sax < >>>>>>>> matth...@confluent.io> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> it seems like we do want to allow >>>>>>>>>>>>>>> people to optionally specify a partition count as part of this >>>>>>>>>>>>>>> operation, but we don't want that option to _force_ >>>> repartitioning >>>>>>>>>>>>> >>>>>>>>>>>>> Correct, ie, that is my suggestions. >>>>>>>>>>>>> >>>>>>>>>>>>>> "Use P partitions if repartitioning is necessary" >>>>>>>>>>>>> >>>>>>>>>>>>> I disagree here, because my reasoning is that: >>>>>>>>>>>>> >>>>>>>>>>>>> - if a user cares about the number of partition, the user wants >>>> to >>>>>>>>>>>>> enforce a repartitioning >>>>>>>>>>>>> - if a user does not case about the number of partitions, we >>>> don't >>>>>>>> need >>>>>>>>>>>>> to provide them a way to pass in a "hint" >>>>>>>>>>>>> >>>>>>>>>>>>> Hence, it should be sufficient to support: >>>>>>>>>>>>> >>>>>>>>>>>>> // user does not care >>>>>>>>>>>>> >>>>>>>>>>>>> `stream.groupByKey(Grouped)` >>>>>>>>>>>>> `stream.grouBy(..., Grouped)` >>>>>>>>>>>>> >>>>>>>>>>>>> // user does care >>>>>>>>>>>>> >>>>>>>>>>>>> `stream.repartition(Repartitioned).groupByKey()` >>>>>>>>>>>>> `streams.groupBy(..., Repartitioned)` >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> -Matthias >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> On 11/9/19 8:10 PM, John Roesler wrote: >>>>>>>>>>>>>> Thanks for those thoughts, Matthias, >>>>>>>>>>>>>> >>>>>>>>>>>>>> I find your reasoning about the optimization behavior >>>> compelling. >>>>>>>> The >>>>>>>>>>>>>> `through` operation is very simple and clear to reason about. 
>>>> It >>>>>>>> just >>>>>>>>>>>>>> passes the data exactly at the specified point in the topology >>>>>>>> exactly >>>>>>>>>>>>>> through the specified topic. Likewise, if a user invokes a >>>>>>>>>>>>>> `repartition` operator, the simplest behavior is if we just do >>>> what >>>>>>>>>>>>>> they asked for. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Stepping back to think about when optimizations are surprising >>>> and >>>>>>>>>>>>>> when they aren't, it occurs to me that we should be free to >>>> move >>>>>>>>>>>>>> around repartitions when users have asked to perform some >>>> operation >>>>>>>>>>>>>> that implies a repartition, like "change keys, then filter, >>>> then >>>>>>>>>>>>>> aggregate". This program requires a repartition, but it could >>>> be >>>>>>>>>>>>>> anywhere between the key change and the aggregation. On the >>>> other >>>>>>>>>>>>>> hand, if they say, "change keys, then filter, then >>>> repartition, then >>>>>>>>>>>>>> aggregate", it seems like they were pretty clear about their >>>> desire, >>>>>>>>>>>>>> and we should just take it at face value. >>>>>>>>>>>>>> >>>>>>>>>>>>>> So, I'm sold on just literally doing a repartition every time >>>> they >>>>>>>>>>>>>> invoke the `repartition` operator. >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> The "partition count" modifier for `groupBy`/`groupByKey` is >>>> more >>>>>>>> nuanced. >>>>>>>>>>>>>> >>>>>>>>>>>>>> What you said about `groupByKey` makes sense. If they key >>>> hasn't >>>>>>>>>>>>>> actually changed, then we don't need to repartition before >>>>>>>>>>>>>> aggregating. On the other hand, `groupBy` is specifically >>>> changing >>>>>>>> the >>>>>>>>>>>>>> key as part of the grouping operation, so (as you said) we >>>>>>>> definitely >>>>>>>>>>>>>> have to do a repartition. 
>>>>>>>>>>>>>> >>>>>>>>>>>>>> If I'm reading the discussion right, it seems like we do want >>>> to >>>>>>>> allow >>>>>>>>>>>>>> people to optionally specify a partition count as part of this >>>>>>>>>>>>>> operation, but we don't want that option to _force_ >>>> repartitioning >>>>>>>> if >>>>>>>>>>>>>> it's not needed. That last clause is the key. "Use P >>>> partitions if >>>>>>>>>>>>>> repartitioning is necessary" is a directive that applies >>>> cleanly and >>>>>>>>>>>>>> correctly to both `groupBy` and `groupByKey`. What if we call >>>> the >>>>>>>>>>>>>> option `numberOfPartitionsHint`, which along with the "if >>>> necessary" >>>>>>>>>>>>>> javadoc, should make it clear that the option won't force a >>>>>>>>>>>>>> repartition, and also gives us enough latitude to still employ >>>> the >>>>>>>>>>>>>> optimizer on those repartition topics? >>>>>>>>>>>>>> >>>>>>>>>>>>>> If we like the idea of expressing it as a "hint" for grouping >>>> and a >>>>>>>>>>>>>> "command" for `repartition`, then it seems like it still makes >>>> sense >>>>>>>>>>>>>> to keep Grouped and Repartitioned separate, as they would >>>> actually >>>>>>>>>>>>>> offer different methods with distinct semantics. >>>>>>>>>>>>>> >>>>>>>>>>>>>> WDYT? >>>>>>>>>>>>>> >>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>> -John >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Sat, Nov 9, 2019 at 8:28 PM Matthias J. Sax < >>>>>>>> matth...@confluent.io> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Sorry for late reply. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I guess, the question boils down to the intended semantics of >>>>>>>>>>>>>>> `repartition()`. 
My understanding is as follows: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> - KS does auto-repartitioning for correctness reasons (using >>>> the >>>>>>>>>>>>>>> upstream topic to determine the number of partitions) >>>>>>>>>>>>>>> - KS does auto-repartitioning only for downstream DSL >>>> operators >>>>>>>> like >>>>>>>>>>>>>>> `count()` (e.g., a `transform()` never triggers an >>>>>>>> auto-repartitioning >>>>>>>>>>>>>>> even if the stream is marked as `repartitioningRequired`). >>>>>>>>>>>>>>> - KS offers `through()` to enforce a repartitioning -- >>>> however, >>>>>>>> the user >>>>>>>>>>>>>>> needs to create the topic manually (with the desired number of >>>>>>>> partitions). >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I see two main applications for `repartition()`: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> 1) repartition data before a `transform()` but user does not >>>> want >>>>>>>> to >>>>>>>>>>>>>>> manage the topic >>>>>>>>>>>>>>> 2) scale out a downstream subtopology >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Hence, I see `repartition()` as similar to `through()`: if a >>>> user >>>>>>>> calls >>>>>>>>>>>>>>> it, a repartitioning is enforced, with the difference that KS >>>>>>>> manages the >>>>>>>>>>>>>>> topic and the user does not need to create it. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> This behavior makes (1) and (2) possible. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> I think many users would prefer to just say "if there *is* a >>>>>>>> repartition >>>>>>>>>>>>>>>> required at this point in the topology, it should >>>>>>>>>>>>>>>> have N partitions" >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Because of (2), I disagree. Either a user does not care about >>>>>>>> scaling >>>>>>>>>>>>>>> out, in which case she would not specify the number of >>>>>>>> partitions. Or a >>>>>>>>>>>>>>> user does care, and hence wants to enforce the scale out. I >>>> don't >>>>>>>> think >>>>>>>>>>>>>>> that any user would say, "maybe scale out".
>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Therefore, the optimizer should never ignore the repartition >>>>>>>> operation. >>>>>>>>>>>>>>> As a "consequence" (because repartitioning is expensive) a >>>> user >>>>>>>> should >>>>>>>>>>>>>>> make an explicit call to `repartition()` IMHO -- piggybacking >>>> an >>>>>>>>>>>>>>> enforced repartitioning into `groupByKey()` seems to be >>>> "dangerous" >>>>>>>>>>>>>>> because it might be too subtle and an "optional scaling out" >>>> as >>>>>>>> laid out >>>>>>>>>>>>>>> above does not make sense IMHO. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I am also not worried about "over repartitioning" because the >>>>>>>> result >>>>>>>>>>>>>>> stream would never trigger auto-repartitioning. Only if >>>> multiple >>>>>>>>>>>>>>> consecutive calls to `repartition()` are made it could be bad >>>> -- >>>>>>>> but >>>>>>>>>>>>>>> that's the same with `through()`. In the end, there is always >>>> some >>>>>>>>>>>>>>> responsibility on the user. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Btw, for `.groupBy()` we know that repartitioning will be >>>> required, >>>>>>>>>>>>>>> however, for `groupByKey()` it depends on whether the KStream is >>>> marked as >>>>>>>>>>>>>>> `repartitioningRequired`. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Hence, for `groupByKey()` it should not be possible for a >>>> user to >>>>>>>> set >>>>>>>>>>>>>>> the number of partitions IMHO. For `groupBy()` it's a different >>>> story, >>>>>>>>>>>>>>> because calling >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> `repartition().groupBy()` >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> does not achieve what we want. Hence, allowing users to pass >>>> in the >>>>>>>>>>>>>>> number of partitions into `groupBy()` does actually >>>> make >>>>>>>> sense, >>>>>>>>>>>>>>> because repartitioning will happen anyway and thus we can >>>>>>>> piggyback a >>>>>>>>>>>>>>> scaling decision.
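The rule described above (`groupBy()` always repartitions, `groupByKey()` only when the stream is marked, and an explicit `repartition()` clears the mark) can be illustrated with a toy model. This is a minimal sketch in plain Java and not Kafka Streams code: `ToyStream` and all of its methods are hypothetical names introduced only for illustration, and the single boolean is an assumed simplification of the internal "repartition required" marker mentioned in the thread.

```java
public class RepartitionFlagSketch {

    /** Hypothetical stand-in for a KStream; it tracks only the "needs repartition" mark. */
    public static final class ToyStream {
        public final boolean repartitionRequired;

        public ToyStream(boolean repartitionRequired) {
            this.repartitionRequired = repartitionRequired;
        }

        /** Key-changing operation: downstream grouping can no longer trust the partitioning. */
        public ToyStream selectKey() {
            return new ToyStream(true);
        }

        /** Value-only operation: keys are untouched, so the mark is carried over unchanged. */
        public ToyStream mapValues() {
            return new ToyStream(repartitionRequired);
        }

        /** Explicit repartition: data is redistributed by key, so the mark is cleared. */
        public ToyStream repartition() {
            return new ToyStream(false);
        }

        /** groupByKey() keeps the current key, so it repartitions only if the stream is marked. */
        public boolean groupByKeyRepartitions() {
            return repartitionRequired;
        }

        /** groupBy() selects a new key as part of grouping, so it always repartitions. */
        public boolean groupByRepartitions() {
            return true;
        }
    }

    public static void main(String[] args) {
        ToyStream source = new ToyStream(false);
        System.out.println("groupByKey on source: "
                + source.groupByKeyRepartitions());
        System.out.println("groupByKey after selectKey: "
                + source.selectKey().groupByKeyRepartitions());
        System.out.println("groupByKey after selectKey + repartition: "
                + source.selectKey().repartition().groupByKeyRepartitions());
        System.out.println("groupBy after repartition: "
                + source.repartition().groupByRepartitions());
    }
}
```

In this model, `repartition().groupBy()` still pays for a second repartition, which is the reason given above for letting `groupBy()` itself accept a partition count.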
>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I think that John has a fair concern about the overloads, >>>> however, >>>>>>>> I am >>>>>>>>>>>>>>> not convinced that using `Grouped` to specify the number of >>>>>>>> partitions >>>>>>>>>>>>>>> is intuitive. I double checked `Grouped` and `Repartitioned` >>>> and >>>>>>>> both >>>>>>>>>>>>>>> allow to specify a `name` and `keySerde/valueSerde`. Thus, I >>>> am >>>>>>>>>>>>>>> wondering if we could bridge the gap between both, if we >>>> would make >>>>>>>>>>>>>>> `Repartitioned extends Grouped`? For this case, we only need >>>>>>>>>>>>>>> `groupBy(Grouped)` and a user can pass in both types what >>>> seems to >>>>>>>> make >>>>>>>>>>>>>>> the API quite smooth: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> `stream.groupBy(..., Grouped...)` >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> `stream.groupBy(..., Repartitioned...)` >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Thoughts? >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> -Matthias >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On 11/7/19 10:59 AM, Levani Kokhreidze wrote: >>>>>>>>>>>>>>>> Hi Sophie, >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Thank you for your reply, very insightful. Looking forward >>>>>>>> hearing others opinion as well on this. 
>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Kind regards, >>>>>>>>>>>>>>>> Levani >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> On Nov 6, 2019, at 1:30 AM, Sophie Blee-Goldman < >>>>>>>> sop...@confluent.io> wrote: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Personally, I think Matthias’s concern is valid, but on the >>>>>>>> other hand >>>>>>>>>>>>>>>>> Kafka Streams has already >>>>>>>>>>>>>>>>>> optimizer in place which alters topology independently >>>> from user >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> I agree (with you) and think this is a good way to put it >>>> -- we >>>>>>>> currently >>>>>>>>>>>>>>>>> auto-repartition for the user so >>>>>>>>>>>>>>>>> that they don't have to walk through their entire topology >>>> and >>>>>>>> reason about >>>>>>>>>>>>>>>>> when and where to place a >>>>>>>>>>>>>>>>> `.through` (or the new `.repartition`), so why suddenly >>>> force >>>>>>>> this onto the >>>>>>>>>>>>>>>>> user? How certain are we that >>>>>>>>>>>>>>>>> users will always get this right? It's easy to imagine that >>>>>>>> during >>>>>>>>>>>>>>>>> development, you write your new app with >>>>>>>>>>>>>>>>> correctly placed repartitions in order to use this new >>>> feature. >>>>>>>> During the >>>>>>>>>>>>>>>>> course of development you end up >>>>>>>>>>>>>>>>> tweaking the topology, but don't remember to review or move >>>> the >>>>>>>>>>>>>>>>> repartitioning since you're used to Streams >>>>>>>>>>>>>>>>> doing this for you. If you use only single-partition topics >>>> for >>>>>>>> testing, >>>>>>>>>>>>>>>>> you might not even notice your app is >>>>>>>>>>>>>>>>> spitting out incorrect results! >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Anyways, I feel pretty strongly that it would be weird to >>>>>>>> introduce a new >>>>>>>>>>>>>>>>> feature and say that to use it, you can't take >>>>>>>>>>>>>>>>> advantage of this other feature anymore. 
Also, is it >>>> possible our >>>>>>>>>>>>>>>>> optimization framework could ever include an >>>>>>>>>>>>>>>>> optimized repartitioning strategy that is better than what a >>>>>>>> user could >>>>>>>>>>>>>>>>> achieve by manually inserting repartitions? >>>>>>>>>>>>>>>>> Do we expect users to have a deep understanding of the best >>>> way >>>>>>>> to >>>>>>>>>>>>>>>>> repartition their particular topology, or is it >>>>>>>>>>>>>>>>> likely they will end up over-repartitioning either due to >>>> missed >>>>>>>>>>>>>>>>> optimizations or unnecessary extra repartitions? >>>>>>>>>>>>>>>>> I think many users would prefer to just say "if there *is* a >>>>>>>> repartition >>>>>>>>>>>>>>>>> required at this point in the topology, it should >>>>>>>>>>>>>>>>> have N partitions" >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> As to the idea of adding `numberOfPartitions` to Grouped >>>> rather >>>>>>>> than >>>>>>>>>>>>>>>>> adding a new parameter to groupBy, that does seem more in >>>> line >>>>>>>> with the >>>>>>>>>>>>>>>>> current syntax so +1 from me >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> On Tue, Nov 5, 2019 at 2:07 PM Levani Kokhreidze < >>>>>>>> levani.co...@gmail.com> >>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Hello all, >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> While https://github.com/apache/kafka/pull/7170 < >>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/7170> is under >>>> review and >>>>>>>> it’s >>>>>>>>>>>>>>>>>> almost done, I want to resurrect discussion about this KIP >>>> to >>>>>>>> address >>>>>>>>>>>>>>>>>> couple of concerns raised by Matthias and John. 
>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> As a reminder, the idea of KIP-221 was to give DSL users >>>>>>>> control over >>>>>>>>>>>>>>>>>> repartitioning and parallelism of sub-topologies by: >>>>>>>>>>>>>>>>>> 1) Introducing a new KStream#repartition operation, which is >>>> done >>>>>>>> in >>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/7170 >>>>>>>>>>>>>>>>>> 2) Adding a new KStream#groupBy(Repartitioned) operation, which >>>> is >>>>>>>> planned to >>>>>>>>>>>>>>>>>> be a separate PR. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> While all agree about the general implementation and idea >>>> behind >>>>>>>>>>>>>>>>>> the https://github.com/apache/kafka/pull/7170 PR, >>>> introducing a new >>>>>>>>>>>>>>>>>> KStream#groupBy(Repartitioned) method overload raised some >>>>>>>> questions during >>>>>>>>>>>>>>>>>> the review. >>>>>>>>>>>>>>>>>> Matthias raised the concern that there can be cases where a user >>>> uses >>>>>>>>>>>>>>>>>> the `KStream#groupBy(Repartitioned)` operation, but actual >>>>>>>> repartitioning may >>>>>>>>>>>>>>>>>> not be required, thus configuration passed via `Repartitioned` >>>>>>>> would never be >>>>>>>>>>>>>>>>>> applied (Matthias, please correct me if I misinterpreted >>>> your >>>>>>>> comment). >>>>>>>>>>>>>>>>>> So instead, if a user wants to control parallelism of >>>>>>>> sub-topologies, he or >>>>>>>>>>>>>>>>>> she should always use the `KStream#repartition` operation >>>> before >>>>>>>> groupBy.
Full >>>>>>>>>>>>>>>>>> comment can be seen here: >>>>>>>>>>>>>>>>>> >>>>>>>> https://github.com/apache/kafka/pull/7170#issuecomment-519303125 < >>>>>>>>>>>>>>>>>> >>>>>>>> https://github.com/apache/kafka/pull/7170#issuecomment-519303125> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> On the same topic, John pointed out that, from API design >>>>>>>> perspective, we >>>>>>>>>>>>>>>>>> shouldn’t intertwine configuration classes of different >>>>>>>> operators between >>>>>>>>>>>>>>>>>> one another. So instead of introducing new >>>>>>>> `KStream#groupBy(Repartitioned)` >>>>>>>>>>>>>>>>>> for specifying number of partitions for internal topic, we >>>>>>>> should update >>>>>>>>>>>>>>>>>> existing `Grouped` class with `numberOfPartitions` field. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Personally, I think Matthias’s concern is valid, but on the >>>>>>>> other hand >>>>>>>>>>>>>>>>>> Kafka Streams has already optimizer in place which alters >>>>>>>> topology >>>>>>>>>>>>>>>>>> independently from user. So maybe it makes sense if Kafka >>>>>>>> Streams, >>>>>>>>>>>>>>>>>> internally would optimize topology in the best way >>>> possible, >>>>>>>> even if in >>>>>>>>>>>>>>>>>> some cases this means ignoring some operator configurations >>>>>>>> passed by the >>>>>>>>>>>>>>>>>> user. Also, I agree with John about API design semantics. >>>> If we >>>>>>>> go through >>>>>>>>>>>>>>>>>> with the changes for `KStream#groupBy` operation, it makes >>>> more >>>>>>>> sense to >>>>>>>>>>>>>>>>>> add `numberOfPartitions` field to `Grouped` class instead >>>> of >>>>>>>> introducing >>>>>>>>>>>>>>>>>> new `KStream#groupBy(Repartitioned)` method overload. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> I would really appreciate communities feedback on this. 
>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Kind regards, >>>>>>>>>>>>>>>>>> Levani >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> On Oct 17, 2019, at 12:57 AM, Sophie Blee-Goldman < >>>>>>>> sop...@confluent.io> >>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Hey Levani, >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> I think people are busy with the upcoming 2.4 release, and >>>>>>>> don't have >>>>>>>>>>>>>>>>>> much >>>>>>>>>>>>>>>>>>> spare time at the >>>>>>>>>>>>>>>>>>> moment. It's kind of a difficult time to get attention on >>>>>>>> things, but >>>>>>>>>>>>>>>>>> feel >>>>>>>>>>>>>>>>>>> free to pick up something else >>>>>>>>>>>>>>>>>>> to work on in the meantime until things have calmed down >>>> a bit! >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>>>>>> Sophie >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> On Wed, Oct 16, 2019 at 11:26 AM Levani Kokhreidze < >>>>>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>> >>>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Hello all, >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Sorry for bringing this thread again, but I would like >>>> to get >>>>>>>> some >>>>>>>>>>>>>>>>>>>> attention on this PR: >>>>>>>> https://github.com/apache/kafka/pull/7170 < >>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/7170> < >>>>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/7170 < >>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/7170>> >>>>>>>>>>>>>>>>>>>> It's been a while now and I would love to move on to >>>> other >>>>>>>> KIPs as well. >>>>>>>>>>>>>>>>>>>> Please let me know if you have any concerns. 
>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Regards, >>>>>>>>>>>>>>>>>>>> Levani >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> On Jul 26, 2019, at 11:25 AM, Levani Kokhreidze < >>>>>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>> >>>>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Hi all, >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Here’s voting thread for this KIP: >>>>>>>>>>>>>>>>>>>> >>>>>>>> https://www.mail-archive.com/dev@kafka.apache.org/msg99680.html < >>>>>>>>>>>>>>>>>> >>>> https://www.mail-archive.com/dev@kafka.apache.org/msg99680.html> >>>>>>>> < >>>>>>>>>>>>>>>>>>>> >>>>>>>> https://www.mail-archive.com/dev@kafka.apache.org/msg99680.html < >>>>>>>>>>>>>>>>>> >>>> https://www.mail-archive.com/dev@kafka.apache.org/msg99680.html >>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Regards, >>>>>>>>>>>>>>>>>>>>> Levani >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> On Jul 24, 2019, at 11:15 PM, Levani Kokhreidze < >>>>>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com> >>>>>>>>>>>>>>>>>>>> <mailto:levani.co...@gmail.com <mailto: >>>> levani.co...@gmail.com>>> >>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Hi Matthias, >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Thanks for the suggestion. I Don’t have strong opinion >>>> on >>>>>>>> that one. >>>>>>>>>>>>>>>>>>>>>> Agree that avoiding unnecessary method overloads is a >>>> good >>>>>>>> idea. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Updated KIP >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Regards, >>>>>>>>>>>>>>>>>>>>>> Levani >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> On Jul 24, 2019, at 8:50 PM, Matthias J. 
Sax < >>>>>>>> matth...@confluent.io >>>>>>>>>>>>>>>>>> <mailto:matth...@confluent.io> >>>>>>>>>>>>>>>>>>>> <mailto:matth...@confluent.io <mailto: >>>> matth...@confluent.io>>> >>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> One question: >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> Why do we add >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> Repartitioned#with(final String name, final int >>>>>>>> numberOfPartitions) >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> It seems that `#with(String name)`, >>>>>>>> `#numberOfPartitions(int)` in >>>>>>>>>>>>>>>>>>>>>>> combination with `withName()` and >>>>>>>> `withNumberOfPartitions()` should >>>>>>>>>>>>>>>>>> be >>>>>>>>>>>>>>>>>>>>>>> sufficient. Users can chain the method calls. >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> (I think it's valuable to keep the number of overload >>>>>>>> small if >>>>>>>>>>>>>>>>>>>> possible.) >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> Otherwise LGTM. >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> -Matthias >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> On 7/23/19 2:18 PM, Levani Kokhreidze wrote: >>>>>>>>>>>>>>>>>>>>>>>> Hello, >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> Thanks all for your feedback. >>>>>>>>>>>>>>>>>>>>>>>> I started voting procedure for this KIP. If there’re >>>> any >>>>>>>> other >>>>>>>>>>>>>>>>>>>> concerns about this KIP, please let me know. >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> Regards, >>>>>>>>>>>>>>>>>>>>>>>> Levani >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> On Jul 20, 2019, at 8:39 PM, Levani Kokhreidze < >>>>>>>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com> >>>>>>>> <mailto: >>>>>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>>> >>>> wrote: >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> Hi Matthias, >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the suggestion, makes sense. 
>>>>>>>>>>>>>>>>>>>>>>>>> I’ve updated KIP ( https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint ). >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> Regards, >>>>>>>>>>>>>>>>>>>>>>>>> Levani >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> On Jul 20, 2019, at 3:53 AM, Matthias J.
Sax < >>>>>>>>>>>>>>>>>> matth...@confluent.io <mailto:matth...@confluent.io> >>>>>>>>>>>>>>>>>>>> <mailto:matth...@confluent.io <mailto: >>>> matth...@confluent.io>> >>>>>>>> <mailto: >>>>>>>>>>>>>>>>>> matth...@confluent.io <mailto:matth...@confluent.io> >>>> <mailto: >>>>>>>>>>>>>>>>>>>> matth...@confluent.io <mailto:matth...@confluent.io>>>> >>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for driving the KIP. >>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> I agree that users need to be able to specify a >>>>>>>> partitioning >>>>>>>>>>>>>>>>>>>> strategy. >>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> Sophie raises a fair point about topic configs and >>>>>>>> producer >>>>>>>>>>>>>>>>>>>> configs. My >>>>>>>>>>>>>>>>>>>>>>>>>> take is, that consider `Repartitioned` as an >>>>>>>> "extension" to >>>>>>>>>>>>>>>>>>>> `Produced`, >>>>>>>>>>>>>>>>>>>>>>>>>> that adds topic configuration, is a good way to >>>> think >>>>>>>> about it and >>>>>>>>>>>>>>>>>>>> helps >>>>>>>>>>>>>>>>>>>>>>>>>> to keep the API "clean". >>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> With regard to method names. I would prefer to >>>> avoid >>>>>>>>>>>>>>>>>> abbreviations. 
>>>>>>>>>>>>>>>>>>>> Can >>>>>>>>>>>>>>>>>>>>>>>>>> we rename: >>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> `withNumOfPartitions` -> `withNumberOfPartitions` >>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> Furthermore, it might be good to add some more >>>> `static` >>>>>>>> methods: >>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> - Repartitioned.with(Serde<K>, Serde<V>) >>>>>>>>>>>>>>>>>>>>>>>>>> - Repartitioned.withNumberOfPartitions(int) >>>>>>>>>>>>>>>>>>>>>>>>>> - >>>> Repartitioned.streamPartitioner(StreamPartitioner) >>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> -Matthias >>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> On 7/19/19 3:33 PM, Levani Kokhreidze wrote: >>>>>>>>>>>>>>>>>>>>>>>>>>> Totally agree. I think in KStream interface it >>>> makes >>>>>>>> sense to >>>>>>>>>>>>>>>>>> have >>>>>>>>>>>>>>>>>>>> some duplicate configurations between operators in order >>>> to >>>>>>>> keep API >>>>>>>>>>>>>>>>>> simple >>>>>>>>>>>>>>>>>>>> and usable. >>>>>>>>>>>>>>>>>>>>>>>>>>> Also, as more surface API has, harder it is to >>>> have >>>>>>>> proper >>>>>>>>>>>>>>>>>>>> backward compatibility. >>>>>>>>>>>>>>>>>>>>>>>>>>> While initial idea of keeping topic level configs >>>>>>>> separate was >>>>>>>>>>>>>>>>>>>> exciting, having Repartitioned class encapsulate some >>>>>>>> producer level >>>>>>>>>>>>>>>>>>>> configs makes API more readable. 
>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>> Regards, >>>>>>>>>>>>>>>>>>>>>>>>>>> Levani >>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>> On Jul 20, 2019, at 1:15 AM, Sophie Blee-Goldman >>>> < >>>>>>>>>>>>>>>>>>>> sop...@confluent.io <mailto:sop...@confluent.io> >>>> <mailto: >>>>>>>>>>>>>>>>>> sop...@confluent.io <mailto:sop...@confluent.io>> <mailto: >>>>>>>>>>>>>>>>>>>> sop...@confluent.io <mailto:sop...@confluent.io> >>>> <mailto: >>>>>>>>>>>>>>>>>> sop...@confluent.io <mailto:sop...@confluent.io>>>> wrote: >>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>> I think that is a good point about trying to keep >>>>>>>> producer level >>>>>>>>>>>>>>>>>>>>>>>>>>>> configurations and (repartition) topic level >>>>>>>> considerations >>>>>>>>>>>>>>>>>>>> separate. >>>>>>>>>>>>>>>>>>>>>>>>>>>> Number of partitions is definitely purely a topic >>>>>>>> level >>>>>>>>>>>>>>>>>>>> configuration. But >>>>>>>>>>>>>>>>>>>>>>>>>>>> on some level, serdes and partitioners are just >>>> as >>>>>>>> much a topic >>>>>>>>>>>>>>>>>>>>>>>>>>>> configuration as a producer one. You could have >>>> two >>>>>>>> producers >>>>>>>>>>>>>>>>>>>> configured >>>>>>>>>>>>>>>>>>>>>>>>>>>> with different serdes and/or partitioners, but if >>>>>>>> they are >>>>>>>>>>>>>>>>>>>> writing to the >>>>>>>>>>>>>>>>>>>>>>>>>>>> same topic the result would be very difficult to >>>>>>>> part. So in a >>>>>>>>>>>>>>>>>>>> sense, these >>>>>>>>>>>>>>>>>>>>>>>>>>>> are configurations of topics in Streams, not just >>>>>>>> producers. >>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Another way to think of it: while the Streams >>>> API is >>>>>>>> not always >>>>>>>>>>>>>>>>>>>> true to >>>>>>>>>>>>>>>>>>>>>>>>>>>> this, ideally all the relevant configs for an >>>>>>>> operator are >>>>>>>>>>>>>>>>>>>> wrapped into a >>>>>>>>>>>>>>>>>>>>>>>>>>>> single object (in this case, Repartitioned). 
We >>>> could >>>>>>>> instead >>>>>>>>>>>>>>>>>>>> split out the >>>>>>>>>>>>>>>>>>>>>>>>>>>> fields in common with Produced into a separate >>>>>>>> parameter to keep >>>>>>>>>>>>>>>>>>>> topic and >>>>>>>>>>>>>>>>>>>>>>>>>>>> producer level configurations separate, but this >>>>>>>> increases the >>>>>>>>>>>>>>>>>>>> API surface >>>>>>>>>>>>>>>>>>>>>>>>>>>> area by a lot. It's much more straightforward to >>>> just >>>>>>>> say "this >>>>>>>>>>>>>>>>>> is >>>>>>>>>>>>>>>>>>>>>>>>>>>> everything that this particular operator needs" >>>>>>>> without worrying >>>>>>>>>>>>>>>>>>>> about what >>>>>>>>>>>>>>>>>>>>>>>>>>>> exactly you're specifying. >>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>> I suppose you could alternatively make Produced a >>>>>>>> field of >>>>>>>>>>>>>>>>>>>> Repartitioned, >>>>>>>>>>>>>>>>>>>>>>>>>>>> but I don't think we do this kind of composition >>>>>>>> elsewhere in >>>>>>>>>>>>>>>>>>>> Streams at >>>>>>>>>>>>>>>>>>>>>>>>>>>> the moment >>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Jul 19, 2019 at 1:45 PM Levani >>>> Kokhreidze < >>>>>>>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com> >>>>>>>> <mailto: >>>>>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>> >>>>>>>> <mailto: >>>>>>>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com> >>>>>>>> <mailto: >>>>>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Bill, >>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks a lot for the feedback. >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Yes, that makes sense. I’ve updated KIP with >>>>>>>>>>>>>>>>>>>> `Repartitioned#partitioner` >>>>>>>>>>>>>>>>>>>>>>>>>>>>> configuration. 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> In the beginning, I wanted to introduce a class >>>> for >>>>>>>> topic level >>>>>>>>>>>>>>>>>>>>>>>>>>>>> configuration and keep topic level and producer >>>> level >>>>>>>>>>>>>>>>>>>> configurations (such >>>>>>>>>>>>>>>>>>>>>>>>>>>>> as Produced) separately (see my second email in >>>> this >>>>>>>> thread). >>>>>>>>>>>>>>>>>>>>>>>>>>>>> But while looking at the semantics of KStream >>>>>>>> interface, I >>>>>>>>>>>>>>>>>>>> couldn’t really >>>>>>>>>>>>>>>>>>>>>>>>>>>>> figure out good operation name for Topic level >>>>>>>> configuration >>>>>>>>>>>>>>>>>>>> class and just >>>>>>>>>>>>>>>>>>>>>>>>>>>>> introducing `Topic` config class was kinda >>>> breaking >>>>>>>> the >>>>>>>>>>>>>>>>>>>> semantics. >>>>>>>>>>>>>>>>>>>>>>>>>>>>> So I think having Repartitioned class which >>>>>>>> encapsulates topic >>>>>>>>>>>>>>>>>>>> and >>>>>>>>>>>>>>>>>>>>>>>>>>>>> producer level configurations for internal >>>> topics is >>>>>>>> viable >>>>>>>>>>>>>>>>>>>> thing to do. >>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards, >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Levani >>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Jul 19, 2019, at 7:47 PM, Bill Bejeck < >>>>>>>> bbej...@gmail.com >>>>>>>>>>>>>>>>>> <mailto:bbej...@gmail.com> >>>>>>>>>>>>>>>>>>>> <mailto:bbej...@gmail.com <mailto:bbej...@gmail.com>> >>>>>>>> <mailto: >>>>>>>>>>>>>>>>>> bbej...@gmail.com <mailto:bbej...@gmail.com> <mailto: >>>>>>>>>>>>>>>>>>>> bbej...@gmail.com <mailto:bbej...@gmail.com>>>> wrote: >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Lavani, >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for resurrecting this KIP. >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I'm also a +1 for adding a partition option. 
>>>> In >>>>>>>> addition to >>>>>>>>>>>>>>>>>>>> the reason >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> provided by John, my reasoning is: >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1. Users may want to use something other than >>>>>>>> hash-based >>>>>>>>>>>>>>>>>>>> partitioning >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2. Users may wish to partition on something >>>>>>>> different than the >>>>>>>>>>>>>>>>>>>> key >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> without having to change the key. For example: >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1. A combination of fields in the value in >>>>>>>> conjunction with >>>>>>>>>>>>>>>>>>>> the key >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2. Something other than the key >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 3. We allow users to specify a partitioner on >>>>>>>> Produced hence >>>>>>>>>>>>>>>>>> in >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> KStream.to and KStream.through, so it makes >>>> sense >>>>>>>> for API >>>>>>>>>>>>>>>>>>>> consistency. >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Just my 2 cents. >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Bill >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Jul 19, 2019 at 5:46 AM Levani >>>> Kokhreidze < >>>>>>>>>>>>>>>>>>>>>>>>>>>>> levani.co...@gmail.com <mailto: >>>>>>>> levani.co...@gmail.com> >>>>>>>>>>>>>>>>>> <mailto:levani.co...@gmail.com <mailto: >>>> levani.co...@gmail.com>> >>>>>>>> <mailto: >>>>>>>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com> >>>>>>>> <mailto: >>>>>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi John, >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> In my mind it makes sense. 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> If we add partitioner configuration to >>>>>>>> Repartitioned class, >>>>>>>>>>>>>>>>>>>> with the >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> combination of specifying number of >>>> partitions for >>>>>>>> internal >>>>>>>>>>>>>>>>>>>> topics, user >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> will have opportunity to ensure >>>> co-partitioning >>>>>>>> before join >>>>>>>>>>>>>>>>>>>> operation. >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I think this can be quite powerful feature. >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Wondering what others think about this? >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards, >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Levani >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Jul 18, 2019, at 1:20 AM, John Roesler < >>>>>>>>>>>>>>>>>> j...@confluent.io <mailto:j...@confluent.io> >>>>>>>>>>>>>>>>>>>> <mailto:j...@confluent.io <mailto:j...@confluent.io>> >>>>>>>> <mailto: >>>>>>>>>>>>>>>>>> j...@confluent.io <mailto:j...@confluent.io> <mailto: >>>>>>>>>>>>>>>>>>>> j...@confluent.io <mailto:j...@confluent.io>>>> wrote: >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Yes, I believe that's what I had in mind. >>>> Again, >>>>>>>> not totally >>>>>>>>>>>>>>>>>>>> sure it >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> makes sense, but I believe something similar >>>> is >>>>>>>> the >>>>>>>>>>>>>>>>>> rationale >>>>>>>>>>>>>>>>>>>> for >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> having the partitioner option in Produced. 
>
> Thanks,
> -John
>
> On Wed, Jul 17, 2019 at 3:20 PM Levani Kokhreidze <levani.co...@gmail.com> wrote:
>
> Hey John,
>
> Oh, that’s an interesting use case.
> Do I understand this correctly: in your example I would first issue repartition(Repartitioned) with a partitioner that is essentially the same as the one used by the topic I want to join with, and then do the KStream#join with the DSL?
>
> Regards,
> Levani
>
> On Jul 17, 2019, at 11:11 PM, John Roesler <j...@confluent.io> wrote:
>
> Hey, all, just to chime in,
>
> I think it might be useful to have an option to specify the partitioner. The case I have in mind is that some data may get repartitioned and then joined with an input topic. If the right-side input topic uses a custom partitioning strategy, then the repartitioned stream also needs to be partitioned with the same strategy.
>
> Does that make sense, or did I maybe miss something important?
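To make the co-partitioning point concrete, here is a minimal sketch in plain Java. It does not use the Kafka Streams API: the `Partitioner` interface, the two strategies, and the `coPartitioned` helper are hypothetical names introduced only for illustration. It shows that the two sides of a join line up only when they use the same strategy and the same partition count.

```java
public class CoPartitioningSketch {

    /** Hypothetical stand-in for a partitioning strategy (not the Kafka Streams interface). */
    public interface Partitioner {
        int partition(String key, int numPartitions);
    }

    /** Default-style strategy: hash of the whole key. */
    public static final Partitioner HASH =
            (key, n) -> Math.floorMod(key.hashCode(), n);

    /** Custom strategy (assumed for illustration): route by the first character only. */
    public static final Partitioner BY_FIRST_CHAR =
            (key, n) -> key.isEmpty() ? 0 : Math.floorMod((int) key.charAt(0), n);

    /** True if every key lands in the same partition on both sides of the join. */
    public static boolean coPartitioned(String[] keys,
                                        Partitioner left, int leftPartitions,
                                        Partitioner right, int rightPartitions) {
        if (leftPartitions != rightPartitions) {
            return false; // co-partitioning requires equal partition counts
        }
        for (String key : keys) {
            if (left.partition(key, leftPartitions) != right.partition(key, rightPartitions)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        String[] keys = {"eu-1", "eu-2", "us-1"};
        // Same strategy, same partition count on both sides.
        System.out.println(coPartitioned(keys, BY_FIRST_CHAR, 4, BY_FIRST_CHAR, 4));
        // Repartitioned side uses the default hash, input topic uses the custom strategy.
        System.out.println(coPartitioned(keys, HASH, 4, BY_FIRST_CHAR, 4));
        // Same strategy, but different partition counts.
        System.out.println(coPartitioned(keys, BY_FIRST_CHAR, 4, BY_FIRST_CHAR, 8));
    }
}
```

Only the first call reports the two sides as co-partitioned, which is why a repartition operator that can set the partition count but not the partitioner would not be enough for this case.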
>
> Thanks,
> -John
>
> On Wed, Jul 17, 2019 at 2:48 PM Levani Kokhreidze <levani.co...@gmail.com> wrote:
>
> Yes, I was thinking about it as well. To be honest, I’m not sure about it yet.
> As a Kafka Streams DSL user, I don’t really think I would need control over the partitioner for internal topics.
> As a user, I would assume that Kafka Streams knows best how to partition data for internal topics.
> In this KIP I wrote that Produced should be used only for topics that are created by the user in advance.
> In those cases maybe it makes sense to have the possibility to specify the partitioner.
> I don’t have a clear answer on that yet, but I guess specifying the partitioner can be added as well if there’s agreement on this.
>
> Regards,
> Levani
>
> On Jul 17, 2019, at 10:42 PM, Sophie Blee-Goldman <sop...@confluent.io> wrote:
>
> Thanks for clearing that up. I agree that Repartitioned would be a useful addition. I'm wondering if it might also need to have a withStreamPartitioner method/field, similar to Produced? I'm not sure how widely this feature is really used, but it seems it should be available for repartition topics.
>
> On Wed, Jul 17, 2019 at 11:26 AM Levani Kokhreidze <levani.co...@gmail.com> wrote:
>
> Hey Sophie,
>
> In both cases, KStream#repartition and KStream#repartition(Repartitioned), the topic will be created and managed by Kafka Streams.
> The idea of Repartitioned is to give the user more control over the topic, such as the number of partitions.
> I feel like the Repartitioned parameter is something that is missing in the current DSL design.
> Essentially, it gives the user control over parallelism by configuring the number of partitions for internal topics.
>
> Hope this answers your question.
>
> Regards,
> Levani
>
> On Jul 17, 2019, at 9:02 PM, Sophie Blee-Goldman <sop...@confluent.io> wrote:
>
> Hey Levani,
>
> Thanks for the KIP! Can you clarify one thing for me -- for the KStream#repartition signature taking a Repartitioned, will the topic be auto-created by Streams (which seems to be the case for the signature without a Repartitioned) or does it have to be pre-created? The wording in the KIP makes it seem like one version of the method will auto-create topics while the other will not.
>
> Cheers,
> Sophie
>
> On Wed, Jul 17, 2019 at 10:15 AM Levani Kokhreidze <levani.co...@gmail.com> wrote:
>
> Hello,
>
> One more bump about KIP-221 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint) so it doesn’t get lost in the mailing list :)
> Would love to hear the community's opinions/concerns about this KIP.
>
> Regards,
> Levani
>
> On Jul 12, 2019, at 5:27 PM, Levani Kokhreidze <levani.co...@gmail.com> wrote:
>
> Hello,
>
> Kind reminder about this KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>
> Regards,
> Levani
>
> On Jul 9, 2019, at 11:38 AM, Levani Kokhreidze <levani.co...@gmail.com> wrote:
>
> Hello,
>
> In order to move this KIP forward, I’ve updated the Confluence page with the new proposal: https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>
> I’ve also filled in the “Rejected Alternatives” section.
>
> Looking forward to discussing this KIP :)
>
> Kind regards,
> Levani
>
> On Jul 3, 2019, at 1:08 PM, Levani Kokhreidze <levani.co...@gmail.com> wrote:
>
> Hello Matthias,
>
> Thanks for the feedback and ideas.
> I like the idea of introducing a dedicated `Topic` class for topic configuration for internal operators like `groupedBy`.
> Would be great to hear others' opinions about this as well.
>
> Kind regards,
> Levani
>
> On Jul 3, 2019, at 7:00 AM, Matthias J. Sax <matth...@confluent.io> wrote:
>
> Levani,
>
> Thanks for picking up this KIP! And thanks for summarizing everything. Even if some points may have been discussed already (can't really remember), it's helpful to get a good summary to refresh the discussion.
>
> I think your reasoning makes sense.
> With regard to the distinction between operators that manage topics and operators that use user-created topics: Following this argument, it might indicate that leaving `through()` as-is (as an operator that uses user-defined topics) and introducing a new `repartition()` operator (an operator that manages topics itself) might be good. Otherwise, there is one operator `through()` that sometimes manages topics but sometimes not; a different name, i.e., a new operator, would make the distinction clearer.
>
> About adding `numOfPartitions` to `Grouped`: I am wondering if the same argument as for `Produced` applies, and adding it is semantically questionable?
> Might be good to get opinions of others on this, too. I am not sure myself what solution I prefer atm.
>
> So far, KS uses configuration objects that allow configuring a certain "entity" like a consumer, producer, or store. If we assume that a topic is a similar entity, I wonder if we should have a `Topic#withNumberOfPartitions()` class and method instead of a plain integer? This would allow us to add other configs, like replication factor, retention time, etc., easily, without the need to change the "main API".
>
> Just want to give some ideas. Not sure if I like them myself. :)
>
> -Matthias
>
> On 7/1/19 1:04 AM, Levani Kokhreidze wrote:
>
> Actually, giving it more thought - maybe enhancing Produced with a number-of-partitions configuration is not the best approach. Let me explain why:
>
> 1) If we enhance the Produced class with this configuration, this will also affect the KStream#to operation. Since KStream#to is the final sink of the topology, it seems to me a reasonable assumption that the user needs to manually create the sink topic in advance. And in that case, having a number-of-partitions configuration doesn’t make much sense.
>
> 2) Looking at the Produced class, based on the API contract, it seems like Produced is designed to be something that is explicitly for the producer (key serializer, value serializer, and partitioner are all producer-specific configurations), whereas the number of partitions is a topic-level configuration. And I don’t think mixing topic-level and producer-level configurations together in one class is a good approach.
>
> 3) Looking at the KStream interface, it seems like the Produced parameter is for operations that work with non-internal topics (i.e., topics that are not created and managed internally by Kafka Streams), and I think we should leave it as it is in that case.
>
> Taking all these things into account, I think we should distinguish between DSL operations where Kafka Streams should create and manage internal topics (e.g. KStream#groupBy) and operations whose topics should be created in advance (e.g. KStream#to).
>
> To sum it up, I think adding a numPartitions configuration to Produced will result in mixing topic-level and producer-level configuration in one class, and it’s going to break existing API semantics.
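The separation argued for here can be sketched in plain Java. Everything in this sketch is hypothetical: the class names `ProducerSettings` and `TopicSettings`, and the fallback to the source topic's partition count, are assumptions made for illustration. It is not the Kafka Streams API, only a sketch of keeping producer-level and topic-level settings in separate configuration objects.

```java
import java.util.OptionalInt;

public class ConfigSeparationSketch {

    /** Hypothetical producer-level settings: how records are serialized on write. */
    public static final class ProducerSettings {
        public final String keySerializer;
        public final String valueSerializer;

        public ProducerSettings(String keySerializer, String valueSerializer) {
            this.keySerializer = keySerializer;
            this.valueSerializer = valueSerializer;
        }
    }

    /** Hypothetical topic-level settings: how an internally managed topic is created. */
    public static final class TopicSettings {
        private final OptionalInt numberOfPartitions;

        private TopicSettings(OptionalInt numberOfPartitions) {
            this.numberOfPartitions = numberOfPartitions;
        }

        public static TopicSettings defaults() {
            return new TopicSettings(OptionalInt.empty());
        }

        /** Returns a new instance; the original stays unchanged. */
        public TopicSettings withNumberOfPartitions(int n) {
            if (n <= 0) {
                throw new IllegalArgumentException("number of partitions must be positive");
            }
            return new TopicSettings(OptionalInt.of(n));
        }

        /** Assumed fallback: inherit the source topic's partition count when none is set. */
        public int resolvePartitions(int sourceTopicPartitions) {
            return numberOfPartitions.orElse(sourceTopicPartitions);
        }
    }

    public static void main(String[] args) {
        // Producer-level and topic-level concerns live in separate objects.
        ProducerSettings producer = new ProducerSettings("string", "json");
        TopicSettings inherited = TopicSettings.defaults();
        TopicSettings explicit = inherited.withNumberOfPartitions(12);

        System.out.println(producer.keySerializer + "/" + producer.valueSerializer);
        System.out.println(inherited.resolvePartitions(6));
        System.out.println(explicit.resolvePartitions(6));
    }
}
```

With this split, an operation that writes to a user-created topic would only ever accept the producer-level object, while an operation that manages its own topic could accept both.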
>
> Regarding making the topic name optional in KStream#through - I think the underlying idea is very useful, and giving users the possibility to specify the number of partitions there is even more useful :) Considering the arguments against adding the number of partitions to the Produced class, I see two options here:
> 1) Add the following method overloads
> * through() - topic will be auto-generated and the number of partitions will be taken from the source topic
> * through(final int numOfPartitions) - topic will be auto-generated with the specified number of partitions
> * through(final int numOfPartitions, final Produced<K, V> produced) - topic will be generated with the specified number of partitions and configuration taken from the produced parameter.
> 2) Leave KStream#through as it is and introduce a new method - KStream#repartition (I think Matthias suggested this in one of the threads)
>
> Considering everything mentioned above, I propose the following plan:
>
> Option A:
> 1) Leave Produced as it is
> 2) Add a number-of-partitions configuration to the Grouped class (as mentioned in the KIP)
> 3) Add the following method overloads to KStream#through
> * through() - topic will be auto-generated and the number of partitions will be taken from the source topic
> * through(final int numOfPartitions) - topic will be auto-generated with the specified number of partitions
> * through(final int numOfPartitions, final Produced<K, V> produced) - topic will be generated with the specified number of partitions and configuration taken from the produced parameter.
>
> Option B:
> 1) Leave Produced as it is
> 2) Add a number-of-partitions configuration to the Grouped class (as mentioned in the KIP)
> 3) Add a new operator KStream#repartition for creating and managing internal repartition topics
>
> P.S. I’m sorry if all of this was already discussed on the mailing list, but I kinda got lost in all the threads that were about this KIP :(
>
> Kind regards,
> Levani
>
> On Jul 1, 2019, at 9:56 AM, Levani Kokhreidze <levani.co...@gmail.com> wrote:
>
> Hello,
>
> I would like to resurrect the discussion around KIP-221.
>> Going through the discussion thread, there seems to be agreement about
>> the usefulness of this feature.
>> Regarding the implementation, as far as I understand, the most optimal
>> solution seems to me to be the following:
>>
>> 1) Add two method overloads to the KStream#through method (essentially
>>    making the topic name optional)
>> 2) Enhance the Produced class with a numOfPartitions configuration
>>    field.
>>
>> Those two changes will allow DSL users to control parallelism and
>> trigger re-partitioning without doing stateful operations.
>>
>> I will update the KIP with interface changes around KStream#through if
>> these changes sound sensible.
>>
>> Kind regards,
>> Levani
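
To make Options A and B from the quoted message concrete, here is a minimal Java sketch of how the proposed call shapes differ. It deliberately does not use the Kafka Streams API: SketchStream, viaInternalTopic, and the generated topic labels are hypothetical stand-ins invented for illustration, and the through(numOfPartitions, produced) overload from Option A is left out to keep the sketch small.

```java
import java.util.ArrayList;
import java.util.List;

final class RepartitionOptionsSketch {

    /**
     * Hypothetical, simplified stream. It only tracks its partition count and
     * the internal topics it would have caused to be created. Not a Kafka class.
     */
    static final class SketchStream {
        private final int partitions;
        private final List<String> createdTopics;

        SketchStream(int sourcePartitions) {
            this(sourcePartitions, new ArrayList<>());
        }

        private SketchStream(int partitions, List<String> createdTopics) {
            this.partitions = partitions;
            this.createdTopics = createdTopics;
        }

        int partitions() {
            return partitions;
        }

        List<String> createdTopics() {
            return createdTopics;
        }

        // Option A: through() with no topic name; partition count comes from the source.
        SketchStream through() {
            return through(partitions);
        }

        // Option A: through(numOfPartitions); the topic name is still auto-generated.
        SketchStream through(int numOfPartitions) {
            return viaInternalTopic("through", numOfPartitions);
        }

        // Option B: a dedicated operator, leaving through(topic) untouched.
        SketchStream repartition(int numOfPartitions) {
            return viaInternalTopic("repartition", numOfPartitions);
        }

        // Assumed naming scheme, made up for this sketch only.
        private SketchStream viaInternalTopic(String operator, int numOfPartitions) {
            if (numOfPartitions <= 0) {
                throw new IllegalArgumentException("numOfPartitions must be positive");
            }
            List<String> topics = new ArrayList<>(createdTopics);
            topics.add("sketch-" + operator + "-" + topics.size()
                    + " (" + numOfPartitions + " partitions)");
            return new SketchStream(numOfPartitions, topics);
        }
    }

    public static void main(String[] args) {
        SketchStream source = new SketchStream(4);
        // Option A style calls
        System.out.println(source.through().createdTopics());
        System.out.println(source.through(8).createdTopics());
        // Option B style call
        System.out.println(source.repartition(12).createdTopics());
    }
}
```

In both options the caller never names the topic; the difference is only whether that behaviour is attached to new overloads of an existing method or to a separate operator, which is the trade-off the thread is weighing.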