Hello,

Just FYI, the PR has been updated and now incorporates the latest suggestions about joins. `CopartitionedTopicsEnforcer` will throw an exception if the numbers of partitions aren’t the same when the `repartition` operation is used along with `join`.
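As a rough illustration of that rule, here is a minimal, self-contained Java sketch. It is not the code from the PR: the class `CopartitionCheckSketch`, the method `requireSamePartitionCount`, the topic names, and the use of `IllegalStateException` are all assumptions made for the example, and it only covers the case where every topic in the join has an explicitly requested partition count.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class CopartitionCheckSketch {

    // Hypothetical helper (not a Kafka Streams API): returns the partition
    // count shared by all explicitly repartitioned topics, or throws if the
    // requested counts disagree.
    public static int requireSamePartitionCount(Map<String, Integer> requestedPartitions) {
        if (requestedPartitions.isEmpty()) {
            throw new IllegalArgumentException("no topics given");
        }
        // Collect the distinct partition counts requested across all topics.
        Set<Integer> distinct = new TreeSet<>(requestedPartitions.values());
        if (distinct.size() > 1) {
            throw new IllegalStateException(
                "Join requires equal partition counts, but got " + requestedPartitions);
        }
        return distinct.iterator().next();
    }

    public static void main(String[] args) {
        Map<String, Integer> topics = new LinkedHashMap<>();
        topics.put("left-repartition", 100);   // illustrative topic names
        topics.put("right-repartition", 10);
        try {
            requireSamePartitionCount(topics);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }

        // Bring the right side up to the same count and check again.
        topics.put("right-repartition", 100);
        System.out.println("accepted with " + requireSamePartitionCount(topics) + " partitions");
    }
}
```

With 100 partitions requested on the left side and 10 on the right, the check rejects the join; once both sides ask for 100, it passes.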
For more details, please take a look at the PR: https://github.com/apache/kafka/pull/7170/files

Regards,
Levani

> On Nov 15, 2019, at 11:01 AM, Matthias J. Sax <matth...@confluent.io> wrote:
>
> Thanks a lot for the input, Sophie.
>
> Your example is quite useful, and I would use it to support my claim
> that a "partition hint" for `Grouped` seems "useless" and does not
> improve the user experience.
>
> 1) You argue that a new user would be worried about repartition topics
> with too many partitions. This would imply that the user is already
> advanced enough to understand the implications of repartitioning -- for
> this case, I would argue that the user also understands _when_ an
> auto-repartitioning would happen and thus understands where to
> insert a `repartition()` operation.
>
> 2) For specifying Serdes: if a `groupByKey()` does not trigger
> auto-repartitioning, it's not required to specify the serdes, and if they
> are specified they would be ignored/unused (note that `groupBy()` would
> always trigger a repartitioning). Of course, if the default Serdes from
> the config match (eg, all data types are Json anyway), a user does not
> need to worry about specifying serdes. -- For new users that play around,
> I would assume that they work a lot with primitive types and thus would
> need to specify the serdes -- hence, they would learn about
> auto-repartitioning the hard way anyhow, because each time a
> `groupByKey()` does trigger auto-repartitioning, they would need to pass
> in the correct Serdes -- this way, they would also be educated where to
> insert a `repartition()` operator if needed.
>
> 3) If a new user really just "plays around", I don't think they use an
> input topic with 100 partitions but most likely have a local single-node
> broker with most likely single-partition topics.
>
>
> My main argument for my current proposal is, however, that---based on
> past experience---it's better to roll out a new feature more carefully
> and see how it goes. Last, as John pointed out, we can still extend the
> feature in the future. Instead of making a judgment call up-front, being
> more conservative and less fancy, and revisiting the design based on
> actual user feedback after the first version is rolled out, seems to be
> the better option. Undoing a feature is much harder than extending it.
>
>
> While I advocate strongly for a simple first version of this feature, it's
> a community decision in the end, and I would not block this KIP if
> there is a broad preference to add `Grouped#withNumberOfPartitions()`
> either.
>
>
> -Matthias
>
> On 11/14/19 11:35 PM, Sophie Blee-Goldman wrote:
>> It seems like we all agree at this point (please correct me if wrong!) that
>> we should NOT change the existing repartitioning behavior, ie we should
>> allow Streams to continue to determine when and where to repartition --
>> *unless* explicitly told to by the use of a .through or the new
>> .repartition operator.
>>
>> Regarding groupBy, the existing behavior we should not disrupt is
>> a) repartition *only* when required due to an upstream key-changing
>> operation (ie don't force repartitioning based on the presence of an
>> optional config parameter), and
>> b) allow optimization of required repartitions, if any
>>
>> Within the constraint of not breaking the existing behavior, this still
>> leaves open the question of whether we want to improve the user experience
>> by allowing users to provide groupBy with a *suggestion* for numPartitions
>> (or to put it more fairly, whether that *will* improve the experience). I
>> agree with many of the arguments outlined above but let me just push back
>> on this one issue one final time, and if we can't come to a consensus then
>> I am happy to drop it for now so that the KIP can proceed.
>> >> Specifically, my proposal would be to simply augment Grouped with an >> optional numPartitions, understood to >> indicate the user's desired number of partitions *if Streams decides to >> repartition due to that groupBy* >> >>> if a user cares about the number of partition, the user wants to enforce >> a repartitioning >> First, I think we should take a step back and examine this claim. I agree >> 100% that *if this is true,* >> *then we should not give groupBy an optional numPartitions.* As far as I >> see it, there's no argument >> to be had there if we *presuppose that claim.* But I'm not convinced in >> that as an axiom of the user >> experience and think we should be examining that claim itself, not the >> consequences of it. >> >> To give a simple example, let's say some new user is trying out Streams and >> wants to just play around >> with it to see if it might be worth looking into. They want to just write >> up a simple app and test it out on the >> data in some existing topics they have with a large number of partitions, >> and a lot of data. They're just messing >> around, trying new topologies and don't want to go through each new one >> step by step to determine if (or where) >> a repartition might be required. They also don't want to force a >> repartition if it turns out to not be required, so they'd >> like to avoid the nice new .repartition operator they saw. But given the >> huge number of input partitions, they'd like >> to rest assured that if a repartition does end up being required somewhere >> during dev, it will not be created with >> the same huge number of partitions that their input topic has -- so they >> just pass groupBy a small numPartitions >> suggestion. >> >> I know that's a bit of a contrived example but I think it does highlight >> how and when this might be a considerable >> quality of life improvement, in particular for new users to Streams and/or >> during the dev cycle. 
*You don't want to* >> *force a repartition if it wasn't necessary, but you don't want to create a >> topic with a huge partition count either.* >> >> Also, while the optimization discussion took us down an interesting but >> ultimately more distracting road, it's worth >> pointing out that it is clearly a major win to have as few >> repartition topics/steps as possible. Given that we >> don't want to change existing behavior, the optimization framework can only >> help out when the placement of >> repartition steps is flexible, which means only those from .groupBy (and >> not .repartition). *Users should not* >> *have to choose between allowing Streams to optimize the repartition >> placement, and allowing to specify a * >> *number of partitions.* >> >> Lastly, I have what may be a stupid question but for my own edification of >> how groupBy works: >> if you do a .groupBy and a repartition is NOT required, does it ever need >> to serialize/deserialize >> any of the data? In other words, if you pass a key/value serde to groupBy >> and it doesn't trigger >> a repartition, is the serde(s) just ignored and thus more like a suggestion >> than a requirement? >> >> So again, I don't want to hold up this KIP forever but I feel we've spent >> some time getting slightly >> off track (although certainly into very interesting discussions) yet never >> really addressed or questioned >> the basic premise: *could a user want to specify a number of partitions but >> not enforce a repartition (at that* >> *specific point in the topology)?* >> >> >> >> On Fri, Nov 15, 2019 at 12:18 AM Matthias J. Sax <matth...@confluent.io> >> wrote: >> >>> Side remark: >>> >>> If the user specifies `repartition()` on both side of the join, we can >>> actually throw the execption earlier, ie, when we build the topology. 
>>>
>>> Currently, we can do this check only after Kafka Streams was started,
>>> within `StreamPartitionAssignor#assign()` -- we still need to keep this
>>> check for the case that none or only one side has a user specified
>>> number of partitions though.
>>>
>>>
>>> -Matthias
>>>
>>> On 11/14/19 8:15 AM, John Roesler wrote:
>>>> Thanks, all,
>>>>
>>>> I can get behind just totally leaving out repartition-via-groupBy. If
>>>> we only introduce `repartition()` for now, we're making the minimal
>>>> change to gain the desired capability.
>>>>
>>>> Plus, since we agree that `repartition()` should never be optimizable,
>>>> it's a future-compatible proposal. I.e., if we were to add a
>>>> non-optimizable groupBy(partitions) operation now, and want to make it
>>>> optimizable in the future, we have to worry about topology
>>>> compatibility. Better to just do non-optimizable `repartition()` now,
>>>> and add an optimizable `groupBy(partitions)` in the future (maybe).
>>>>
>>>> About joins, yes, it's a concern, and IMO we should just do the same
>>>> thing we do now... check at runtime that the partition counts on both
>>>> sides match and throw an exception otherwise. What this means as a
>>>> user is that if you explicitly repartition the left side to 100
>>>> partitions, and then join with the right side at 10 partitions, you
>>>> get an exception, since this operation is not possible. You'd either
>>>> have to "step down" the left side again, back to 10 partitions, or you
>>>> could repartition the right side to 100 partitions before the join.
>>>> The choice has to be the user's, since it depends on their desired
>>>> execution parallelism.
>>>>
>>>> Thanks,
>>>> -John
>>>>
>>>> On Thu, Nov 14, 2019 at 12:55 AM Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>
>>>>> Thanks a lot John. I think the way you decompose the operators is super
>>>>> helpful for this discussion.
>>>>>
>>>>> What you suggest with regard to using `Grouped` and enforcing
>>>>> repartitioning if the number of partitions is specified is certainly
>>>>> possible. However, I am not sure if we _should_ do this. My reasoning is
>>>>> that an enforced repartitioning as introduced via `repartition()` is an
>>>>> expensive operation, and it seems better to demand a more explicit
>>>>> user opt-in to trigger it. Just setting an optional parameter might be
>>>>> too subtle to trigger such a heavy "side effect".
>>>>>
>>>>> While I agree about "usability" in general, I would prefer a more
>>>>> conservative approach to introduce this feature, see how it goes, and
>>>>> maybe make it more advanced later on. This also applies to what
>>>>> optimizations we may or may not allow (or are able to perform at all).
>>>>>
>>>>> @Levani: Reflecting on my suggestion about `Repartitioned extends
>>>>> Grouped`, I agree that it might not be a good idea.
>>>>>
>>>>> Atm, I see an enforced repartitioning as non-optimizable and as a good
>>>>> first step, and I would suggest to not introduce anything else for now.
>>>>> Introducing optimizable enforced repartitioning via `groupBy(...,
>>>>> Grouped)` is something we could add later.
>>>>>
>>>>>
>>>>> Therefore, I would not change `Grouped` but only introduce
>>>>> `repartition()`. Users that use `groupBy()` atm, and want to opt in to
>>>>> set the number of partitions, would need to rewrite their code to
>>>>> `selectKey(...).repartition(...).groupByKey()`. It's less convenient but
>>>>> also less risky from an API and optimization point of view.
>>>>>
>>>>>
>>>>> @Levani: about joins -> yes, we will need to check the specified number
>>>>> of partitions (if any) and if they don't match, throw an exception. We
>>>>> can discuss this on the PR -- I am just trying to get the PR for KIP-466
>>>>> merged -- yours is next on the list :)
>>>>>
>>>>>
>>>>> Thoughts?
>>>>> >>>>> >>>>> -Matthias >>>>> >>>>> >>>>> On 11/12/19 4:51 PM, Levani Kokhreidze wrote: >>>>>> Thank you all for an interesting discussion. This is very enlightening. >>>>>> >>>>>> Thank you Matthias for your explanation. Your arguments are very true. >>> It makes sense that if user specifies number of partitions he/she really >>> cares that those specifications are applied to internal topics. >>>>>> Unfortunately, in current implementation this is not true during >>> `join` operation. As I’ve written in the PR comment, currently, when >>> `Stream#join` is used, `CopartitionedTopicsEnforcer` chooses max number of >>> partitions from the two source topics. >>>>>> I’m not really sure what would be the other way around this situation. >>> Maybe fail the stream altogether and inform the user to specify same number >>> of partitions? >>>>>> Or we should treat join operations in a same way as it is right now >>> and basically choose max number of partitions even when `repartition` >>> operation is specified, because Kafka Streams “knows the best” how to >>> handle joins? >>>>>> You can check integration tests how it’s being handled currently. Open >>> to suggestions on that part. >>>>>> >>>>>> As for groupBy, I agree and John raised very interesting points. My >>> arguments for allowing users to specify number of partitions during groupBy >>> operations mainly was coming from the usability perspective. >>>>>> So building on top of what John said, maybe it makes sense to make >>> `groupBy` operations smarter and whenever user specifies >>> `numberOfPartitions` configuration, repartitioning will be enforced, wdyt? >>>>>> I’m not going into optimization part yet :) I think it will be part of >>> separate PR and task, but overall it makes sense to apply optimizations >>> where number of partitions are the same. >>>>>> >>>>>> As for Repartitioned extending Grouped, I kinda feel that it won’t fit >>> nicely in current API design. 
>>>>>> In addition, in the PR review, John mentioned that there were a lot of
>>>>>> troubles in the past trying to use one operation's configuration objects on
>>>>>> other operations.
>>>>>> Also it makes sense to keep them separate in terms of compatibility.
>>>>>> In that case, we don’t have to worry every time Grouped is changed,
>>>>>> what would be the implications on `repartition` operations.
>>>>>>
>>>>>> Kind regards,
>>>>>> Levani
>>>>>>
>>>>>>
>>>>>>> On Nov 11, 2019, at 9:13 PM, John Roesler <j...@confluent.io> wrote:
>>>>>>>
>>>>>>> Ah, thanks for the clarification. I missed your point.
>>>>>>>
>>>>>>> I like the framework you've presented. It does seem simpler to assume
>>>>>>> that they either care about the partition count and want to
>>>>>>> repartition to realize it, or they don't care about the number.
>>>>>>> Returning to this discussion, it does seem unlikely that they care
>>>>>>> about the number and _don't_ care if it actually gets realized.
>>>>>>>
>>>>>>> But then, it still seems like we can just keep the option as part of
>>>>>>> Grouped. As in:
>>>>>>>
>>>>>>> // user does not care
>>>>>>> stream.groupByKey(Grouped /*not specifying partition count*/)
>>>>>>> stream.groupBy(Grouped /*not specifying partition count*/)
>>>>>>>
>>>>>>> // user does care
>>>>>>> stream.repartition(Repartitioned)
>>>>>>> stream.groupByKey(Grouped.numberOfPartitions(...))
>>>>>>> stream.groupBy(Grouped.numberOfPartitions(...))
>>>>>>>
>>>>>>> ----
>>>>>>>
>>>>>>> The above discussion got me thinking about algebra. Matthias is
>>>>>>> absolutely right that `groupByKey(numPartitions)` is equivalent to
>>>>>>> `repartition(numPartitions).groupByKey()`. I'm just not convinced that
>>>>>>> we should force people to apply that expansion themselves vs. having a
>>>>>>> more compact way to express it if they don't care where exactly the
>>>>>>> repartition occurs.
>>>>>>> However, thinking about these operators
>>>>>>> algebraically can really help *us* narrow down the number of different
>>>>>>> expressions we have to consider.
>>>>>>>
>>>>>>> Let's consider some identities:
>>>>>>>
>>>>>>> A: groupBy(mapper) + agg = mapKey(mapper) + groupByKey + agg
>>>>>>> B: src + ... + groupByKey + agg = src + ... + passthrough + agg
>>>>>>> C: mapKey(mapper) + ... + groupByKey + agg
>>>>>>>      = mapKey(mapper) + ... + repartition + groupByKey + agg
>>>>>>> D: repartition = sink(managed) + src
>>>>>>>
>>>>>>> In these identities, I used one special identifier (...), which means
>>>>>>> any number (0+) of operations that are not src, mapKey, groupBy[Key],
>>>>>>> repartition, or agg.
>>>>>>>
>>>>>>> For mental clarity, I'm just going to make up a rule that groupBy
>>>>>>> operations are not executable. In other words, you have to get to a
>>>>>>> point where you can apply B to convert a groupByKey into a passthrough
>>>>>>> in order to execute the program. This is just a formal way of stating
>>>>>>> what already happens in Kafka Streams.
>>>>>>>
>>>>>>> By applying A, we can just completely leave `groupBy` out of our
>>>>>>> analysis. It trivially decomposes into a mapKey followed by a
>>>>>>> groupByKey.
>>>>>>>
>>>>>>> Then, we can eliminate the "repartition required" case of `groupByKey`
>>>>>>> by applying C followed by D to get to the "no repartition required"
>>>>>>> version of groupByKey, which in turn sets us up to apply B to get an
>>>>>>> executable topology.
>>>>>>>
>>>>>>> Fundamentally, you can think about KIP-221 as proposing a modified
>>>>>>> D identity in which you can specify the partition count of the managed
>>>>>>> sink topic:
>>>>>>> D': repartition(pc) = sink(managed w/ pc) + src
>>>>>>>
>>>>>>> Since users _could_ apply the identities above, we don't actually have
>>>>>>> to add any partition count to groupBy[Key], but we decided early on in
>>>>>>> the KIP discussion that it's more ergonomic to add it.
>>>>>>> In that case,
>>>>>>> we also have to modify A and C:
>>>>>>> A': groupBy(mapper, pc) + agg
>>>>>>>      = mapKey(mapper) + groupByKey(pc) + agg
>>>>>>> C': mapKey(mapper) + ... + groupByKey(pc) + agg
>>>>>>>      = mapKey(mapper) + ... + repartition(pc) + groupByKey + agg
>>>>>>>
>>>>>>> Which sets us up still to always be able to get back to a plain
>>>>>>> `groupByKey` operation (with no `(pc)`) and then apply D' and
>>>>>>> ultimately B to get an executable topology.
>>>>>>>
>>>>>>> What about the optimizer?
>>>>>>> The optimizer applies another set of graph-algebraic identities to
>>>>>>> minimize the number of repartition topics in a topology.
>>>>>>>
>>>>>>> (forgive my ascii art)
>>>>>>>
>>>>>>> E: (merging repartition nodes)
>>>>>>>     (...) -> repartition -> X
>>>>>>>       \-> repartition -> Y
>>>>>>>     =
>>>>>>>     (... + repartition) -> X
>>>>>>>                          \-> Y
>>>>>>> F: (reordering around repartition)
>>>>>>>     Where SVO is any non-key-changing, stateless, operation:
>>>>>>>     repartition -> SVO = SVO -> repartition
>>>>>>>
>>>>>>> In terms of these identities, what the optimizer does is apply F
>>>>>>> repeatedly in either direction to a topology to factor out common in
>>>>>>> branches so that it can apply E to merge repartition nodes. This was
>>>>>>> especially necessary before KIP-221 because you couldn't directly
>>>>>>> express `repartition` in the DSL, only indirectly via `groupBy[Key]`,
>>>>>>> so there was no way to do the factoring manually.
>>>>>>>
>>>>>>> We can now state very clearly that in KIP-221, explicit
>>>>>>> `repartition()` operators should create a "reordering barrier". So, F
>>>>>>> cannot be applied to an explicit `repartition()`. Also, I think we
>>>>>>> decided earlier that explicit `repartition()` operations would also be
>>>>>>> ineligible for merging, so E can't be applied to explicit
>>>>>>> `repartition()` operations either. I think we feel we _could_ apply E
>>>>>>> without harm, but we want to be conservative for now.
>>>>>>>
>>>>>>> I think the salient point from the latter discussion has been that
>>>>>>> when you use `Grouped.numberOfPartitions`, this does _not_ constitute
>>>>>>> an explicit `repartition()` operator, and therefore, the resulting
>>>>>>> repartition node remains eligible for optimization.
>>>>>>>
>>>>>>> To be clear, I agree with Matthias that the provided partition count
>>>>>>> _must_ be used in the resulting implicit repartition. This has some
>>>>>>> implications for E. Namely, E could only be applied to two repartition
>>>>>>> nodes that have the same partition count. This has always been
>>>>>>> trivially true before KIP-221 because the partition count has always
>>>>>>> been "unspecified", i.e., it would be determined at runtime by the
>>>>>>> user-managed-topics' partition counts. Now, it could be specified or
>>>>>>> unspecified. We can simply augment E to allow merging only repartition
>>>>>>> nodes where the partition count is EITHER "specified and the same on
>>>>>>> both sides", OR "unspecified on both sides".
>>>>>>>
>>>>>>> Sorry for the long email, but I have a hope that it builds a solid
>>>>>>> theoretical foundation for our decisions in KIP-221, so we can have
>>>>>>> confidence that there are no edge cases for design flaws to hide.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> -John
>>>>>>>
>>>>>>> On Sat, Nov 9, 2019 at 10:37 PM Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>>
>>>>>>>>> it seems like we do want to allow
>>>>>>>>> people to optionally specify a partition count as part of this
>>>>>>>>> operation, but we don't want that option to _force_ repartitioning
>>>>>>>>
>>>>>>>> Correct, ie, that is my suggestion.
>>>>>>>>
>>>>>>>>> "Use P partitions if repartitioning is necessary"
>>>>>>>>
>>>>>>>> I disagree here, because my reasoning is that:
>>>>>>>>
>>>>>>>> - if a user cares about the number of partitions, the user wants to
>>>>>>>> enforce a repartitioning
>>>>>>>> - if a user does not care about the number of partitions, we don't need
>>>>>>>> to provide them a way to pass in a "hint"
>>>>>>>>
>>>>>>>> Hence, it should be sufficient to support:
>>>>>>>>
>>>>>>>> // user does not care
>>>>>>>>
>>>>>>>> `stream.groupByKey(Grouped)`
>>>>>>>> `stream.groupBy(..., Grouped)`
>>>>>>>>
>>>>>>>> // user does care
>>>>>>>>
>>>>>>>> `stream.repartition(Repartitioned).groupByKey()`
>>>>>>>> `stream.groupBy(..., Repartitioned)`
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>>
>>>>>>>> On 11/9/19 8:10 PM, John Roesler wrote:
>>>>>>>>> Thanks for those thoughts, Matthias,
>>>>>>>>>
>>>>>>>>> I find your reasoning about the optimization behavior compelling. The
>>>>>>>>> `through` operation is very simple and clear to reason about. It just
>>>>>>>>> passes the data exactly at the specified point in the topology exactly
>>>>>>>>> through the specified topic. Likewise, if a user invokes a
>>>>>>>>> `repartition` operator, the simplest behavior is if we just do what
>>>>>>>>> they asked for.
>>>>>>>>>
>>>>>>>>> Stepping back to think about when optimizations are surprising and
>>>>>>>>> when they aren't, it occurs to me that we should be free to move
>>>>>>>>> around repartitions when users have asked to perform some operation
>>>>>>>>> that implies a repartition, like "change keys, then filter, then
>>>>>>>>> aggregate". This program requires a repartition, but it could be
>>>>>>>>> anywhere between the key change and the aggregation. On the other
>>>>>>>>> hand, if they say, "change keys, then filter, then repartition, then
>>>>>>>>> aggregate", it seems like they were pretty clear about their desire,
>>>>>>>>> and we should just take it at face value.
>>>>>>>>> >>>>>>>>> So, I'm sold on just literally doing a repartition every time they >>>>>>>>> invoke the `repartition` operator. >>>>>>>>> >>>>>>>>> >>>>>>>>> The "partition count" modifier for `groupBy`/`groupByKey` is more >>> nuanced. >>>>>>>>> >>>>>>>>> What you said about `groupByKey` makes sense. If they key hasn't >>>>>>>>> actually changed, then we don't need to repartition before >>>>>>>>> aggregating. On the other hand, `groupBy` is specifically changing >>> the >>>>>>>>> key as part of the grouping operation, so (as you said) we >>> definitely >>>>>>>>> have to do a repartition. >>>>>>>>> >>>>>>>>> If I'm reading the discussion right, it seems like we do want to >>> allow >>>>>>>>> people to optionally specify a partition count as part of this >>>>>>>>> operation, but we don't want that option to _force_ repartitioning >>> if >>>>>>>>> it's not needed. That last clause is the key. "Use P partitions if >>>>>>>>> repartitioning is necessary" is a directive that applies cleanly and >>>>>>>>> correctly to both `groupBy` and `groupByKey`. What if we call the >>>>>>>>> option `numberOfPartitionsHint`, which along with the "if necessary" >>>>>>>>> javadoc, should make it clear that the option won't force a >>>>>>>>> repartition, and also gives us enough latitude to still employ the >>>>>>>>> optimizer on those repartition topics? >>>>>>>>> >>>>>>>>> If we like the idea of expressing it as a "hint" for grouping and a >>>>>>>>> "command" for `repartition`, then it seems like it still makes sense >>>>>>>>> to keep Grouped and Repartitioned separate, as they would actually >>>>>>>>> offer different methods with distinct semantics. >>>>>>>>> >>>>>>>>> WDYT? >>>>>>>>> >>>>>>>>> Thanks, >>>>>>>>> -John >>>>>>>>> >>>>>>>>> On Sat, Nov 9, 2019 at 8:28 PM Matthias J. Sax < >>> matth...@confluent.io> wrote: >>>>>>>>>> >>>>>>>>>> Sorry for late reply. >>>>>>>>>> >>>>>>>>>> I guess, the question boils down to the intended semantics of >>>>>>>>>> `repartition()`. 
My understanding is as follows: >>>>>>>>>> >>>>>>>>>> - KS does auto-repartitioning for correctness reasons (using the >>>>>>>>>> upstream topic to determine the number of partitions) >>>>>>>>>> - KS does auto-repartitioning only for downstream DSL operators >>> like >>>>>>>>>> `count()` (eg, a `transform()` does never trigger an >>> auto-repartitioning >>>>>>>>>> even if the stream is marked as `repartitioningRequired`). >>>>>>>>>> - KS offers `through()` to enforce a repartitioning -- however, >>> the user >>>>>>>>>> needs to create the topic manually (with the desired number of >>> partitions). >>>>>>>>>> >>>>>>>>>> I see two main applications for `repartitioning()`: >>>>>>>>>> >>>>>>>>>> 1) repartition data before a `transform()` but user does not want >>> to >>>>>>>>>> manage the topic >>>>>>>>>> 2) scale out a downstream subtopology >>>>>>>>>> >>>>>>>>>> Hence, I see `repartition()` similar to `through()`: if a users >>> calls >>>>>>>>>> it, a repartitining is enforced, with the difference that KS >>> manages the >>>>>>>>>> topic and the user does not need to create it. >>>>>>>>>> >>>>>>>>>> This behavior makes (1) and (2) possible. >>>>>>>>>> >>>>>>>>>>> I think many users would prefer to just say "if there *is* a >>> repartition >>>>>>>>>>> required at this point in the topology, it should >>>>>>>>>>> have N partitions" >>>>>>>>>> >>>>>>>>>> Because of (2), I disagree. Either a user does not care about >>> scaling >>>>>>>>>> out, for which case she would not specify the number of >>> partitions. Or a >>>>>>>>>> user does care, and hence wants to enforce the scale out. I don't >>> think >>>>>>>>>> that any user would say, "maybe scale out". >>>>>>>>>> >>>>>>>>>> Therefore, the optimizer should never ignore the repartition >>> operation. 
>>>>>>>>>> As a "consequence" (because repartitioning is expensive) a user >>> should >>>>>>>>>> make an explicit call to `repartition()` IMHO -- piggybacking an >>>>>>>>>> enforced repartitioning into `groupByKey()` seems to be "dangerous" >>>>>>>>>> because it might be too subtle and an "optional scaling out" as >>> laid out >>>>>>>>>> above does not make sense IMHO. >>>>>>>>>> >>>>>>>>>> I am also not worried about "over repartitioning" because the >>> result >>>>>>>>>> stream would never trigger auto-repartitioning. Only if multiple >>>>>>>>>> consecutive calls to `repartition()` are made it could be bad -- >>> but >>>>>>>>>> that's the same with `through()`. In the end, there is always some >>>>>>>>>> responsibility on the user. >>>>>>>>>> >>>>>>>>>> Btw, for `.groupBy()` we know that repartitioning will be required, >>>>>>>>>> however, for `groupByKey()` it depends if the KStream is marked as >>>>>>>>>> `repartitioningRequired`. >>>>>>>>>> >>>>>>>>>> Hence, for `groupByKey()` it should not be possible for a user to >>> set >>>>>>>>>> number of partitions IMHO. For `groupBy()` it's a different story, >>>>>>>>>> because calling >>>>>>>>>> >>>>>>>>>> `repartition().groupBy()` >>>>>>>>>> >>>>>>>>>> does not achieve what we want. Hence, allowing users to pass in the >>>>>>>>>> number of users partitions into `groupBy()` does actually makes >>> sense, >>>>>>>>>> because repartitioning will happen anyway and thus we can >>> piggyback a >>>>>>>>>> scaling decision. >>>>>>>>>> >>>>>>>>>> I think that John has a fair concern about the overloads, however, >>> I am >>>>>>>>>> not convinced that using `Grouped` to specify the number of >>> partitions >>>>>>>>>> is intuitive. I double checked `Grouped` and `Repartitioned` and >>> both >>>>>>>>>> allow to specify a `name` and `keySerde/valueSerde`. Thus, I am >>>>>>>>>> wondering if we could bridge the gap between both, if we would make >>>>>>>>>> `Repartitioned extends Grouped`? 
For this case, we only need >>>>>>>>>> `groupBy(Grouped)` and a user can pass in both types what seems to >>> make >>>>>>>>>> the API quite smooth: >>>>>>>>>> >>>>>>>>>> `stream.groupBy(..., Grouped...)` >>>>>>>>>> >>>>>>>>>> `stream.groupBy(..., Repartitioned...)` >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> Thoughts? >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> -Matthias >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> On 11/7/19 10:59 AM, Levani Kokhreidze wrote: >>>>>>>>>>> Hi Sophie, >>>>>>>>>>> >>>>>>>>>>> Thank you for your reply, very insightful. Looking forward >>> hearing others opinion as well on this. >>>>>>>>>>> >>>>>>>>>>> Kind regards, >>>>>>>>>>> Levani >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>>> On Nov 6, 2019, at 1:30 AM, Sophie Blee-Goldman < >>> sop...@confluent.io> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Personally, I think Matthias’s concern is valid, but on the >>> other hand >>>>>>>>>>>> Kafka Streams has already >>>>>>>>>>>>> optimizer in place which alters topology independently from user >>>>>>>>>>>> >>>>>>>>>>>> I agree (with you) and think this is a good way to put it -- we >>> currently >>>>>>>>>>>> auto-repartition for the user so >>>>>>>>>>>> that they don't have to walk through their entire topology and >>> reason about >>>>>>>>>>>> when and where to place a >>>>>>>>>>>> `.through` (or the new `.repartition`), so why suddenly force >>> this onto the >>>>>>>>>>>> user? How certain are we that >>>>>>>>>>>> users will always get this right? It's easy to imagine that >>> during >>>>>>>>>>>> development, you write your new app with >>>>>>>>>>>> correctly placed repartitions in order to use this new feature. >>> During the >>>>>>>>>>>> course of development you end up >>>>>>>>>>>> tweaking the topology, but don't remember to review or move the >>>>>>>>>>>> repartitioning since you're used to Streams >>>>>>>>>>>> doing this for you. 
If you use only single-partition topics for >>> testing, >>>>>>>>>>>> you might not even notice your app is >>>>>>>>>>>> spitting out incorrect results! >>>>>>>>>>>> >>>>>>>>>>>> Anyways, I feel pretty strongly that it would be weird to >>> introduce a new >>>>>>>>>>>> feature and say that to use it, you can't take >>>>>>>>>>>> advantage of this other feature anymore. Also, is it possible our >>>>>>>>>>>> optimization framework could ever include an >>>>>>>>>>>> optimized repartitioning strategy that is better than what a >>> user could >>>>>>>>>>>> achieve by manually inserting repartitions? >>>>>>>>>>>> Do we expect users to have a deep understanding of the best way >>> to >>>>>>>>>>>> repartition their particular topology, or is it >>>>>>>>>>>> likely they will end up over-repartitioning either due to missed >>>>>>>>>>>> optimizations or unnecessary extra repartitions? >>>>>>>>>>>> I think many users would prefer to just say "if there *is* a >>> repartition >>>>>>>>>>>> required at this point in the topology, it should >>>>>>>>>>>> have N partitions" >>>>>>>>>>>> >>>>>>>>>>>> As to the idea of adding `numberOfPartitions` to Grouped rather >>> than >>>>>>>>>>>> adding a new parameter to groupBy, that does seem more in line >>> with the >>>>>>>>>>>> current syntax so +1 from me >>>>>>>>>>>> >>>>>>>>>>>> On Tue, Nov 5, 2019 at 2:07 PM Levani Kokhreidze < >>> levani.co...@gmail.com> >>>>>>>>>>>> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Hello all, >>>>>>>>>>>>> >>>>>>>>>>>>> While https://github.com/apache/kafka/pull/7170 < >>>>>>>>>>>>> https://github.com/apache/kafka/pull/7170> is under review and >>> it’s >>>>>>>>>>>>> almost done, I want to resurrect discussion about this KIP to >>> address >>>>>>>>>>>>> couple of concerns raised by Matthias and John. 
>>>>>>>>>>>>> >>>>>>>>>>>>> As a reminder, the idea of KIP-221 was to give DSL users control over repartitioning and the parallelism of sub-topologies by: >>>>>>>>>>>>> 1) Introducing a new KStream#repartition operation, which is done in https://github.com/apache/kafka/pull/7170 >>>>>>>>>>>>> 2) Adding a new KStream#groupBy(Repartitioned) operation, which is planned as a separate PR. >>>>>>>>>>>>> >>>>>>>>>>>>> While all agree on the general implementation and the idea behind the https://github.com/apache/kafka/pull/7170 PR, introducing a new KStream#groupBy(Repartitioned) method overload raised some questions during the review. >>>>>>>>>>>>> Matthias raised the concern that there can be cases where a user calls `KStream#groupBy(Repartitioned)` but no actual repartitioning is required, so the configuration passed via `Repartitioned` would never be applied (Matthias, please correct me if I misinterpreted your comment). >>>>>>>>>>>>> So instead, if a user wants to control the parallelism of sub-topologies, he or she should always use the `KStream#repartition` operation before groupBy. The full comment can be seen here: https://github.com/apache/kafka/pull/7170#issuecomment-519303125 >>>>>>>>>>>>> >>>>>>>>>>>>> On the same topic, John pointed out that, from an API design perspective, we shouldn’t intertwine the configuration classes of different operators with one another.
So instead of introducing a new `KStream#groupBy(Repartitioned)` for specifying the number of partitions for the internal topic, we should extend the existing `Grouped` class with a `numberOfPartitions` field. >>>>>>>>>>>>> >>>>>>>>>>>>> Personally, I think Matthias’s concern is valid, but on the other hand Kafka Streams already has an optimizer in place which alters the topology independently from the user. So maybe it makes sense for Kafka Streams to internally optimize the topology in the best way possible, even if in some cases this means ignoring some operator configurations passed by the user. Also, I agree with John about API design semantics. If we go through with the changes for the `KStream#groupBy` operation, it makes more sense to add a `numberOfPartitions` field to the `Grouped` class instead of introducing a new `KStream#groupBy(Repartitioned)` method overload. >>>>>>>>>>>>> >>>>>>>>>>>>> I would really appreciate the community’s feedback on this. >>>>>>>>>>>>> >>>>>>>>>>>>> Kind regards, >>>>>>>>>>>>> Levani >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>>> On Oct 17, 2019, at 12:57 AM, Sophie Blee-Goldman <sop...@confluent.io> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>> Hey Levani, >>>>>>>>>>>>>> >>>>>>>>>>>>>> I think people are busy with the upcoming 2.4 release, and don't have much spare time at the moment. It's kind of a difficult time to get attention on things, but feel free to pick up something else to work on in the meantime until things have calmed down a bit!
>>>>>>>>>>>>>> >>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>> Sophie >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Wed, Oct 16, 2019 at 11:26 AM Levani Kokhreidze < >>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>> >>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>>> Hello all, >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Sorry for bringing this thread again, but I would like to get >>> some >>>>>>>>>>>>>>> attention on this PR: >>> https://github.com/apache/kafka/pull/7170 < >>>>>>>>>>>>> https://github.com/apache/kafka/pull/7170> < >>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/7170 < >>>>>>>>>>>>> https://github.com/apache/kafka/pull/7170>> >>>>>>>>>>>>>>> It's been a while now and I would love to move on to other >>> KIPs as well. >>>>>>>>>>>>>>> Please let me know if you have any concerns. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Regards, >>>>>>>>>>>>>>> Levani >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On Jul 26, 2019, at 11:25 AM, Levani Kokhreidze < >>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>> >>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Hi all, >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Here’s voting thread for this KIP: >>>>>>>>>>>>>>> >>> https://www.mail-archive.com/dev@kafka.apache.org/msg99680.html < >>>>>>>>>>>>> https://www.mail-archive.com/dev@kafka.apache.org/msg99680.html> >>> < >>>>>>>>>>>>>>> >>> https://www.mail-archive.com/dev@kafka.apache.org/msg99680.html < >>>>>>>>>>>>> https://www.mail-archive.com/dev@kafka.apache.org/msg99680.html >>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Regards, >>>>>>>>>>>>>>>> Levani >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> On Jul 24, 2019, at 11:15 PM, Levani Kokhreidze < >>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com> >>>>>>>>>>>>>>> <mailto:levani.co...@gmail.com <mailto:levani.co...@gmail.com>>> >>> wrote: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Hi Matthias, >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Thanks for the suggestion. I Don’t have strong opinion on >>> that one. 
>>>>>>>>>>>>>>>>> Agree that avoiding unnecessary method overloads is a good >>> idea. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Updated KIP >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Regards, >>>>>>>>>>>>>>>>> Levani >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> On Jul 24, 2019, at 8:50 PM, Matthias J. Sax < >>> matth...@confluent.io >>>>>>>>>>>>> <mailto:matth...@confluent.io> >>>>>>>>>>>>>>> <mailto:matth...@confluent.io <mailto:matth...@confluent.io>>> >>> wrote: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> One question: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Why do we add >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Repartitioned#with(final String name, final int >>> numberOfPartitions) >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> It seems that `#with(String name)`, >>> `#numberOfPartitions(int)` in >>>>>>>>>>>>>>>>>> combination with `withName()` and >>> `withNumberOfPartitions()` should >>>>>>>>>>>>> be >>>>>>>>>>>>>>>>>> sufficient. Users can chain the method calls. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> (I think it's valuable to keep the number of overload >>> small if >>>>>>>>>>>>>>> possible.) >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Otherwise LGTM. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> -Matthias >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> On 7/23/19 2:18 PM, Levani Kokhreidze wrote: >>>>>>>>>>>>>>>>>>> Hello, >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Thanks all for your feedback. >>>>>>>>>>>>>>>>>>> I started voting procedure for this KIP. If there’re any >>> other >>>>>>>>>>>>>>> concerns about this KIP, please let me know. 
>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Regards, >>>>>>>>>>>>>>>>>>> Levani >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> On Jul 20, 2019, at 8:39 PM, Levani Kokhreidze <levani.co...@gmail.com> wrote: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Hi Matthias, >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Thanks for the suggestion, makes sense. >>>>>>>>>>>>>>>>>>>> I’ve updated KIP (https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint).
>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Regards, >>>>>>>>>>>>>>>>>>>> Levani >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> On Jul 20, 2019, at 3:53 AM, Matthias J. Sax < >>>>>>>>>>>>> matth...@confluent.io <mailto:matth...@confluent.io> >>>>>>>>>>>>>>> <mailto:matth...@confluent.io <mailto:matth...@confluent.io>> >>> <mailto: >>>>>>>>>>>>> matth...@confluent.io <mailto:matth...@confluent.io> <mailto: >>>>>>>>>>>>>>> matth...@confluent.io <mailto:matth...@confluent.io>>>> >>> wrote: >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Thanks for driving the KIP. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> I agree that users need to be able to specify a >>> partitioning >>>>>>>>>>>>>>> strategy. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Sophie raises a fair point about topic configs and >>> producer >>>>>>>>>>>>>>> configs. My >>>>>>>>>>>>>>>>>>>>> take is, that consider `Repartitioned` as an >>> "extension" to >>>>>>>>>>>>>>> `Produced`, >>>>>>>>>>>>>>>>>>>>> that adds topic configuration, is a good way to think >>> about it and >>>>>>>>>>>>>>> helps >>>>>>>>>>>>>>>>>>>>> to keep the API "clean". >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> With regard to method names. I would prefer to avoid >>>>>>>>>>>>> abbreviations. >>>>>>>>>>>>>>> Can >>>>>>>>>>>>>>>>>>>>> we rename: >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> `withNumOfPartitions` -> `withNumberOfPartitions` >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Furthermore, it might be good to add some more `static` >>> methods: >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> - Repartitioned.with(Serde<K>, Serde<V>) >>>>>>>>>>>>>>>>>>>>> - Repartitioned.withNumberOfPartitions(int) >>>>>>>>>>>>>>>>>>>>> - Repartitioned.streamPartitioner(StreamPartitioner) >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> -Matthias >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> On 7/19/19 3:33 PM, Levani Kokhreidze wrote: >>>>>>>>>>>>>>>>>>>>>> Totally agree. 
I think in KStream interface it makes >>> sense to >>>>>>>>>>>>> have >>>>>>>>>>>>>>> some duplicate configurations between operators in order to >>> keep API >>>>>>>>>>>>> simple >>>>>>>>>>>>>>> and usable. >>>>>>>>>>>>>>>>>>>>>> Also, as more surface API has, harder it is to have >>> proper >>>>>>>>>>>>>>> backward compatibility. >>>>>>>>>>>>>>>>>>>>>> While initial idea of keeping topic level configs >>> separate was >>>>>>>>>>>>>>> exciting, having Repartitioned class encapsulate some >>> producer level >>>>>>>>>>>>>>> configs makes API more readable. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Regards, >>>>>>>>>>>>>>>>>>>>>> Levani >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> On Jul 20, 2019, at 1:15 AM, Sophie Blee-Goldman < >>>>>>>>>>>>>>> sop...@confluent.io <mailto:sop...@confluent.io> <mailto: >>>>>>>>>>>>> sop...@confluent.io <mailto:sop...@confluent.io>> <mailto: >>>>>>>>>>>>>>> sop...@confluent.io <mailto:sop...@confluent.io> <mailto: >>>>>>>>>>>>> sop...@confluent.io <mailto:sop...@confluent.io>>>> wrote: >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> I think that is a good point about trying to keep >>> producer level >>>>>>>>>>>>>>>>>>>>>>> configurations and (repartition) topic level >>> considerations >>>>>>>>>>>>>>> separate. >>>>>>>>>>>>>>>>>>>>>>> Number of partitions is definitely purely a topic >>> level >>>>>>>>>>>>>>> configuration. But >>>>>>>>>>>>>>>>>>>>>>> on some level, serdes and partitioners are just as >>> much a topic >>>>>>>>>>>>>>>>>>>>>>> configuration as a producer one. You could have two >>> producers >>>>>>>>>>>>>>> configured >>>>>>>>>>>>>>>>>>>>>>> with different serdes and/or partitioners, but if >>> they are >>>>>>>>>>>>>>> writing to the >>>>>>>>>>>>>>>>>>>>>>> same topic the result would be very difficult to >>> part. So in a >>>>>>>>>>>>>>> sense, these >>>>>>>>>>>>>>>>>>>>>>> are configurations of topics in Streams, not just >>> producers. 
>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> Another way to think of it: while the Streams API is >>> not always >>>>>>>>>>>>>>> true to >>>>>>>>>>>>>>>>>>>>>>> this, ideally all the relevant configs for an >>> operator are >>>>>>>>>>>>>>> wrapped into a >>>>>>>>>>>>>>>>>>>>>>> single object (in this case, Repartitioned). We could >>> instead >>>>>>>>>>>>>>> split out the >>>>>>>>>>>>>>>>>>>>>>> fields in common with Produced into a separate >>> parameter to keep >>>>>>>>>>>>>>> topic and >>>>>>>>>>>>>>>>>>>>>>> producer level configurations separate, but this >>> increases the >>>>>>>>>>>>>>> API surface >>>>>>>>>>>>>>>>>>>>>>> area by a lot. It's much more straightforward to just >>> say "this >>>>>>>>>>>>> is >>>>>>>>>>>>>>>>>>>>>>> everything that this particular operator needs" >>> without worrying >>>>>>>>>>>>>>> about what >>>>>>>>>>>>>>>>>>>>>>> exactly you're specifying. >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> I suppose you could alternatively make Produced a >>> field of >>>>>>>>>>>>>>> Repartitioned, >>>>>>>>>>>>>>>>>>>>>>> but I don't think we do this kind of composition >>> elsewhere in >>>>>>>>>>>>>>> Streams at >>>>>>>>>>>>>>>>>>>>>>> the moment >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> On Fri, Jul 19, 2019 at 1:45 PM Levani Kokhreidze < >>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com> >>> <mailto: >>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>> >>> <mailto: >>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com> >>> <mailto: >>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>>>> >>>>>>>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> Hi Bill, >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> Thanks a lot for the feedback. >>>>>>>>>>>>>>>>>>>>>>>> Yes, that makes sense. I’ve updated KIP with >>>>>>>>>>>>>>> `Repartitioned#partitioner` >>>>>>>>>>>>>>>>>>>>>>>> configuration. 
>>>>>>>>>>>>>>>>>>>>>>>> In the beginning, I wanted to introduce a class for >>> topic level >>>>>>>>>>>>>>>>>>>>>>>> configuration and keep topic level and producer level >>>>>>>>>>>>>>> configurations (such >>>>>>>>>>>>>>>>>>>>>>>> as Produced) separately (see my second email in this >>> thread). >>>>>>>>>>>>>>>>>>>>>>>> But while looking at the semantics of KStream >>> interface, I >>>>>>>>>>>>>>> couldn’t really >>>>>>>>>>>>>>>>>>>>>>>> figure out good operation name for Topic level >>> configuration >>>>>>>>>>>>>>> class and just >>>>>>>>>>>>>>>>>>>>>>>> introducing `Topic` config class was kinda breaking >>> the >>>>>>>>>>>>>>> semantics. >>>>>>>>>>>>>>>>>>>>>>>> So I think having Repartitioned class which >>> encapsulates topic >>>>>>>>>>>>>>> and >>>>>>>>>>>>>>>>>>>>>>>> producer level configurations for internal topics is >>> viable >>>>>>>>>>>>>>> thing to do. >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> Regards, >>>>>>>>>>>>>>>>>>>>>>>> Levani >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> On Jul 19, 2019, at 7:47 PM, Bill Bejeck < >>> bbej...@gmail.com >>>>>>>>>>>>> <mailto:bbej...@gmail.com> >>>>>>>>>>>>>>> <mailto:bbej...@gmail.com <mailto:bbej...@gmail.com>> >>> <mailto: >>>>>>>>>>>>> bbej...@gmail.com <mailto:bbej...@gmail.com> <mailto: >>>>>>>>>>>>>>> bbej...@gmail.com <mailto:bbej...@gmail.com>>>> wrote: >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> Hi Lavani, >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> Thanks for resurrecting this KIP. >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> I'm also a +1 for adding a partition option. In >>> addition to >>>>>>>>>>>>>>> the reason >>>>>>>>>>>>>>>>>>>>>>>>> provided by John, my reasoning is: >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> 1. Users may want to use something other than >>> hash-based >>>>>>>>>>>>>>> partitioning >>>>>>>>>>>>>>>>>>>>>>>>> 2. 
Users may wish to partition on something >>> different than the >>>>>>>>>>>>>>> key >>>>>>>>>>>>>>>>>>>>>>>>> without having to change the key. For example: >>>>>>>>>>>>>>>>>>>>>>>>> 1. A combination of fields in the value in >>> conjunction with >>>>>>>>>>>>>>> the key >>>>>>>>>>>>>>>>>>>>>>>>> 2. Something other than the key >>>>>>>>>>>>>>>>>>>>>>>>> 3. We allow users to specify a partitioner on >>> Produced hence >>>>>>>>>>>>> in >>>>>>>>>>>>>>>>>>>>>>>>> KStream.to and KStream.through, so it makes sense >>> for API >>>>>>>>>>>>>>> consistency. >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> Just my 2 cents. >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>>>>>>>>>>>> Bill >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Jul 19, 2019 at 5:46 AM Levani Kokhreidze < >>>>>>>>>>>>>>>>>>>>>>>> levani.co...@gmail.com <mailto: >>> levani.co...@gmail.com> >>>>>>>>>>>>> <mailto:levani.co...@gmail.com <mailto:levani.co...@gmail.com>> >>> <mailto: >>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com> >>> <mailto: >>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>>>> >>>>>>>>>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> Hi John, >>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> In my mind it makes sense. >>>>>>>>>>>>>>>>>>>>>>>>>> If we add partitioner configuration to >>> Repartitioned class, >>>>>>>>>>>>>>> with the >>>>>>>>>>>>>>>>>>>>>>>>>> combination of specifying number of partitions for >>> internal >>>>>>>>>>>>>>> topics, user >>>>>>>>>>>>>>>>>>>>>>>>>> will have opportunity to ensure co-partitioning >>> before join >>>>>>>>>>>>>>> operation. >>>>>>>>>>>>>>>>>>>>>>>>>> I think this can be quite powerful feature. >>>>>>>>>>>>>>>>>>>>>>>>>> Wondering what others think about this? 
>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> Regards, >>>>>>>>>>>>>>>>>>>>>>>>>> Levani >>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>> On Jul 18, 2019, at 1:20 AM, John Roesler < >>>>>>>>>>>>> j...@confluent.io <mailto:j...@confluent.io> >>>>>>>>>>>>>>> <mailto:j...@confluent.io <mailto:j...@confluent.io>> >>> <mailto: >>>>>>>>>>>>> j...@confluent.io <mailto:j...@confluent.io> <mailto: >>>>>>>>>>>>>>> j...@confluent.io <mailto:j...@confluent.io>>>> wrote: >>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>> Yes, I believe that's what I had in mind. Again, >>> not totally >>>>>>>>>>>>>>> sure it >>>>>>>>>>>>>>>>>>>>>>>>>>> makes sense, but I believe something similar is >>> the >>>>>>>>>>>>> rationale >>>>>>>>>>>>>>> for >>>>>>>>>>>>>>>>>>>>>>>>>>> having the partitioner option in Produced. >>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>>>>>>>>>>>>>> -John >>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Jul 17, 2019 at 3:20 PM Levani Kokhreidze >>>>>>>>>>>>>>>>>>>>>>>>>>> <levani.co...@gmail.com <mailto: >>> levani.co...@gmail.com> >>>>>>>>>>>>> <mailto:levani.co...@gmail.com <mailto:levani.co...@gmail.com>> >>>>>>>>>>>>>>> <mailto:levani.co...@gmail.com <mailto:levani.co...@gmail.com> >>> <mailto: >>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>>>> >>> wrote: >>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Hey John, >>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Oh that’s interesting use-case. >>>>>>>>>>>>>>>>>>>>>>>>>>>> Do I understand this correctly, in your example >>> I would >>>>>>>>>>>>>>> first issue >>>>>>>>>>>>>>>>>>>>>>>>>> repartition(Repartitioned) with proper partitioner >>> that >>>>>>>>>>>>>>> essentially >>>>>>>>>>>>>>>>>>>>>>>> would >>>>>>>>>>>>>>>>>>>>>>>>>> be the same as the topic I want to join with and >>> then do the >>>>>>>>>>>>>>>>>>>>>>>> KStream#join >>>>>>>>>>>>>>>>>>>>>>>>>> with DSL? 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards, >>>>>>>>>>>>>>>>>>>>>>>>>>>> Levani >>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Jul 17, 2019, at 11:11 PM, John Roesler < >>>>>>>>>>>>>>> j...@confluent.io <mailto:j...@confluent.io> <mailto: >>> j...@confluent.io >>>>>>>>>>>>> <mailto:j...@confluent.io>> <mailto:j...@confluent.io <mailto: >>>>>>>>>>>>> j...@confluent.io> >>>>>>>>>>>>>>> <mailto:j...@confluent.io <mailto:j...@confluent.io>>>> >>>>>>>>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hey, all, just to chime in, >>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> I think it might be useful to have an option to >>> specify >>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>>>>>>>>> partitioner. The case I have in mind is that >>> some data may >>>>>>>>>>>>>>> get >>>>>>>>>>>>>>>>>>>>>>>>>>>>> repartitioned and then joined with an input >>> topic. If the >>>>>>>>>>>>>>> right-side >>>>>>>>>>>>>>>>>>>>>>>>>>>>> input topic uses a custom partitioning >>> strategy, then the >>>>>>>>>>>>>>>>>>>>>>>>>>>>> repartitioned stream also needs to be >>> partitioned with the >>>>>>>>>>>>>>> same >>>>>>>>>>>>>>>>>>>>>>>>>>>>> strategy. >>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Does that make sense, or did I maybe miss >>> something >>>>>>>>>>>>>>> important? 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>>>>>>>>>>>>>>>> -John >>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Jul 17, 2019 at 2:48 PM Levani >>> Kokhreidze >>>>>>>>>>>>>>>>>>>>>>>>>>>>> <levani.co...@gmail.com <mailto: >>> levani.co...@gmail.com> >>>>>>>>>>>>> <mailto:levani.co...@gmail.com <mailto:levani.co...@gmail.com>> >>>>>>>>>>>>>>> <mailto:levani.co...@gmail.com <mailto:levani.co...@gmail.com> >>> <mailto: >>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>>>> >>> wrote: >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Yes, I was thinking about it as well. To be >>> honest I’m >>>>>>>>>>>>> not >>>>>>>>>>>>>>> sure >>>>>>>>>>>>>>>>>>>>>>>> about >>>>>>>>>>>>>>>>>>>>>>>>>> it yet. >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> As Kafka Streams DSL user, I don’t really >>> think I would >>>>>>>>>>>>>>> need control >>>>>>>>>>>>>>>>>>>>>>>>>> over partitioner for internal topics. >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> As a user, I would assume that Kafka Streams >>> knows best >>>>>>>>>>>>>>> how to >>>>>>>>>>>>>>>>>>>>>>>>>> partition data for internal topics. >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> In this KIP I wrote that Produced should be >>> used only for >>>>>>>>>>>>>>> topics >>>>>>>>>>>>>>>>>>>>>>>> that >>>>>>>>>>>>>>>>>>>>>>>>>> are created by user In advance. >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> In those cases maybe it make sense to have >>> possibility to >>>>>>>>>>>>>>> specify >>>>>>>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>>>>>> partitioner. >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I don’t have clear answer on that yet, but I >>> guess >>>>>>>>>>>>>>> specifying the >>>>>>>>>>>>>>>>>>>>>>>>>> partitioner can be added as well if there’s >>> agreement on >>>>>>>>>>>>> this. 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards, >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Levani >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Jul 17, 2019, at 10:42 PM, Sophie >>> Blee-Goldman < >>>>>>>>>>>>>>>>>>>>>>>>>> sop...@confluent.io <mailto:sop...@confluent.io> >>> <mailto: >>>>>>>>>>>>> sop...@confluent.io <mailto:sop...@confluent.io>> <mailto: >>>>>>>>>>>>>>> sop...@confluent.io <mailto:sop...@confluent.io>>> wrote: >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for clearing that up. I agree that >>> Repartitioned >>>>>>>>>>>>>>> would be a >>>>>>>>>>>>>>>>>>>>>>>>>> useful >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> addition. I'm wondering if it might also need >>> to have >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> a withStreamPartitioner method/field, similar >>> to >>>>>>>>>>>>>>> Produced? I'm not >>>>>>>>>>>>>>>>>>>>>>>>>> sure how >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> widely this feature is really used, but seems >>> it should >>>>>>>>>>>>> be >>>>>>>>>>>>>>>>>>>>>>>> available >>>>>>>>>>>>>>>>>>>>>>>>>> for >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> repartition topics. >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Jul 17, 2019 at 11:26 AM Levani >>> Kokhreidze < >>>>>>>>>>>>>>>>>>>>>>>>>> levani.co...@gmail.com <mailto: >>> levani.co...@gmail.com> >>>>>>>>>>>>>>> <mailto:levani.co...@gmail.com <mailto:levani.co...@gmail.com> >>> <mailto: >>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hey Sophie, >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> In both cases KStream#repartition and >>>>>>>>>>>>>>>>>>>>>>>>>> KStream#repartition(Repartitioned) >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> topic will be created and managed by Kafka >>> Streams. 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Idea of Repartitioned is to give user more >>> control over >>>>>>>>>>>>>>> the topic >>>>>>>>>>>>>>>>>>>>>>>>>> such as >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> num of partitions. >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I feel like Repartitioned parameter is >>> something that >>>>>>>>>>>>> is >>>>>>>>>>>>>>> missing >>>>>>>>>>>>>>>>>>>>>>>> in >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> current DSL design. >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Essentially giving user control over >>> parallelism by >>>>>>>>>>>>>>> configuring >>>>>>>>>>>>>>>>>>>>>>>> num >>>>>>>>>>>>>>>>>>>>>>>>>> of >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> partitions for internal topics. >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hope this answers your question. >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards, >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Levani >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Jul 17, 2019, at 9:02 PM, Sophie >>> Blee-Goldman < >>>>>>>>>>>>>>>>>>>>>>>>>> sop...@confluent.io <mailto:sop...@confluent.io> >>> <mailto: >>>>>>>>>>>>> sop...@confluent.io <mailto:sop...@confluent.io>> <mailto: >>>>>>>>>>>>>>> sop...@confluent.io <mailto:sop...@confluent.io>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hey Levani, >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the KIP! Can you clarify one >>> thing for me >>>>>>>>>>>>> -- >>>>>>>>>>>>>>> for the >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> KStream#repartition signature taking a >>> Repartitioned, >>>>>>>>>>>>>>> will the >>>>>>>>>>>>>>>>>>>>>>>>>> topic be >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> auto-created by Streams (which seems to be >>> the case >>>>>>>>>>>>> for >>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>>>>>> signature >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> without a Repartitioned) or does it have to >>> be >>>>>>>>>>>>>>> pre-created? 
The >>>>>>>>>>>>>>>>>>>>>>>>>> wording >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> in >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the KIP makes it seem like one version of >>> the method >>>>>>>>>>>>>>> will >>>>>>>>>>>>>>>>>>>>>>>>>> auto-create >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> topics while the other will not. >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Sophie >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Jul 17, 2019 at 10:15 AM Levani >>> Kokhreidze < >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> levani.co...@gmail.com <mailto: >>> levani.co...@gmail.com> >>>>>>>>>>>>>>> <mailto:levani.co...@gmail.com <mailto:levani.co...@gmail.com> >>> <mailto: >>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hello, >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> One more bump about KIP-221 ( >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint >>>>>>>>>>>>> < >>>>>>>>>>>>> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint >>>>>>>>>>>>>> >>>>>>>>>>>>>>> < >>>>>>>>>>>>>>> >>>>>>>>>>>>> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint >>>>>>>>>>>>> < >>>>>>>>>>>>> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> < >>>>>>>>>>>>>>> >>>>>>>>>>>>> >>> 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint ) >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> so it doesn’t get lost in mailing list :) >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Would love to hear the community’s opinions/concerns about this KIP.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards, >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Levani >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Jul 12, 2019, at 5:27 PM, Levani Kokhreidze <levani.co...@gmail.com> wrote: >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hello, >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Kind reminder about this KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards, >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Levani >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Jul 9, 2019, at 11:38 AM, Levani Kokhreidze <levani.co...@gmail.com> wrote: >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hello, >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> In order to move this KIP forward, I’ve updated confluence page with the new proposal
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I’ve also filled in the “Rejected Alternatives” section.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Looking forward to discussing this KIP :)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Kind regards,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Jul 3, 2019, at 1:08 PM, Levani Kokhreidze <levani.co...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hello Matthias,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the feedback and ideas.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I like the idea of introducing a dedicated `Topic` class for topic
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> configuration for internal operators like `groupedBy`.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Would be great to hear others' opinions about this as well.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Kind regards,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Jul 3, 2019, at 7:00 AM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Levani,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for picking up this KIP! And thanks for summarizing everything.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Even if some points may have been discussed already (can't really
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> remember), it's helpful to get a good summary to refresh the discussion.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I think your reasoning makes sense. With regard to the distinction
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> between operators that manage topics and operators that use user-created
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> topics: Following this argument, it might indicate that leaving
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> `through()` as-is (as an operator that uses user-defined topics) and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> introducing a new `repartition()` operator (an operator that manages
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> topics itself) might be good.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Otherwise, there is one operator `through()` that sometimes manages
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> topics but sometimes not; a different name, ie, new operator would make
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the distinction clearer.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> About adding `numOfPartitions` to `Grouped`. I am wondering if the same
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> argument as for `Produced` does apply and adding it is semantically
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> questionable? Might be good to get opinions of others on this, too. I am
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> not sure myself what solution I prefer atm.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> So far, KS uses configuration objects that allow to configure a certain
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> "entity" like a consumer, producer, store. If we assume that a topic is
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> a similar entity, I am wondering if we should have a
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> `Topic#withNumberOfPartitions()` class and method instead of a plain
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> integer?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> This would allow us to add other configs, like replication factor,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> retention-time etc, easily, without the need to change the "main API".
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Just want to give some ideas. Not sure if I like them myself. :)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 7/1/19 1:04 AM, Levani Kokhreidze wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Actually, giving it more thought - maybe enhancing Produced with num of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> partitions configuration is not the best approach. Let me explain why:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1) If we enhance Produced class with this configuration, this will also
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> affect KStream#to operation. Since KStream#to is the final sink of the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> topology, for me, it seems to be a reasonable assumption that the user
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> needs to manually create the sink topic in advance.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> And in that case, having num of partitions configuration doesn’t make
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> much sense.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2) Looking at Produced class, based on API contract, seems like Produced
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> is designed to be something that is explicitly for the producer (key
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> serializer, value serializer, partitioner: those are all producer
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> specific configurations) and num of partitions is a topic level
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> configuration. And I don’t think mixing topic and producer level
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> configurations together in one class is a good approach.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 3) Looking at KStream interface, seems like Produced parameter is for
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> operations that work with non-internal topics (i.e. topics that are not
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> created and managed internally by Kafka Streams) and I think we should
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> leave it as it is in that case.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Taking all these things into account, I think we should distinguish
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> between DSL operations, where Kafka Streams should create and manage
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> internal topics (KStream#groupBy) vs topics that should be created in
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> advance (e.g KStream#to).
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> To sum it up, I think adding numPartitions configuration in Produced
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> will result in mixing topic and producer level configuration in one
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> class and it’s gonna break existing API semantics.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regarding making topic name optional in KStream#through - I think the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> underlying idea is very useful and giving users the possibility to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> specify num of partitions there is even more useful :) Considering
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> arguments against adding num of partitions in Produced class, I see two
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> options here:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1) Add following method overloads
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> * through() - topic will be auto-generated and num of partitions will
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> be taken from source topic
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> * through(final int numOfPartitions) - topic will be auto-generated
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> with specified num of partitions
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> * through(final int numOfPartitions, final Produced<K, V> produced) -
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> topic will be generated with specified num of partitions and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> configuration taken from produced parameter.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2) Leave KStream#through as it is and introduce new method -
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> KStream#repartition (I think Matthias suggested this in one of the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> threads)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Considering all mentioned above I propose the following plan:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Option A:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1) Leave Produced as it is
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2) Add num of partitions configuration to Grouped class (as mentioned
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> in the KIP)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 3) Add following method overloads to KStream#through
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> * through() - topic will be auto-generated and num of partitions will
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> be taken from source topic
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> * through(final int numOfPartitions) - topic will be auto-generated
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> with specified num of partitions
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> * through(final int numOfPartitions, final Produced<K, V> produced) -
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> topic will be generated with specified num of partitions and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> configuration taken from produced parameter.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Option B:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1) Leave Produced as it is
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2) Add num of partitions configuration to Grouped class (as mentioned
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> in the KIP)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 3) Add new operator KStream#repartition for creating and managing
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> internal repartition topics
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> P.S. I’m sorry if all of this was already discussed in the mailing list,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> but I kinda got lost with all the threads that were about this KIP :(
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Kind regards,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Jul 1, 2019, at 9:56 AM, Levani Kokhreidze <levani.co...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I would like to resurrect discussion around KIP-221. Going through the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> discussion thread, there seems to be agreement around usefulness of this
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> feature.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regarding the implementation, as far as I understood, the most optimal
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> solution for me seems the following:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1) Add two method overloads to KStream#through method (essentially
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> making topic name optional)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2) Enhance Produced class with numOfPartitions configuration field.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Those two changes will allow DSL users to control parallelism and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> trigger re-partition without doing stateful operations.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I will update KIP with interface changes around KStream#through if
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> these changes sound sensible.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Kind regards,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Levani
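
For reference, the `Topic#withNumberOfPartitions()` idea quoted above could look roughly like the sketch below. This is only an illustration under assumptions: the thread names nothing beyond `Topic` and `withNumberOfPartitions()`, so `withReplicationFactor`, `withRetention`, the accessors, and the "unset means fall back to a default" behaviour are hypothetical and not part of Kafka Streams or of the KIP.

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical sketch: the thread only floats a `Topic` config object with a
// withNumberOfPartitions() method. Every other name here is an assumption
// made for illustration, not an existing Kafka Streams API.
final class Topic {
    // A null field means "not set"; a caller would fall back to some default,
    // e.g. the partition count of the source topic.
    private final Integer numberOfPartitions;
    private final Short replicationFactor;
    private final Duration retention;

    private Topic(Integer numberOfPartitions, Short replicationFactor, Duration retention) {
        this.numberOfPartitions = numberOfPartitions;
        this.replicationFactor = replicationFactor;
        this.retention = retention;
    }

    // Entry point named after the method mentioned in the thread.
    static Topic withNumberOfPartitions(int numberOfPartitions) {
        if (numberOfPartitions < 1) {
            throw new IllegalArgumentException("numberOfPartitions must be >= 1");
        }
        return new Topic(numberOfPartitions, null, null);
    }

    // Each "with" method returns a new immutable copy, so more topic-level
    // settings can be added later without changing operator signatures.
    Topic withReplicationFactor(short replicationFactor) {
        return new Topic(numberOfPartitions, replicationFactor, retention);
    }

    Topic withRetention(Duration retention) {
        return new Topic(numberOfPartitions, replicationFactor, retention);
    }

    Optional<Integer> numberOfPartitions() { return Optional.ofNullable(numberOfPartitions); }
    Optional<Short> replicationFactor() { return Optional.ofNullable(replicationFactor); }
    Optional<Duration> retention() { return Optional.ofNullable(retention); }

    public static void main(String[] args) {
        Topic topic = Topic.withNumberOfPartitions(12)
                .withReplicationFactor((short) 3)
                .withRetention(Duration.ofHours(6));
        System.out.println(topic.numberOfPartitions().get() + " partitions, rf="
                + topic.replicationFactor().get() + ", retention=" + topic.retention().get());
    }
}
```

The point of the shape is the one made in the thread: an operator would take a single `Topic` argument instead of a plain integer, so replication factor or retention could be added later without new overloads on the "main API".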