Levani, do you agree with the current proposal? It's basically a de-scoping of the already-voted KIP. If you agree, could you update the KIP wiki page accordingly, including the "Rejected Alternatives" section (and maybe a link to a follow-up Jira ticket).
Because it's a descope, and John and I support it, there seems to be no need to re-vote. @Sophie, John: thanks a lot for your thoughtful input! -Matthias On 11/15/19 12:47 PM, John Roesler wrote: > Thanks Sophie, > > I think your concern is valid, and also that your idea to make a > ticket is a good one. > > Creating a ticket has some very positive effects: > * It allows us to record the thinking at this point in time so we > don't have to dig through the mail archives later > * It demonstrates that we did consider the use case, and do want to > address it, but just don't feel confident to implement it right now. > Then, if/when people do have a problem with the gap, the ticket is > already there for them to consider, request, or even pick up. > > Since one aspect of the deferral is a desire to wait for real use > experience, we should explicitly mention that in the ticket. This is > just good information for people browsing the Jira looking for > interesting tickets to pick up. They could still pick it up, but they > can ask themselves if they really understand the real-world use cases > any better than we do right now. > > Thanks, likewise, to you for the good discussion! > -John > > On Fri, Nov 15, 2019 at 2:37 PM Sophie Blee-Goldman <sop...@confluent.io> > wrote: >> >> While I'm concerned that "not augmenting groupBy as part of this KIP" >> really translates to "will not get around to augmenting groupBy for a long >> time if not as part of this KIP", like I said I don't want to hold up the >> new >> .repartition operator that it seems we do, at least, all agree on. It's a >> fair >> point that we can always add this in later, but undoing it is far more >> problematic.
>> >> Anyway, I would be happy if we at least make a ticket to consider adding a >> "number of partitions" option/suggestion to groupBy, so that we don't lose >> all the thought put into this decision so far and can avoid rehashing the >> same >> argument word for word and have something to point to when someone >> asks "why didn't we add this numPartitions option to groupBy?" >> >> Beyond that, if the community isn't pushing for it at this moment then it >> seems very >> reasonable to shelve the idea for now so that the rest of this KIP can >> proceed. >> Without input one way or another it's hard to say what the right thing to >> do is, >> which makes the right thing to do "wait to add this feature". >> >> Thanks for the good discussion everyone, >> >> Sophie >> >> On Fri, Nov 15, 2019 at 12:41 PM John Roesler <j...@confluent.io> wrote: >> >>> Hi all, >>> >>> I think that Sophie is asking a good question, and I do think that >>> such "blanket configurations" are plausible. For example, we currently >>> support (and I would encourage) "I don't know if this is going to >>> create a repartition topic, but if it does, then use this name instead >>> of generating one". >>> >>> I'm not sure I'm convinced that specifying max parallelism falls into >>> this category. After all, the groupByKey+aggregate will be executed >>> with _some_ max parallelism. It's either the same as the inputs' >>> partition count or overridden with the proposed config. It seems >>> counterintuitive to override the specified option with the default >>> value. >>> >>> I'm not sure if I can put my finger on it, but "maybe use this name" >>> seems way more reasonable to me than "maybe execute with this degree >>> of parallelism". >>> >>> I do think (and I appreciate that this is where Sophie's example is >>> coming from) that Streams should strive to be absolutely as simple and >>> intuitive as possible (while still maintaining correctness).
Optimal >>> performance can be at odds with API simplicity. For example, the >>> simplest behavior is, if you ask for 5 partitions, you get 5 >>> partitions. Maybe a repartition is technically not necessary (if you >>> didn't change the key), but at least there's no mystery to this >>> behavior. >>> >>> Clearly, an (opposing) tenet of simplicity is trying to prevent >>> people from making mistakes, which I think is what the example boils >>> down to. Sometimes, we can prevent clear mistakes, like equi-joining >>> two topics with different partition counts. But for this case, it >>> doesn't seem as clear-cut to be able to assume that they _said_ 5 >>> partitions, but they didn't really _want_ 5 partitions. Maybe we can >>> just try to be clear in the documentation, and also even log a warning >>> when we parse the topology, "hey, I've been asked to repartition this >>> stream, but it's not necessary". >>> >>> If anything, this discussion really confirms for me the value in just >>> sticking with `repartition()` for now, and deferring >>> `groupBy[Key](partitions)` to the future. >>> >>>> Users should not have to choose between allowing Streams to optimize the >>> repartition placement, and allowing to specify a number of partitions. >>> >>> This is a very fair point, and it may be something that we rapidly >>> return to, but it seems safe for now to introduce the non-optimizable >>> `repartition()` only, and then consider optimization options later. >>> Skipping available optimizations will never break correctness, but >>> adding optimizations can, so it makes sense to treat them with >>> caution. >>> >>> In conclusion, I do think that a user _could_ want to "maybe specify" >>> the partition count, but I also think we can afford to pass on >>> supporting this right now.
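John's idea of logging a warning when the topology is parsed can be illustrated with a minimal sketch. It assumes a toy model in which a topology is a linear list of operations; the `RepartitionWarnings` class and its `DslOp` enum are hypothetical and are not Kafka Streams internals. An explicit repartition is flagged when no key-changing operation ran since the data was last partitioned.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a linear DSL topology. These names are hypothetical and are
// not Kafka Streams internals.
final class RepartitionWarnings {

    enum DslOp { SOURCE, SELECT_KEY, FILTER, MAP_VALUES, REPARTITION, GROUP_BY_KEY, AGGREGATE }

    // Returns one warning per explicit REPARTITION that correctness does not
    // require, i.e. no key-changing operation ran since the data was last
    // partitioned (at the source or at an earlier repartition).
    static List<String> check(List<DslOp> ops) {
        List<String> warnings = new ArrayList<>();
        boolean keyChanged = false;
        for (int i = 0; i < ops.size(); i++) {
            DslOp op = ops.get(i);
            if (op == DslOp.SELECT_KEY) {
                keyChanged = true;
            } else if (op == DslOp.SOURCE) {
                keyChanged = false;
            } else if (op == DslOp.REPARTITION) {
                if (!keyChanged) {
                    warnings.add("step " + i + ": asked to repartition this stream, but it's not necessary");
                }
                // The explicit repartition still runs (it is a command, not a
                // hint); afterwards the data is partitioned by the current key.
                keyChanged = false;
            }
        }
        return warnings;
    }

    public static void main(String[] args) {
        // No key change before repartition(): one warning.
        System.out.println(check(List.of(DslOp.SOURCE, DslOp.FILTER, DslOp.REPARTITION,
                DslOp.GROUP_BY_KEY, DslOp.AGGREGATE)));
        // selectKey() before repartition(): no warning.
        System.out.println(check(List.of(DslOp.SOURCE, DslOp.SELECT_KEY, DslOp.REPARTITION,
                DslOp.GROUP_BY_KEY, DslOp.AGGREGATE)));
    }
}
```

The check ignores the requested partition count, so it would also flag a `repartition()` used purely to scale out a downstream subtopology; that is one reason such a message could only ever be a warning, not an error.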
>>> >>> I'm open to continuing the discussion, but just to avoid ambiguity, I >>> still feel we should _not_ change the groupBy[Key] operation at all, >>> and we should only add `repartition()` as a non-optimizable operation. >>> >>> Thanks all, >>> -John >>> >>> On Fri, Nov 15, 2019 at 11:26 AM Levani Kokhreidze >>> <levani.co...@gmail.com> wrote: >>>> >>>> Hello, >>>> >>>> Just FYI, the PR was updated and now it incorporates the latest suggestions >>> about joins. >>>> `CopartitionedTopicsEnforcer` will throw an exception if the number of >>> partitions isn’t the same when using the `repartition` operation along with >>> `join`. >>>> >>>> For more details please take a look at the PR: >>> https://github.com/apache/kafka/pull/7170/files >>>> >>>> Regards, >>>> Levani >>>> >>>> >>>>> On Nov 15, 2019, at 11:01 AM, Matthias J. Sax <matth...@confluent.io> >>> wrote: >>>>> >>>>> Thanks a lot for the input, Sophie. >>>>> >>>>> Your example is quite useful, and I would use it to support my claim >>>>> that a "partition hint" for `Grouped` seems "useless" and does not >>>>> improve the user experience. >>>>> >>>>> 1) You argue that a new user would be worried about repartition topics >>>>> with too many partitions. This would imply that a user is already >>>>> advanced enough to understand the implication of repartitioning -- for >>>>> this case, I would argue that a user also understands _when_ an >>>>> auto-repartitioning would happen and thus the user understands where >>> to >>>>> insert a `repartition()` operation. >>>>> >>>>> 2) For specifying Serdes: if a `groupByKey()` does not trigger >>>>> auto-repartitioning it's not required to specify the serdes and if they >>>>> are specified they would be ignored/unused (note that `groupBy()` >>> would >>>>> always trigger a repartitioning).
Of course, if the default Serdes from >>>>> the config match (eg, all data types are Json anyway), a user does not >>>>> need to worry about specifying serdes. -- For new users that play >>> around, >>>>> I would assume that they work a lot with primitive types and thus would >>>>> need to specify the serdes -- hence, they would learn about >>>>> auto-repartitioning the hard way anyhow, because each time a >>>>> `groupByKey()` does trigger auto-repartitioning, they would need to pass >>>>> in the correct Serdes -- this way, they would also be educated about where to >>>>> insert a `repartition()` operator if needed. >>>>> >>>>> 3) If a new user really just "plays around", I don't think they use an >>>>> input topic with 100 partitions but most likely have a local single >>> node >>>>> broker with most likely single-partition topics. >>>>> >>>>> >>>>> My main argument for my current proposal is, however, that---based on >>>>> past experience---it's better to roll out a new feature more carefully >>>>> and see how it goes. Last, as John pointed out, we can still extend the >>>>> feature in the future. Instead of making a judgment call up-front, >>> being >>>>> more conservative and less fancy, and revisiting the design based on >>>>> actual user feedback after the first version is rolled out, seems to >>> be >>>>> the better option. Undoing a feature is much harder than extending it. >>>>> >>>>> >>>>> While I advocate strongly for a simple first version of this feature, >>> it's >>>>> a community decision in the end, and I would not block this KIP if >>>>> there is a broad preference to add `Grouped#withNumberOfPartitions()` >>>>> either. >>>>> >>>>> >>>>> -Matthias >>>>> >>>>> On 11/14/19 11:35 PM, Sophie Blee-Goldman wrote: >>>>>> It seems like we all agree at this point (please correct me if >>> wrong!)
that >>>>>> we should NOT change >>>>>> the existing repartitioning behavior, ie we should allow Streams to >>>>>> continue to determine when and >>>>>> where to repartition -- *unless* explicitly told to by the use of >>> a >>>>>> .through or the new .repartition operator. >>>>>> >>>>>> Regarding groupBy, the existing behavior we should not disrupt is >>>>>> a) repartition *only* when required due to an upstream key-changing >>> operation >>>>>> (ie don't force repartitioning >>>>>> based on the presence of an optional config parameter), and >>>>>> b) allow optimization of required repartitions, if any >>>>>> >>>>>> Within the constraint of not breaking the existing behavior, this >>> still >>>>>> leaves open the question of whether we >>>>>> want to improve the user experience by allowing users to provide groupBy >>> with a >>>>>> *suggestion* for numPartitions (or to >>>>>> put it more fairly, whether that *will* improve the experience). I >>> agree >>>>>> with many of the arguments outlined above but >>>>>> let me just push back on this one issue one final time, and if we >>> can't >>>>>> come to a consensus then I am happy to drop >>>>>> it for now so that the KIP can proceed. >>>>>> >>>>>> Specifically, my proposal would be to simply augment Grouped with an >>>>>> optional numPartitions, understood to >>>>>> indicate the user's desired number of partitions *if Streams decides >>> to >>>>>> repartition due to that groupBy* >>>>>> >>>>>>> if a user cares about the number of partitions, the user wants to >>> enforce >>>>>> a repartitioning >>>>>> First, I think we should take a step back and examine this claim.
I >>> agree >>>>>> 100% that *if this is true,* >>>>>> *then we should not give groupBy an optional numPartitions.* As far >>> as I >>>>>> see it, there's no argument >>>>>> to be had there if we *presuppose that claim.* But I'm not convinced >>> of >>>>>> that as an axiom of the user >>>>>> experience and think we should be examining that claim itself, not the >>>>>> consequences of it. >>>>>> >>>>>> To give a simple example, let's say some new user is trying out >>> Streams and >>>>>> wants to just play around >>>>>> with it to see if it might be worth looking into. They want to just >>> write >>>>>> up a simple app and test it out on the >>>>>> data in some existing topics they have with a large number of >>> partitions, >>>>>> and a lot of data. They're just messing >>>>>> around, trying new topologies and don't want to go through each new >>> one >>>>>> step by step to determine if (or where) >>>>>> a repartition might be required. They also don't want to force a >>>>>> repartition if it turns out to not be required, so they'd >>>>>> like to avoid the nice new .repartition operator they saw. But given >>> the >>>>>> huge number of input partitions, they'd like >>>>>> to rest assured that if a repartition does end up being required >>> somewhere >>>>>> during dev, it will not be created with >>>>>> the same huge number of partitions that their input topic has -- so >>> they >>>>>> just pass groupBy a small numPartitions >>>>>> suggestion. >>>>>> >>>>>> I know that's a bit of a contrived example but I think it does >>> highlight >>>>>> how and when this might be a considerable >>>>>> quality-of-life improvement, in particular for new users to Streams >>> and/or >>>>>> during the dev cycle.
*You don't want to* >>>>>> *force a repartition if it wasn't necessary, but you don't want to >>> create a >>>>>> topic with a huge partition count either.* >>>>>> >>>>>> Also, while the optimization discussion took us down an interesting >>> but >>>>>> ultimately more distracting road, it's worth >>>>>> pointing out that it is clearly a major win to have as few >>>>>> repartition topics/steps as possible. Given that we >>>>>> don't want to change existing behavior, the optimization framework >>> can only >>>>>> help out when the placement of >>>>>> repartition steps is flexible, which means only those from .groupBy >>> (and >>>>>> not .repartition). *Users should not* >>>>>> *have to choose between allowing Streams to optimize the repartition >>>>>> placement, and allowing to specify a * >>>>>> *number of partitions.* >>>>>> >>>>>> Lastly, I have what may be a stupid question but for my own >>> edification of >>>>>> how groupBy works: >>>>>> if you do a .groupBy and a repartition is NOT required, does it ever >>> need >>>>>> to serialize/deserialize >>>>>> any of the data? In other words, if you pass a key/value serde to >>> groupBy >>>>>> and it doesn't trigger >>>>>> a repartition, is the serde(s) just ignored and thus more like a >>> suggestion >>>>>> than a requirement? >>>>>> >>>>>> So again, I don't want to hold up this KIP forever but I feel we've >>> spent >>>>>> some time getting slightly >>>>>> off track (although certainly into very interesting discussions) yet >>> never >>>>>> really addressed or questioned >>>>>> the basic premise: *could a user want to specify a number of >>> partitions but >>>>>> not enforce a repartition (at that* >>>>>> *specific point in the topology)?* >>>>>> >>>>>> >>>>>> >>>>>> On Fri, Nov 15, 2019 at 12:18 AM Matthias J. 
Sax < >>> matth...@confluent.io> >>>>>> wrote: >>>>>> >>>>>>> Side remark: >>>>>>> >>>>>>> If the user specifies `repartition()` on both sides of the join, we >>> can >>>>>>> actually throw the exception earlier, ie, when we build the topology. >>>>>>> >>>>>>> Currently, we can do this check only after Kafka Streams has started, >>>>>>> within `StreamPartitionAssignor#assign()` -- we still need to keep >>> this >>>>>>> check for the case that none or only one side has a user-specified >>>>>>> number of partitions though. >>>>>>> >>>>>>> >>>>>>> -Matthias >>>>>>> >>>>>>> On 11/14/19 8:15 AM, John Roesler wrote: >>>>>>>> Thanks, all, >>>>>>>> >>>>>>>> I can get behind just totally leaving out repartition-via-groupBy. If >>>>>>>> we only introduce `repartition()` for now, we're making the minimal >>>>>>>> change to gain the desired capability. >>>>>>>> >>>>>>>> Plus, since we agree that `repartition()` should never be >>> optimizable, >>>>>>>> it's a future-compatible proposal. I.e., if we were to add a >>>>>>>> non-optimizable groupBy(partitions) operation now, and wanted to make >>> it >>>>>>>> optimizable in the future, we would have to worry about topology >>>>>>>> compatibility. Better to just do non-optimizable `repartition()` >>> now, >>>>>>>> and add an optimizable `groupBy(partitions)` in the future (maybe). >>>>>>>> >>>>>>>> About joins, yes, it's a concern, and IMO we should just do the same >>>>>>>> thing we do now... check at runtime that the partition counts on >>> both >>>>>>>> sides match and throw an exception otherwise. What this means for a >>>>>>>> user is that if you explicitly repartition the left side to 100 >>>>>>>> partitions, and then join with the right side at 10 partitions, you >>>>>>>> get an exception, since this operation is not possible. You'd either >>>>>>>> have to "step down" the left side again, back to 10 partitions, or >>> you >>>>>>>> could repartition the right side to 100 partitions before the join.
>>>>>>>> The choice has to be the user's, since it depends on their desired >>>>>>>> execution parallelism. >>>>>>>> >>>>>>>> Thanks, >>>>>>>> -John >>>>>>>> >>>>>>>> On Thu, Nov 14, 2019 at 12:55 AM Matthias J. Sax < >>> matth...@confluent.io> >>>>>>> wrote: >>>>>>>>> >>>>>>>>> Thanks a lot, John. I think the way you decompose the operators is >>> super >>>>>>>>> helpful for this discussion. >>>>>>>>> >>>>>>>>> What you suggest with regard to using `Grouped` and enforcing >>>>>>>>> repartitioning if the number of partitions is specified is >>> certainly >>>>>>>>> possible. However, I am not sure if we _should_ do this. My >>> reasoning is >>>>>>>>> that an enforced repartitioning as introduced via `repartition()` >>> is an >>>>>>>>> expensive operation, and it seems better to demand a more >>> explicit >>>>>>>>> user opt-in to trigger it. Just setting an optional parameter >>> might be >>>>>>>>> too subtle to trigger such a heavy "side effect". >>>>>>>>> >>>>>>>>> While I agree about "usability" in general, I would prefer a more >>>>>>>>> conservative approach to introduce this feature, see how it goes, >>> and >>>>>>>>> maybe make it more advanced later on. This also applies to what >>>>>>>>> optimization we may or may not allow (or are able to perform at >>> all). >>>>>>>>> >>>>>>>>> @Levani: Reflecting on my suggestion about `Repartitioned extends >>>>>>>>> Grouped`, I agree that it might not be a good idea. >>>>>>>>> >>>>>>>>> Atm, I see an enforced repartitioning as non-optimizable and as a >>> good >>>>>>>>> first step and I would suggest not introducing anything else for >>> now. >>>>>>>>> Introducing optimizable enforced repartitioning via `groupBy(..., >>>>>>>>> Grouped)` is something we could add later. >>>>>>>>> >>>>>>>>> >>>>>>>>> Therefore, I would not change `Grouped` but only introduce >>>>>>>>> `repartition()`.
Users that use `groupBy()` atm, and want to opt in >>> to >>>>>>>>> set the number of partitions, would need to rewrite their code to >>>>>>>>> `selectKey(...).repartition(...).groupByKey()`. It's less >>> convenient but >>>>>>>>> also less risky from an API and optimization point of view. >>>>>>>>> >>>>>>>>> >>>>>>>>> @Levani: about joins -> yes, we will need to check the specified >>> number >>>>>>>>> of partitions (if any) and if they don't match, throw an >>> exception. We >>>>>>>>> can discuss this on the PR -- I am just trying to get the PR for >>> KIP-466 >>>>>>>>> merged -- yours is next on the list :) >>>>>>>>> >>>>>>>>> >>>>>>>>> Thoughts? >>>>>>>>> >>>>>>>>> >>>>>>>>> -Matthias >>>>>>>>> >>>>>>>>> >>>>>>>>> On 11/12/19 4:51 PM, Levani Kokhreidze wrote: >>>>>>>>>> Thank you all for an interesting discussion. This is very >>> enlightening. >>>>>>>>>> >>>>>>>>>> Thank you, Matthias, for your explanation. Your arguments are very >>> true. >>>>>>> It makes sense that if a user specifies the number of partitions he/she >>> really >>>>>>> cares that those specifications are applied to internal topics. >>>>>>>>>> Unfortunately, in the current implementation this is not true during a >>>>>>> `join` operation. As I’ve written in the PR comment, currently, when >>>>>>> `Stream#join` is used, `CopartitionedTopicsEnforcer` chooses the max >>> number of >>>>>>> partitions from the two source topics. >>>>>>>>>> I’m not really sure what the other way around this >>> situation would be. >>>>>>> Maybe fail the stream altogether and ask the user to specify the same >>> number >>>>>>> of partitions? >>>>>>>>>> Or should we treat join operations the same way as it is right >>> now >>>>>>> and basically choose the max number of partitions even when the `repartition` >>>>>>> operation is specified, because Kafka Streams “knows the best” how to >>>>>>> handle joins? >>>>>>>>>> You can check the integration tests to see how it’s being handled currently. >>> Open >>>>>>> to suggestions on that part.
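The join behavior John and Matthias converge on (check the partition counts and throw on a mismatch), and which Levani's updated PR reports implementing, can be sketched in a simplified model. Everything here is an assumption for illustration: the `CopartitionCheck` class, the `fixed` flag (user topics and explicit `repartition()` calls), and the rule that implicit repartition topics adopt a fixed count when one exists. This is not the actual `CopartitionedTopicsEnforcer` code.

```java
import java.util.List;

// Toy model of the co-partitioning rule for a join. The names are
// hypothetical; this is not the actual CopartitionedTopicsEnforcer code.
final class CopartitionCheck {

    static final class JoinInput {
        final int partitions;
        // true for user topics and explicit repartition() calls; false for
        // implicit repartition topics whose count Streams may still choose
        final boolean fixed;

        JoinInput(int partitions, boolean fixed) {
            this.partitions = partitions;
            this.fixed = fixed;
        }
    }

    // Returns the partition count all inputs of the join will use, or throws
    // if two fixed counts disagree.
    static int resolve(List<JoinInput> inputs) {
        Integer fixedCount = null;
        int max = 0;
        for (JoinInput in : inputs) {
            max = Math.max(max, in.partitions);
            if (in.fixed) {
                if (fixedCount != null && fixedCount != in.partitions) {
                    throw new IllegalStateException("join inputs must be co-partitioned: "
                            + fixedCount + " vs " + in.partitions + " partitions");
                }
                fixedCount = in.partitions;
            }
        }
        // With nothing fixed, fall back to the maximum count, which is the
        // current behavior Levani describes.
        return fixedCount != null ? fixedCount : max;
    }

    public static void main(String[] args) {
        // John's example: left side repartitioned to 100, right topic has 10.
        try {
            resolve(List.of(new JoinInput(100, true), new JoinInput(10, true)));
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        // "Step down" the left side to 10, or repartition the right side to 100.
        System.out.println(resolve(List.of(new JoinInput(10, true), new JoinInput(10, true))));
        System.out.println(resolve(List.of(new JoinInput(100, true), new JoinInput(100, true))));
    }
}
```

Matthias's side remark fits this model: when both inputs come from explicit `repartition()` calls, both counts are known when the topology is built, so the same comparison could run at build time instead of only in `StreamPartitionAssignor#assign()`.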
>>>>>>>>>> >>>>>>>>>> As for groupBy, I agree and John raised very interesting points. >>> My >>>>>>> arguments for allowing users to specify the number of partitions during >>> groupBy >>>>>>> operations were mainly coming from the usability perspective. >>>>>>>>>> So building on top of what John said, maybe it makes sense to make >>>>>>> `groupBy` operations smarter so that whenever a user specifies the >>>>>>> `numberOfPartitions` configuration, repartitioning will be enforced, >>> wdyt? >>>>>>>>>> I’m not going into the optimization part yet :) I think it will be >>> part of a >>>>>>> separate PR and task, but overall it makes sense to apply >>> optimizations >>>>>>> where the number of partitions is the same. >>>>>>>>>> >>>>>>>>>> As for Repartitioned extending Grouped, I kinda feel that it >>> won’t fit >>>>>>> nicely in the current API design. >>>>>>>>>> In addition, in the PR review, John mentioned that there was a >>> lot of >>>>>>> trouble in the past trying to use one operation's configuration >>> objects on >>>>>>> other operations. >>>>>>>>>> Also it makes sense to keep them separate in terms of >>> compatibility. >>>>>>>>>> In that case, we don’t have to worry, every time Grouped is >>> changed, >>>>>>> about what the implications would be for `repartition` operations. >>>>>>>>>> >>>>>>>>>> Kind regards, >>>>>>>>>> Levani >>>>>>>>>> >>>>>>>>>> >>>>>>>>>>> On Nov 11, 2019, at 9:13 PM, John Roesler <j...@confluent.io> >>> wrote: >>>>>>>>>>> >>>>>>>>>>> Ah, thanks for the clarification. I missed your point. >>>>>>>>>>> >>>>>>>>>>> I like the framework you've presented. It does seem simpler to >>> assume >>>>>>>>>>> that they either care about the partition count and want to >>>>>>>>>>> repartition to realize it, or they don't care about the number. >>>>>>>>>>> Returning to this discussion, it does seem unlikely that they >>> care >>>>>>>>>>> about the number and _don't_ care if it actually gets realized.
>>>>>>>>>>> >>>>>>>>>>> But then, it still seems like we can just keep the option as >>> part of >>>>>>>>>>> Grouped. As in: >>>>>>>>>>> >>>>>>>>>>> // user does not care >>>>>>>>>>> stream.groupByKey(Grouped /*not specifying partition count*/) >>>>>>>>>>> stream.groupBy(Grouped /*not specifying partition count*/) >>>>>>>>>>> >>>>>>>>>>> // user does care >>>>>>>>>>> stream.repartition(Repartitioned) >>>>>>>>>>> stream.groupByKey(Grouped.numberOfPartitions(...)) >>>>>>>>>>> stream.groupBy(Grouped.numberOfPartitions(...)) >>>>>>>>>>> >>>>>>>>>>> ---- >>>>>>>>>>> >>>>>>>>>>> The above discussion got me thinking about algebra. Matthias is >>>>>>>>>>> absolutely right that `groupByKey(numPartitions)` is equivalent >>> to >>>>>>>>>>> `repartition(numPartitions).groupByKey()`. I'm just not >>> convinced that >>>>>>>>>>> we should force people to apply that expansion themselves vs. >>> having a >>>>>>>>>>> more compact way to express it if they don't care where exactly >>> the >>>>>>>>>>> repartition occurs. However, thinking about these operators >>>>>>>>>>> algebraically can really help *us* narrow down the number of >>> different >>>>>>>>>>> expressions we have to consider. >>>>>>>>>>> >>>>>>>>>>> Let's consider some identities: >>>>>>>>>>> >>>>>>>>>>> A: groupBy(mapper) + agg = mapKey(mapper) + groupByKey + agg >>>>>>>>>>> B: src + ... + groupByKey + agg = src + ... + passthrough + agg >>>>>>>>>>> C: mapKey(mapper) + ... + groupByKey + agg >>>>>>>>>>> = mapKey(mapper) + ... + repartition + groupByKey + agg >>>>>>>>>>> D: repartition = sink(managed) + src >>>>>>>>>>> >>>>>>>>>>> In these identities, I used one special identifier (...), which >>> means >>>>>>>>>>> any number (0+) of operations that are not src, mapKey, >>> groupBy[Key], >>>>>>>>>>> repartition, or agg. >>>>>>>>>>> >>>>>>>>>>> For mental clarity, I'm just going to make up a rule that groupBy >>>>>>>>>>> operations are not executable.
In other words, you have to get >>> to a >>>>>>>>>>> point where you can apply B to convert a groupByKey into a >>> passthrough >>>>>>>>>>> in order to execute the program. This is just a formal way of >>> stating >>>>>>>>>>> what already happens in Kafka Streams. >>>>>>>>>>> >>>>>>>>>>> By applying A, we can just completely leave `groupBy` out of our >>>>>>>>>>> analysis. It trivially decomposes into a mapKey followed by a >>>>>>>>>>> groupByKey. >>>>>>>>>>> >>>>>>>>>>> Then, we can eliminate the "repartition required" case of >>> `groupByKey` >>>>>>>>>>> by applying C followed by D to get to the "no repartition >>> required" >>>>>>>>>>> version of groupByKey, which in turn sets us up to apply B to >>> get an >>>>>>>>>>> executable topology. >>>>>>>>>>> >>>>>>>>>>> Fundamentally, you can think of KIP-221 as proposing a >>> modified >>>>>>>>>>> D identity in which you can specify the partition count of the >>> managed >>>>>>>>>>> sink topic: >>>>>>>>>>> D': repartition(pc) = sink(managed w/ pc) + src >>>>>>>>>>> >>>>>>>>>>> Since users _could_ apply the identities above, we don't >>> actually have >>>>>>>>>>> to add any partition count to groupBy[Key], but we decided early >>> on in >>>>>>>>>>> the KIP discussion that it's more ergonomic to add it. In that >>> case, >>>>>>>>>>> we also have to modify A and C: >>>>>>>>>>> A': groupBy(mapper, pc) + agg >>>>>>>>>>> = mapKey(mapper) + groupByKey(pc) + agg >>>>>>>>>>> C': mapKey(mapper) + ... + groupByKey(pc) + agg >>>>>>>>>>> = mapKey(mapper) + ... + repartition(pc) + groupByKey + agg >>>>>>>>>>> >>>>>>>>>>> This still sets us up to always be able to get back to a plain >>>>>>>>>>> `groupByKey` operation (with no `(pc)`) and then apply D' and >>>>>>>>>>> ultimately B to get an executable topology. >>>>>>>>>>> >>>>>>>>>>> What about the optimizer? >>>>>>>>>>> The optimizer applies another set of graph-algebraic identities >>> to >>>>>>>>>>> minimize the number of repartition topics in a topology.
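John's identities can be exercised with a small rewriter over a linear pipeline. This is a sketch under simplifying assumptions: `GroupingAlgebra`, `Kind`, and `Node` are hypothetical names, the pipeline has no branches, and, in line with where the thread ends up, a partition count is carried only on `repartition` (identity D'), not on `groupBy[Key]` (A' and C'). The passes apply A, then C, then D' and B, so every grouping ends up as an executable passthrough.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Toy rewriter for identities A, B, C and D' over a linear pipeline.
// Operator names mirror the email; the classes themselves are hypothetical.
final class GroupingAlgebra {

    enum Kind {
        SRC("src"), MAP_KEY("mapKey"), FILTER("filter"), GROUP_BY("groupBy"),
        GROUP_BY_KEY("groupByKey"), REPARTITION("repartition"),
        SINK_MANAGED("sink(managed)"), PASSTHROUGH("passthrough"), AGG("agg");

        final String label;

        Kind(String label) { this.label = label; }
    }

    static final class Node {
        final Kind kind;
        final Integer pc; // partition count; null means "unspecified"

        Node(Kind kind, Integer pc) { this.kind = kind; this.pc = pc; }

        @Override
        public String toString() {
            if (pc == null) return kind.label;
            if (kind == Kind.SINK_MANAGED) return "sink(managed w/ " + pc + ")";
            return kind.label + "(" + pc + ")";
        }
    }

    static Node n(Kind kind) { return new Node(kind, null); }

    static Node n(Kind kind, Integer pc) { return new Node(kind, pc); }

    static List<Node> toExecutable(List<Node> program) {
        // A: groupBy(mapper) = mapKey(mapper) + groupByKey
        List<Node> a = new ArrayList<>();
        for (Node node : program) {
            if (node.kind == Kind.GROUP_BY) {
                a.add(n(Kind.MAP_KEY));
                a.add(n(Kind.GROUP_BY_KEY));
            } else {
                a.add(node);
            }
        }
        // C: mapKey + ... + groupByKey = mapKey + ... + repartition + groupByKey
        List<Node> c = new ArrayList<>();
        boolean keyChanged = false;
        for (Node node : a) {
            if (node.kind == Kind.MAP_KEY) {
                keyChanged = true;
            } else if (node.kind == Kind.SRC || node.kind == Kind.REPARTITION) {
                keyChanged = false;
            } else if (node.kind == Kind.GROUP_BY_KEY && keyChanged) {
                c.add(n(Kind.REPARTITION));
                keyChanged = false;
            }
            c.add(node);
        }
        // D': repartition(pc) = sink(managed w/ pc) + src
        // B:  src + ... + groupByKey = src + ... + passthrough
        List<Node> out = new ArrayList<>();
        keyChanged = false;
        for (Node node : c) {
            if (node.kind == Kind.REPARTITION) {
                out.add(n(Kind.SINK_MANAGED, node.pc));
                out.add(n(Kind.SRC));
                keyChanged = false;
            } else if (node.kind == Kind.GROUP_BY_KEY) {
                if (keyChanged) throw new IllegalStateException("B does not apply: key changed since src");
                out.add(n(Kind.PASSTHROUGH));
            } else {
                if (node.kind == Kind.MAP_KEY) keyChanged = true;
                if (node.kind == Kind.SRC) keyChanged = false;
                out.add(node);
            }
        }
        return out;
    }

    static String render(List<Node> pipeline) {
        return pipeline.stream().map(Node::toString).collect(Collectors.joining(" + "));
    }

    public static void main(String[] args) {
        System.out.println(render(toExecutable(List.of(n(Kind.SRC), n(Kind.GROUP_BY), n(Kind.AGG)))));
        System.out.println(render(toExecutable(List.of(
                n(Kind.SRC), n(Kind.FILTER), n(Kind.REPARTITION, 100), n(Kind.GROUP_BY_KEY), n(Kind.AGG)))));
    }
}
```

For example, `src + groupBy + agg` is rewritten to `src + mapKey + sink(managed) + src + passthrough + agg`, and `src + filter + repartition(100) + groupByKey + agg` to `src + filter + sink(managed w/ 100) + src + passthrough + agg`.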
>>>>>>>>>>> >>>>>>>>>>> (forgive my ascii art) >>>>>>>>>>> >>>>>>>>>>> E: (merging repartition nodes) >>>>>>>>>>> (...) -> repartition -> X >>>>>>>>>>> \-> repartition -> Y >>>>>>>>>>> = >>>>>>>>>>> (... + repartition) -> X >>>>>>>>>>> \-> Y >>>>>>>>>>> F: (reordering around repartition) >>>>>>>>>>> Where SVO is any non-key-changing, stateless operation: >>>>>>>>>>> repartition -> SVO = SVO -> repartition >>>>>>>>>>> >>>>>>>>>>> In terms of these identities, what the optimizer does is apply F >>>>>>>>>>> repeatedly in either direction to a topology to factor out >>> common operations in >>>>>>>>>>> branches so that it can apply E to merge repartition nodes. This >>> was >>>>>>>>>>> especially necessary before KIP-221 because you couldn't directly >>>>>>>>>>> express `repartition` in the DSL, only indirectly via >>> `groupBy[Key]`, >>>>>>>>>>> so there was no way to do the factoring manually. >>>>>>>>>>> >>>>>>>>>>> We can now state very clearly that in KIP-221, explicit >>>>>>>>>>> `repartition()` operators should create a "reordering barrier". >>> So, F >>>>>>>>>>> cannot be applied to an explicit `repartition()`. Also, I think >>> we >>>>>>>>>>> decided earlier that explicit `repartition()` operations would >>> also be >>>>>>>>>>> ineligible for merging, so E can't be applied to explicit >>>>>>>>>>> `repartition()` operations either. I think we feel we _could_ >>> apply E >>>>>>>>>>> without harm, but we want to be conservative for now. >>>>>>>>>>> >>>>>>>>>>> I think the salient point from the latter discussion has been >>> that >>>>>>>>>>> when you use `Grouped.numberOfPartitions`, this does _not_ >>> constitute >>>>>>>>>>> an explicit `repartition()` operator, and therefore, the >>> resulting >>>>>>>>>>> repartition node remains eligible for optimization. >>>>>>>>>>> >>>>>>>>>>> To be clear, I agree with Matthias that the provided partition >>> count >>>>>>>>>>> _must_ be used in the resulting implicit repartition. This has >>> some >>>>>>>>>>> implications for E.
Namely, E could only be applied to two >>> repartition >>>>>>>>>>> nodes that have the same partition count. This was always >>>>>>>>>>> trivially true before KIP-221 because the partition count has >>> always >>>>>>>>>>> been "unspecified", i.e., it would be determined at runtime by >>> the >>>>>>>>>>> user-managed-topics' partition counts. Now, it could be >>> specified or >>>>>>>>>>> unspecified. We can simply augment E to allow merging only >>> repartition >>>>>>>>>>> nodes where the partition count is EITHER "specified and the >>> same on >>>>>>>>>>> both sides", OR "unspecified on both sides". >>>>>>>>>>> >>>>>>>>>>> Sorry for the long email, but I hope that it builds a >>> solid >>>>>>>>>>> theoretical foundation for our decisions in KIP-221, so we can >>> have >>>>>>>>>>> confidence that there are no edge cases where design flaws could hide. >>>>>>>>>>> >>>>>>>>>>> Thanks, >>>>>>>>>>> -John >>>>>>>>>>> >>>>>>>>>>> On Sat, Nov 9, 2019 at 10:37 PM Matthias J. Sax < >>>>>>> matth...@confluent.io> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> it seems like we do want to allow >>>>>>>>>>>>>> people to optionally specify a partition count as part of this >>>>>>>>>>>>>> operation, but we don't want that option to _force_ >>> repartitioning >>>>>>>>>>>> >>>>>>>>>>>> Correct, i.e., that is my suggestion.
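John's augmented merging rule E, together with the decision to keep explicit `repartition()` nodes ineligible for merging, reduces to a small predicate. In this minimal sketch, `RepartitionMerge` and `RepartitionNode` are hypothetical names, and a `null` count stands for "unspecified". Implicit nodes with a specified count would only arise if `Grouped` accepted a partition count, which the thread ultimately deferred.

```java
import java.util.Objects;

// Toy model of the augmented merge rule E; the classes are hypothetical.
final class RepartitionMerge {

    static final class RepartitionNode {
        final Integer pc;        // null = unspecified, resolved at runtime from upstream topics
        final boolean explicit;  // true if created by a user's repartition() call

        RepartitionNode(Integer pc, boolean explicit) {
            this.pc = pc;
            this.explicit = explicit;
        }
    }

    // Two sibling repartition nodes may be merged only if neither is an
    // explicit repartition() (kept ineligible for now) and their partition
    // counts are both unspecified, or both specified and equal.
    static boolean canMerge(RepartitionNode a, RepartitionNode b) {
        if (a.explicit || b.explicit) {
            return false;
        }
        // Objects.equals covers both allowed cases: null/null and equal values.
        return Objects.equals(a.pc, b.pc);
    }

    public static void main(String[] args) {
        System.out.println(canMerge(new RepartitionNode(null, false), new RepartitionNode(null, false))); // true
        System.out.println(canMerge(new RepartitionNode(8, false), new RepartitionNode(8, false)));       // true
        System.out.println(canMerge(new RepartitionNode(8, false), new RepartitionNode(null, false)));    // false
        System.out.println(canMerge(new RepartitionNode(8, true), new RepartitionNode(8, false)));        // false
    }
}
```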
>>>>>>>>>>>> >>>>>>>>>>>>> "Use P partitions if repartitioning is necessary" >>>>>>>>>>>> >>>>>>>>>>>> I disagree here, because my reasoning is that: >>>>>>>>>>>> >>>>>>>>>>>> - if a user cares about the number of partitions, the user wants >>> to >>>>>>>>>>>> enforce a repartitioning >>>>>>>>>>>> - if a user does not care about the number of partitions, we >>> don't >>>>>>> need >>>>>>>>>>>> to provide them a way to pass in a "hint" >>>>>>>>>>>> >>>>>>>>>>>> Hence, it should be sufficient to support: >>>>>>>>>>>> >>>>>>>>>>>> // user does not care >>>>>>>>>>>> >>>>>>>>>>>> `stream.groupByKey(Grouped)` >>>>>>>>>>>> `stream.groupBy(..., Grouped)` >>>>>>>>>>>> >>>>>>>>>>>> // user does care >>>>>>>>>>>> >>>>>>>>>>>> `stream.repartition(Repartitioned).groupByKey()` >>>>>>>>>>>> `stream.groupBy(..., Repartitioned)` >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> -Matthias >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> On 11/9/19 8:10 PM, John Roesler wrote: >>>>>>>>>>>>> Thanks for those thoughts, Matthias, >>>>>>>>>>>>> >>>>>>>>>>>>> I find your reasoning about the optimization behavior >>> compelling. >>>>>>> The >>>>>>>>>>>>> `through` operation is very simple and clear to reason about. >>> It >>>>>>> just >>>>>>>>>>>>> passes the data at the specified point in the topology >>>>>>> exactly >>>>>>>>>>>>> through the specified topic. Likewise, if a user invokes a >>>>>>>>>>>>> `repartition` operator, the simplest behavior is to just do >>> what >>>>>>>>>>>>> they asked for. >>>>>>>>>>>>> >>>>>>>>>>>>> Stepping back to think about when optimizations are surprising >>> and >>>>>>>>>>>>> when they aren't, it occurs to me that we should be free to >>> move >>>>>>>>>>>>> around repartitions when users have asked to perform some >>> operation >>>>>>>>>>>>> that implies a repartition, like "change keys, then filter, >>> then >>>>>>>>>>>>> aggregate".
This program requires a repartition, but it could >>> be >>>>>>>>>>>>> anywhere between the key change and the aggregation. On the >>> other >>>>>>>>>>>>> hand, if they say, "change keys, then filter, then >>> repartition, then >>>>>>>>>>>>> aggregate", it seems like they were pretty clear about their >>> desire, >>>>>>>>>>>>> and we should just take it at face value. >>>>>>>>>>>>> >>>>>>>>>>>>> So, I'm sold on just literally doing a repartition every time >>> they >>>>>>>>>>>>> invoke the `repartition` operator. >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> The "partition count" modifier for `groupBy`/`groupByKey` is >>> more >>>>>>> nuanced. >>>>>>>>>>>>> >>>>>>>>>>>>> What you said about `groupByKey` makes sense. If the key >>> hasn't >>>>>>>>>>>>> actually changed, then we don't need to repartition before >>>>>>>>>>>>> aggregating. On the other hand, `groupBy` is specifically >>> changing >>>>>>> the >>>>>>>>>>>>> key as part of the grouping operation, so (as you said) we >>>>>>> definitely >>>>>>>>>>>>> have to do a repartition. >>>>>>>>>>>>> >>>>>>>>>>>>> If I'm reading the discussion right, it seems like we do want >>> to >>>>>>> allow >>>>>>>>>>>>> people to optionally specify a partition count as part of this >>>>>>>>>>>>> operation, but we don't want that option to _force_ >>> repartitioning >>>>>>> if >>>>>>>>>>>>> it's not needed. That last clause is the key. "Use P >>> partitions if >>>>>>>>>>>>> repartitioning is necessary" is a directive that applies >>> cleanly and >>>>>>>>>>>>> correctly to both `groupBy` and `groupByKey`. What if we call >>> the >>>>>>>>>>>>> option `numberOfPartitionsHint`, which along with the "if >>> necessary" >>>>>>>>>>>>> javadoc, should make it clear that the option won't force a >>>>>>>>>>>>> repartition, and also gives us enough latitude to still employ >>> the >>>>>>>>>>>>> optimizer on those repartition topics?
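The "hint" semantics John proposes ("use P partitions if repartitioning is necessary"), which also match Sophie's `Grouped` proposal, can be written as a single function. `numberOfPartitionsHint` was only a proposal in this thread, and `PartitionHint` and its method are hypothetical; the fallback to the upstream count follows Matthias's description of auto-repartitioning.

```java
import java.util.OptionalInt;

// Sketch of the proposed "hint" semantics. The class and method are
// hypothetical; numberOfPartitionsHint was only a proposal in the thread.
final class PartitionHint {

    // Returns the partition count of the repartition topic a grouping would
    // create, or empty if no repartition topic is created at all.
    static OptionalInt groupingRepartitionCount(boolean repartitionRequired,
                                                OptionalInt numberOfPartitionsHint,
                                                int upstreamPartitions) {
        if (!repartitionRequired) {
            // A hint never forces a repartition on its own.
            return OptionalInt.empty();
        }
        // Without a hint, fall back to the upstream partition count.
        return OptionalInt.of(numberOfPartitionsHint.orElse(upstreamPartitions));
    }

    public static void main(String[] args) {
        // Sophie's scenario: a 100-partition input topic and a small hint of 4.
        System.out.println(groupingRepartitionCount(false, OptionalInt.of(4), 100));   // OptionalInt.empty
        System.out.println(groupingRepartitionCount(true, OptionalInt.of(4), 100));    // OptionalInt[4]
        System.out.println(groupingRepartitionCount(true, OptionalInt.empty(), 100));  // OptionalInt[100]
    }
}
```

The design point the thread debates is visible in the first case: under hint semantics, the hint has no effect when no repartition is required. Matthias's alternative drops the hint entirely and asks users who care about the count to call `repartition()`, which always creates the topic.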
>>>>>>>>>>>>> >>>>>>>>>>>>> If we like the idea of expressing it as a "hint" for grouping >>> and a >>>>>>>>>>>>> "command" for `repartition`, then it seems like it still makes >>> sense >>>>>>>>>>>>> to keep Grouped and Repartitioned separate, as they would >>> actually >>>>>>>>>>>>> offer different methods with distinct semantics. >>>>>>>>>>>>> >>>>>>>>>>>>> WDYT? >>>>>>>>>>>>> >>>>>>>>>>>>> Thanks, >>>>>>>>>>>>> -John >>>>>>>>>>>>> >>>>>>>>>>>>> On Sat, Nov 9, 2019 at 8:28 PM Matthias J. Sax < >>>>>>> matth...@confluent.io> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>> Sorry for the late reply. >>>>>>>>>>>>>> >>>>>>>>>>>>>> I guess the question boils down to the intended semantics of >>>>>>>>>>>>>> `repartition()`. My understanding is as follows: >>>>>>>>>>>>>> >>>>>>>>>>>>>> - KS does auto-repartitioning for correctness reasons (using >>> the >>>>>>>>>>>>>> upstream topic to determine the number of partitions) >>>>>>>>>>>>>> - KS does auto-repartitioning only for downstream DSL >>> operators >>>>>>> like >>>>>>>>>>>>>> `count()` (e.g., a `transform()` never triggers an >>>>>>> auto-repartitioning >>>>>>>>>>>>>> even if the stream is marked as `repartitioningRequired`). >>>>>>>>>>>>>> - KS offers `through()` to enforce a repartitioning -- >>> however, >>>>>>> the user >>>>>>>>>>>>>> needs to create the topic manually (with the desired number of >>>>>>> partitions). >>>>>>>>>>>>>> >>>>>>>>>>>>>> I see two main applications for `repartition()`: >>>>>>>>>>>>>> >>>>>>>>>>>>>> 1) repartition data before a `transform()` when the user does not >>> want >>>>>>> to >>>>>>>>>>>>>> manage the topic >>>>>>>>>>>>>> 2) scale out a downstream subtopology >>>>>>>>>>>>>> >>>>>>>>>>>>>> Hence, I see `repartition()` as similar to `through()`: if a >>> user >>>>>>> calls >>>>>>>>>>>>>> it, a repartitioning is enforced, with the difference that KS >>>>>>> manages the >>>>>>>>>>>>>> topic and the user does not need to create it.
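To make these rules concrete, here is a minimal, stdlib-only Java model: a key change only marks the stream, a DSL aggregation such as `count()` inserts an automatic repartition only when the stream is marked, `transform()` never inserts one, and `through()`/`repartition()` are always honored. `RepartitionPlanSketch` and its methods are hypothetical names, not the Kafka Streams implementation. For brevity, the model also ignores that a real `transform()` may itself change keys.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified model of the repartitioning rules described above;
 * not the Kafka Streams implementation. It records the steps a topology would contain.
 */
final class RepartitionPlanSketch {
    private final List<String> steps = new ArrayList<>();
    private boolean repartitionRequired = false;

    /** A key-changing operation only marks the stream as requiring a repartition. */
    RepartitionPlanSketch selectKey() {
        steps.add("selectKey");
        repartitionRequired = true;
        return this;
    }

    /** transform() never inserts an automatic repartition on its own. */
    RepartitionPlanSketch transform() {
        steps.add("transform");
        return this;
    }

    /** through(topic): always enforced; the user must create the topic up front. */
    RepartitionPlanSketch through(String userCreatedTopic) {
        steps.add("through(" + userCreatedTopic + ")");
        repartitionRequired = false;
        return this;
    }

    /** repartition(n): always enforced, like through(), but the topic is managed internally. */
    RepartitionPlanSketch repartition(int numberOfPartitions) {
        steps.add("repartition(" + numberOfPartitions + ")");
        repartitionRequired = false;
        return this;
    }

    /** A DSL aggregation inserts an automatic repartition only if the stream is marked. */
    RepartitionPlanSketch count() {
        if (repartitionRequired) {
            steps.add("auto-repartition");
            repartitionRequired = false;
        }
        steps.add("count");
        return this;
    }

    List<String> steps() {
        return steps;
    }
}
```

For example, `new RepartitionPlanSketch().repartition(8).transform().count()` yields the steps `repartition(8)`, `transform`, `count`. This covers use case (1): the data is repartitioned before the `transform()` without the user managing a topic.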
>>>>>>>>>>>>>> >>>>>>>>>>>>>> This behavior makes (1) and (2) possible. >>>>>>>>>>>>>> >>>>>>>>>>>>>>> I think many users would prefer to just say "if there *is* a >>>>>>> repartition >>>>>>>>>>>>>>> required at this point in the topology, it should >>>>>>>>>>>>>>> have N partitions" >>>>>>>>>>>>>> >>>>>>>>>>>>>> Because of (2), I disagree. Either a user does not care about >>>>>>> scaling >>>>>>>>>>>>>> out, for which case she would not specify the number of >>>>>>> partitions. Or a >>>>>>>>>>>>>> user does care, and hence wants to enforce the scale out. I >>> don't >>>>>>> think >>>>>>>>>>>>>> that any user would say, "maybe scale out". >>>>>>>>>>>>>> >>>>>>>>>>>>>> Therefore, the optimizer should never ignore the repartition >>>>>>> operation. >>>>>>>>>>>>>> As a "consequence" (because repartitioning is expensive) a >>> user >>>>>>> should >>>>>>>>>>>>>> make an explicit call to `repartition()` IMHO -- piggybacking >>> an >>>>>>>>>>>>>> enforced repartitioning into `groupByKey()` seems to be >>> "dangerous" >>>>>>>>>>>>>> because it might be too subtle and an "optional scaling out" >>> as >>>>>>> laid out >>>>>>>>>>>>>> above does not make sense IMHO. >>>>>>>>>>>>>> >>>>>>>>>>>>>> I am also not worried about "over repartitioning" because the >>>>>>> result >>>>>>>>>>>>>> stream would never trigger auto-repartitioning. Only if >>> multiple >>>>>>>>>>>>>> consecutive calls to `repartition()` are made it could be bad >>> -- >>>>>>> but >>>>>>>>>>>>>> that's the same with `through()`. In the end, there is always >>> some >>>>>>>>>>>>>> responsibility on the user. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Btw, for `.groupBy()` we know that repartitioning will be >>> required, >>>>>>>>>>>>>> however, for `groupByKey()` it depends if the KStream is >>> marked as >>>>>>>>>>>>>> `repartitioningRequired`. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Hence, for `groupByKey()` it should not be possible for a >>> user to >>>>>>> set >>>>>>>>>>>>>> number of partitions IMHO. 
For `groupBy()` it's a different >>> story, >>>>>>>>>>>>>> because calling >>>>>>>>>>>>>> >>>>>>>>>>>>>> `repartition().groupBy()` >>>>>>>>>>>>>> >>>>>>>>>>>>>> does not achieve what we want. Hence, allowing users to pass >>> in the >>>>>>>>>>>>>> number of partitions into `groupBy()` actually >>> makes >>>>>>> sense, >>>>>>>>>>>>>> because repartitioning will happen anyway and thus we can >>>>>>> piggyback a >>>>>>>>>>>>>> scaling decision. >>>>>>>>>>>>>> >>>>>>>>>>>>>> I think that John has a fair concern about the overloads; >>> however, >>>>>>> I am >>>>>>>>>>>>>> not convinced that using `Grouped` to specify the number of >>>>>>> partitions >>>>>>>>>>>>>> is intuitive. I double-checked `Grouped` and `Repartitioned` >>> and >>>>>>> both >>>>>>>>>>>>>> allow specifying a `name` and `keySerde/valueSerde`. Thus, I >>> am >>>>>>>>>>>>>> wondering if we could bridge the gap between both by >>> declaring >>>>>>>>>>>>>> `Repartitioned extends Grouped`? For this case, we only need >>>>>>>>>>>>>> `groupBy(Grouped)` and a user can pass in either type, which >>> seems to >>>>>>> make >>>>>>>>>>>>>> the API quite smooth: >>>>>>>>>>>>>> >>>>>>>>>>>>>> `stream.groupBy(..., Grouped...)` >>>>>>>>>>>>>> >>>>>>>>>>>>>> `stream.groupBy(..., Repartitioned...)` >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> Thoughts? >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> -Matthias >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> On 11/7/19 10:59 AM, Levani Kokhreidze wrote: >>>>>>>>>>>>>>> Hi Sophie, >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Thank you for your reply, very insightful. Looking forward >>>>>>> to hearing others' opinions on this as well.
>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Kind regards, >>>>>>>>>>>>>>> Levani >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On Nov 6, 2019, at 1:30 AM, Sophie Blee-Goldman < >>>>>>> sop...@confluent.io> wrote: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Personally, I think Matthias’s concern is valid, but on the >>>>>>> other hand >>>>>>>>>>>>>>>> Kafka Streams has already >>>>>>>>>>>>>>>>> optimizer in place which alters topology independently >>> from user >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> I agree (with you) and think this is a good way to put it >>> -- we >>>>>>> currently >>>>>>>>>>>>>>>> auto-repartition for the user so >>>>>>>>>>>>>>>> that they don't have to walk through their entire topology >>> and >>>>>>> reason about >>>>>>>>>>>>>>>> when and where to place a >>>>>>>>>>>>>>>> `.through` (or the new `.repartition`), so why suddenly >>> force >>>>>>> this onto the >>>>>>>>>>>>>>>> user? How certain are we that >>>>>>>>>>>>>>>> users will always get this right? It's easy to imagine that >>>>>>> during >>>>>>>>>>>>>>>> development, you write your new app with >>>>>>>>>>>>>>>> correctly placed repartitions in order to use this new >>> feature. >>>>>>> During the >>>>>>>>>>>>>>>> course of development you end up >>>>>>>>>>>>>>>> tweaking the topology, but don't remember to review or move >>> the >>>>>>>>>>>>>>>> repartitioning since you're used to Streams >>>>>>>>>>>>>>>> doing this for you. If you use only single-partition topics >>> for >>>>>>> testing, >>>>>>>>>>>>>>>> you might not even notice your app is >>>>>>>>>>>>>>>> spitting out incorrect results! >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Anyways, I feel pretty strongly that it would be weird to >>>>>>> introduce a new >>>>>>>>>>>>>>>> feature and say that to use it, you can't take >>>>>>>>>>>>>>>> advantage of this other feature anymore. 
Also, is it >>> possible our >>>>>>>>>>>>>>>> optimization framework could ever include an >>>>>>>>>>>>>>>> optimized repartitioning strategy that is better than what a >>>>>>> user could >>>>>>>>>>>>>>>> achieve by manually inserting repartitions? >>>>>>>>>>>>>>>> Do we expect users to have a deep understanding of the best >>> way >>>>>>> to >>>>>>>>>>>>>>>> repartition their particular topology, or is it >>>>>>>>>>>>>>>> likely they will end up over-repartitioning either due to >>> missed >>>>>>>>>>>>>>>> optimizations or unnecessary extra repartitions? >>>>>>>>>>>>>>>> I think many users would prefer to just say "if there *is* a >>>>>>> repartition >>>>>>>>>>>>>>>> required at this point in the topology, it should >>>>>>>>>>>>>>>> have N partitions" >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> As to the idea of adding `numberOfPartitions` to Grouped >>> rather >>>>>>> than >>>>>>>>>>>>>>>> adding a new parameter to groupBy, that does seem more in >>> line >>>>>>> with the >>>>>>>>>>>>>>>> current syntax so +1 from me >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On Tue, Nov 5, 2019 at 2:07 PM Levani Kokhreidze < >>>>>>> levani.co...@gmail.com> >>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Hello all, >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> While https://github.com/apache/kafka/pull/7170 < >>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/7170> is under >>> review and >>>>>>> it’s >>>>>>>>>>>>>>>>> almost done, I want to resurrect discussion about this KIP >>> to >>>>>>> address >>>>>>>>>>>>>>>>> couple of concerns raised by Matthias and John. 
>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> As a reminder, the idea of KIP-221 was to give DSL users >>>>>>> control over >>>>>>>>>>>>>>>>> repartitioning and parallelism of sub-topologies by: >>>>>>>>>>>>>>>>> 1) Introducing a new KStream#repartition operation, which is >>> done >>>>>>> in >>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/7170 >>>>>>>>>>>>>>>>> 2) Adding a new KStream#groupBy(Repartitioned) operation, which >>> is >>>>>>> planned to >>>>>>>>>>>>>>>>> be a separate PR. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> While everyone agrees with the general implementation and idea >>> behind the >>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/7170 PR, >>> introducing a new >>>>>>>>>>>>>>>>> KStream#groupBy(Repartitioned) method overload raised some >>>>>>> questions during >>>>>>>>>>>>>>>>> the review. >>>>>>>>>>>>>>>>> Matthias raised the concern that there can be cases when a user >>> uses the >>>>>>>>>>>>>>>>> `KStream#groupBy(Repartitioned)` operation, but actual >>>>>>> repartitioning may >>>>>>>>>>>>>>>>> not be required, so the configuration passed via `Repartitioned` >>>>>>> would never be >>>>>>>>>>>>>>>>> applied (Matthias, please correct me if I misinterpreted >>> your >>>>>>> comment). >>>>>>>>>>>>>>>>> So instead, if a user wants to control the parallelism of >>>>>>> sub-topologies, he or >>>>>>>>>>>>>>>>> she should always use the `KStream#repartition` operation >>> before >>>>>>> groupBy. The full >>>>>>>>>>>>>>>>> comment can be seen here: >>>>>>>>>>>>>>>>> >>>>>>> https://github.com/apache/kafka/pull/7170#issuecomment-519303125 >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> On the same topic, John pointed out that, from an API design >>>>>>> perspective, we >>>>>>>>>>>>>>>>> shouldn’t intertwine configuration classes of different >>>>>>> operators with >>>>>>>>>>>>>>>>> one another.
So instead of introducing new >>>>>>> `KStream#groupBy(Repartitioned)` >>>>>>>>>>>>>>>>> for specifying number of partitions for internal topic, we >>>>>>> should update >>>>>>>>>>>>>>>>> existing `Grouped` class with `numberOfPartitions` field. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Personally, I think Matthias’s concern is valid, but on the >>>>>>> other hand >>>>>>>>>>>>>>>>> Kafka Streams has already optimizer in place which alters >>>>>>> topology >>>>>>>>>>>>>>>>> independently from user. So maybe it makes sense if Kafka >>>>>>> Streams, >>>>>>>>>>>>>>>>> internally would optimize topology in the best way >>> possible, >>>>>>> even if in >>>>>>>>>>>>>>>>> some cases this means ignoring some operator configurations >>>>>>> passed by the >>>>>>>>>>>>>>>>> user. Also, I agree with John about API design semantics. >>> If we >>>>>>> go through >>>>>>>>>>>>>>>>> with the changes for `KStream#groupBy` operation, it makes >>> more >>>>>>> sense to >>>>>>>>>>>>>>>>> add `numberOfPartitions` field to `Grouped` class instead >>> of >>>>>>> introducing >>>>>>>>>>>>>>>>> new `KStream#groupBy(Repartitioned)` method overload. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> I would really appreciate communities feedback on this. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Kind regards, >>>>>>>>>>>>>>>>> Levani >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> On Oct 17, 2019, at 12:57 AM, Sophie Blee-Goldman < >>>>>>> sop...@confluent.io> >>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Hey Levani, >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> I think people are busy with the upcoming 2.4 release, and >>>>>>> don't have >>>>>>>>>>>>>>>>> much >>>>>>>>>>>>>>>>>> spare time at the >>>>>>>>>>>>>>>>>> moment. It's kind of a difficult time to get attention on >>>>>>> things, but >>>>>>>>>>>>>>>>> feel >>>>>>>>>>>>>>>>>> free to pick up something else >>>>>>>>>>>>>>>>>> to work on in the meantime until things have calmed down >>> a bit! 
>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>>>>> Sophie >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> On Wed, Oct 16, 2019 at 11:26 AM Levani Kokhreidze < >>>>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>> >>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Hello all, >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Sorry for bringing this thread again, but I would like >>> to get >>>>>>> some >>>>>>>>>>>>>>>>>>> attention on this PR: >>>>>>> https://github.com/apache/kafka/pull/7170 < >>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/7170> < >>>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/7170 < >>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/7170>> >>>>>>>>>>>>>>>>>>> It's been a while now and I would love to move on to >>> other >>>>>>> KIPs as well. >>>>>>>>>>>>>>>>>>> Please let me know if you have any concerns. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Regards, >>>>>>>>>>>>>>>>>>> Levani >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> On Jul 26, 2019, at 11:25 AM, Levani Kokhreidze < >>>>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>> >>>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Hi all, >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Here’s voting thread for this KIP: >>>>>>>>>>>>>>>>>>> >>>>>>> https://www.mail-archive.com/dev@kafka.apache.org/msg99680.html < >>>>>>>>>>>>>>>>> >>> https://www.mail-archive.com/dev@kafka.apache.org/msg99680.html> >>>>>>> < >>>>>>>>>>>>>>>>>>> >>>>>>> https://www.mail-archive.com/dev@kafka.apache.org/msg99680.html < >>>>>>>>>>>>>>>>> >>> https://www.mail-archive.com/dev@kafka.apache.org/msg99680.html >>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Regards, >>>>>>>>>>>>>>>>>>>> Levani >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> On Jul 24, 2019, at 11:15 PM, Levani Kokhreidze < >>>>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com> >>>>>>>>>>>>>>>>>>> <mailto:levani.co...@gmail.com 
<mailto: >>> levani.co...@gmail.com>>> >>>>>>> wrote: >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Hi Matthias, >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Thanks for the suggestion. I Don’t have strong opinion >>> on >>>>>>> that one. >>>>>>>>>>>>>>>>>>>>> Agree that avoiding unnecessary method overloads is a >>> good >>>>>>> idea. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Updated KIP >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Regards, >>>>>>>>>>>>>>>>>>>>> Levani >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> On Jul 24, 2019, at 8:50 PM, Matthias J. Sax < >>>>>>> matth...@confluent.io >>>>>>>>>>>>>>>>> <mailto:matth...@confluent.io> >>>>>>>>>>>>>>>>>>> <mailto:matth...@confluent.io <mailto: >>> matth...@confluent.io>>> >>>>>>> wrote: >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> One question: >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Why do we add >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> Repartitioned#with(final String name, final int >>>>>>> numberOfPartitions) >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> It seems that `#with(String name)`, >>>>>>> `#numberOfPartitions(int)` in >>>>>>>>>>>>>>>>>>>>>> combination with `withName()` and >>>>>>> `withNumberOfPartitions()` should >>>>>>>>>>>>>>>>> be >>>>>>>>>>>>>>>>>>>>>> sufficient. Users can chain the method calls. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> (I think it's valuable to keep the number of overload >>>>>>> small if >>>>>>>>>>>>>>>>>>> possible.) >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Otherwise LGTM. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> -Matthias >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> On 7/23/19 2:18 PM, Levani Kokhreidze wrote: >>>>>>>>>>>>>>>>>>>>>>> Hello, >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> Thanks all for your feedback. >>>>>>>>>>>>>>>>>>>>>>> I started voting procedure for this KIP. If there’re >>> any >>>>>>> other >>>>>>>>>>>>>>>>>>> concerns about this KIP, please let me know. 
>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> Regards, >>>>>>>>>>>>>>>>>>>>>>> Levani >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> On Jul 20, 2019, at 8:39 PM, Levani Kokhreidze < >>>>>>>>>>>>>>>>>>> levani.co...@gmail.com> wrote: >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> Hi Matthias, >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> Thanks for the suggestion, makes sense. >>>>>>>>>>>>>>>>>>>>>>>> I’ve updated the KIP ( >>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint >>>>>>>>>>>>>>>>>>>>> ). >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> Regards, >>>>>>>>>>>>>>>>>>>>>>>> Levani >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> On Jul 20, 2019, at 3:53 AM, Matthias J. Sax < >>>>>>>>>>>>>>>>> matth...@confluent.io> wrote: >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> Thanks for driving the KIP. >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> I agree that users need to be able to specify a >>>>>>> partitioning >>>>>>>>>>>>>>>>>>> strategy. >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> Sophie raises a fair point about topic configs and >>>>>>> producer >>>>>>>>>>>>>>>>>>> configs. My >>>>>>>>>>>>>>>>>>>>>>>>> take is that considering `Repartitioned` as an >>>>>>> "extension" to >>>>>>>>>>>>>>>>>>> `Produced` >>>>>>>>>>>>>>>>>>>>>>>>> that adds topic configuration is a good way to >>> think >>>>>>> about it and >>>>>>>>>>>>>>>>>>> helps >>>>>>>>>>>>>>>>>>>>>>>>> to keep the API "clean". >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> With regard to method names, I would prefer to >>> avoid >>>>>>>>>>>>>>>>> abbreviations.
>>>>>>>>>>>>>>>>>>> Can >>>>>>>>>>>>>>>>>>>>>>>>> we rename: >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> `withNumOfPartitions` -> `withNumberOfPartitions` >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> Furthermore, it might be good to add some more >>> `static` >>>>>>> methods: >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> - Repartitioned.with(Serde<K>, Serde<V>) >>>>>>>>>>>>>>>>>>>>>>>>> - Repartitioned.withNumberOfPartitions(int) >>>>>>>>>>>>>>>>>>>>>>>>> - >>> Repartitioned.streamPartitioner(StreamPartitioner) >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> -Matthias >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> On 7/19/19 3:33 PM, Levani Kokhreidze wrote: >>>>>>>>>>>>>>>>>>>>>>>>>> Totally agree. I think in the KStream interface it >>> makes >>>>>>> sense to >>>>>>>>>>>>>>>>> have >>>>>>>>>>>>>>>>>>> some duplicate configurations between operators in order >>> to >>>>>>> keep the API >>>>>>>>>>>>>>>>> simple >>>>>>>>>>>>>>>>>>> and usable. >>>>>>>>>>>>>>>>>>>>>>>>>> Also, the larger the API surface, the harder it is to >>> maintain >>>>>>> proper >>>>>>>>>>>>>>>>>>> backward compatibility. >>>>>>>>>>>>>>>>>>>>>>>>>> While the initial idea of keeping topic level configs >>>>>>> separate was >>>>>>>>>>>>>>>>>>> exciting, having the Repartitioned class encapsulate some >>>>>>> producer level >>>>>>>>>>>>>>>>>>> configs makes the API more readable.
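The style discussed in this part of the thread uses a few `static` entry points plus chainable `with*` methods, rather than multi-argument overloads such as `with(name, numberOfPartitions)`. It can be sketched as a small immutable builder. Everything below is hypothetical and stdlib-only. `RepartitionedSketch` and `PartitionerSketch` are illustrative stand-ins, and serdes are left out. The real `Repartitioned` and `StreamPartitioner` classes may differ in names and signatures.

```java
import java.util.Objects;

/** Hypothetical stand-in for a partitioner; the real StreamPartitioner signature differs. */
@FunctionalInterface
interface PartitionerSketch<K, V> {
    int partition(K key, V value, int numPartitions);
}

/**
 * Hypothetical, stdlib-only sketch of a Repartitioned-style config object:
 * a few static entry points plus immutable, chainable with* methods.
 * Serdes are omitted to keep it self-contained.
 */
final class RepartitionedSketch<K, V> {
    final String name;                         // null: let the library generate a name
    final Integer numberOfPartitions;          // null: use the upstream partition count
    final PartitionerSketch<K, V> partitioner; // null: use default (hash) partitioning

    private RepartitionedSketch(String name, Integer numberOfPartitions,
                                PartitionerSketch<K, V> partitioner) {
        this.name = name;
        this.numberOfPartitions = numberOfPartitions;
        this.partitioner = partitioner;
    }

    // Static entry points: one per option, so no multi-argument overloads are needed.
    static <K, V> RepartitionedSketch<K, V> as(String name) {
        return new RepartitionedSketch<K, V>(null, null, null).withName(name);
    }

    static <K, V> RepartitionedSketch<K, V> numberOfPartitions(int n) {
        return new RepartitionedSketch<K, V>(null, null, null).withNumberOfPartitions(n);
    }

    static <K, V> RepartitionedSketch<K, V> streamPartitioner(PartitionerSketch<K, V> p) {
        return new RepartitionedSketch<K, V>(null, null, null).withStreamPartitioner(p);
    }

    // Chainable setters: each returns a new instance and leaves the receiver unchanged.
    RepartitionedSketch<K, V> withName(String name) {
        return new RepartitionedSketch<>(Objects.requireNonNull(name), numberOfPartitions, partitioner);
    }

    RepartitionedSketch<K, V> withNumberOfPartitions(int n) {
        if (n <= 0) {
            throw new IllegalArgumentException("numberOfPartitions must be positive");
        }
        return new RepartitionedSketch<>(name, n, partitioner);
    }

    RepartitionedSketch<K, V> withStreamPartitioner(PartitionerSketch<K, V> p) {
        return new RepartitionedSketch<>(name, numberOfPartitions, Objects.requireNonNull(p));
    }
}
```

Two options then combine without a dedicated overload, e.g. `RepartitionedSketch.<String, Long>as("orders-by-customer").withNumberOfPartitions(6)`. Because every `with*` call returns a fresh instance, a shared config object can be reused safely as a starting point.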
>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> Regards, >>>>>>>>>>>>>>>>>>>>>>>>>> Levani >>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>> On Jul 20, 2019, at 1:15 AM, Sophie Blee-Goldman >>> < >>>>>>>>>>>>>>>>>>> sop...@confluent.io <mailto:sop...@confluent.io> >>> <mailto: >>>>>>>>>>>>>>>>> sop...@confluent.io <mailto:sop...@confluent.io>> <mailto: >>>>>>>>>>>>>>>>>>> sop...@confluent.io <mailto:sop...@confluent.io> >>> <mailto: >>>>>>>>>>>>>>>>> sop...@confluent.io <mailto:sop...@confluent.io>>>> wrote: >>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>> I think that is a good point about trying to keep >>>>>>> producer level >>>>>>>>>>>>>>>>>>>>>>>>>>> configurations and (repartition) topic level >>>>>>> considerations >>>>>>>>>>>>>>>>>>> separate. >>>>>>>>>>>>>>>>>>>>>>>>>>> Number of partitions is definitely purely a topic >>>>>>> level >>>>>>>>>>>>>>>>>>> configuration. But >>>>>>>>>>>>>>>>>>>>>>>>>>> on some level, serdes and partitioners are just >>> as >>>>>>> much a topic >>>>>>>>>>>>>>>>>>>>>>>>>>> configuration as a producer one. You could have >>> two >>>>>>> producers >>>>>>>>>>>>>>>>>>> configured >>>>>>>>>>>>>>>>>>>>>>>>>>> with different serdes and/or partitioners, but if >>>>>>> they are >>>>>>>>>>>>>>>>>>> writing to the >>>>>>>>>>>>>>>>>>>>>>>>>>> same topic the result would be very difficult to >>>>>>> part. So in a >>>>>>>>>>>>>>>>>>> sense, these >>>>>>>>>>>>>>>>>>>>>>>>>>> are configurations of topics in Streams, not just >>>>>>> producers. >>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>> Another way to think of it: while the Streams >>> API is >>>>>>> not always >>>>>>>>>>>>>>>>>>> true to >>>>>>>>>>>>>>>>>>>>>>>>>>> this, ideally all the relevant configs for an >>>>>>> operator are >>>>>>>>>>>>>>>>>>> wrapped into a >>>>>>>>>>>>>>>>>>>>>>>>>>> single object (in this case, Repartitioned). 
We >>> could >>>>>>> instead >>>>>>>>>>>>>>>>>>> split out the >>>>>>>>>>>>>>>>>>>>>>>>>>> fields in common with Produced into a separate >>>>>>> parameter to keep >>>>>>>>>>>>>>>>>>> topic and >>>>>>>>>>>>>>>>>>>>>>>>>>> producer level configurations separate, but this >>>>>>> increases the >>>>>>>>>>>>>>>>>>> API surface >>>>>>>>>>>>>>>>>>>>>>>>>>> area by a lot. It's much more straightforward to >>> just >>>>>>> say "this >>>>>>>>>>>>>>>>> is >>>>>>>>>>>>>>>>>>>>>>>>>>> everything that this particular operator needs" >>>>>>> without worrying >>>>>>>>>>>>>>>>>>> about what >>>>>>>>>>>>>>>>>>>>>>>>>>> exactly you're specifying. >>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>> I suppose you could alternatively make Produced a >>>>>>> field of >>>>>>>>>>>>>>>>>>> Repartitioned, >>>>>>>>>>>>>>>>>>>>>>>>>>> but I don't think we do this kind of composition >>>>>>> elsewhere in >>>>>>>>>>>>>>>>>>> Streams at >>>>>>>>>>>>>>>>>>>>>>>>>>> the moment >>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Jul 19, 2019 at 1:45 PM Levani >>> Kokhreidze < >>>>>>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com> >>>>>>> <mailto: >>>>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>> >>>>>>> <mailto: >>>>>>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com> >>>>>>> <mailto: >>>>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Bill, >>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks a lot for the feedback. >>>>>>>>>>>>>>>>>>>>>>>>>>>> Yes, that makes sense. I’ve updated KIP with >>>>>>>>>>>>>>>>>>> `Repartitioned#partitioner` >>>>>>>>>>>>>>>>>>>>>>>>>>>> configuration. 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> In the beginning, I wanted to introduce a class >>> for >>>>>>> topic level >>>>>>>>>>>>>>>>>>>>>>>>>>>> configuration and keep topic level and producer >>> level >>>>>>>>>>>>>>>>>>> configurations (such >>>>>>>>>>>>>>>>>>>>>>>>>>>> as Produced) separate (see my second email in >>> this >>>>>>> thread). >>>>>>>>>>>>>>>>>>>>>>>>>>>> But while looking at the semantics of the KStream >>>>>>> interface, I >>>>>>>>>>>>>>>>>>> couldn’t really >>>>>>>>>>>>>>>>>>>>>>>>>>>> figure out a good operation name for a topic level >>>>>>> configuration >>>>>>>>>>>>>>>>>>> class, and just >>>>>>>>>>>>>>>>>>>>>>>>>>>> introducing a `Topic` config class was kind of >>> breaking >>>>>>> the >>>>>>>>>>>>>>>>>>> semantics. >>>>>>>>>>>>>>>>>>>>>>>>>>>> So I think having a Repartitioned class which >>>>>>> encapsulates topic >>>>>>>>>>>>>>>>>>> and >>>>>>>>>>>>>>>>>>>>>>>>>>>> producer level configurations for internal >>> topics is a >>>>>>> viable >>>>>>>>>>>>>>>>>>> thing to do. >>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards, >>>>>>>>>>>>>>>>>>>>>>>>>>>> Levani >>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Jul 19, 2019, at 7:47 PM, Bill Bejeck < >>>>>>> bbej...@gmail.com >>>>>>>>>>>>>>>>> <mailto:bbej...@gmail.com> >>>>>>>>>>>>>>>>>>> <mailto:bbej...@gmail.com <mailto:bbej...@gmail.com>> >>>>>>> <mailto: >>>>>>>>>>>>>>>>> bbej...@gmail.com <mailto:bbej...@gmail.com> <mailto: >>>>>>>>>>>>>>>>>>> bbej...@gmail.com <mailto:bbej...@gmail.com>>>> wrote: >>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Levani, >>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for resurrecting this KIP. >>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> I'm also a +1 for adding a partition option. >>> In >>>>>>> addition to >>>>>>>>>>>>>>>>>>> the reason >>>>>>>>>>>>>>>>>>>>>>>>>>>>> provided by John, my reasoning is: >>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1.
Users may want to use something other than >>>>>>> hash-based >>>>>>>>>>>>>>>>>>> partitioning >>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2. Users may wish to partition on something >>>>>>> different than the >>>>>>>>>>>>>>>>>>> key >>>>>>>>>>>>>>>>>>>>>>>>>>>>> without having to change the key. For example: >>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1. A combination of fields in the value in >>>>>>> conjunction with >>>>>>>>>>>>>>>>>>> the key >>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2. Something other than the key >>>>>>>>>>>>>>>>>>>>>>>>>>>>> 3. We allow users to specify a partitioner on >>>>>>> Produced hence >>>>>>>>>>>>>>>>> in >>>>>>>>>>>>>>>>>>>>>>>>>>>>> KStream.to and KStream.through, so it makes >>> sense >>>>>>> for API >>>>>>>>>>>>>>>>>>> consistency. >>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Just my 2 cents. >>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Bill >>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Jul 19, 2019 at 5:46 AM Levani >>> Kokhreidze < >>>>>>>>>>>>>>>>>>>>>>>>>>>> levani.co...@gmail.com <mailto: >>>>>>> levani.co...@gmail.com> >>>>>>>>>>>>>>>>> <mailto:levani.co...@gmail.com <mailto: >>> levani.co...@gmail.com>> >>>>>>> <mailto: >>>>>>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com> >>>>>>> <mailto: >>>>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi John, >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> In my mind it makes sense. 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> If we add partitioner configuration to >>>>>>> Repartitioned class, >>>>>>>>>>>>>>>>>>> with the >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> combination of specifying number of >>> partitions for >>>>>>> internal >>>>>>>>>>>>>>>>>>> topics, user >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> will have opportunity to ensure >>> co-partitioning >>>>>>> before join >>>>>>>>>>>>>>>>>>> operation. >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I think this can be quite powerful feature. >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Wondering what others think about this? >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards, >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Levani >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Jul 18, 2019, at 1:20 AM, John Roesler < >>>>>>>>>>>>>>>>> j...@confluent.io <mailto:j...@confluent.io> >>>>>>>>>>>>>>>>>>> <mailto:j...@confluent.io <mailto:j...@confluent.io>> >>>>>>> <mailto: >>>>>>>>>>>>>>>>> j...@confluent.io <mailto:j...@confluent.io> <mailto: >>>>>>>>>>>>>>>>>>> j...@confluent.io <mailto:j...@confluent.io>>>> wrote: >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Yes, I believe that's what I had in mind. >>> Again, >>>>>>> not totally >>>>>>>>>>>>>>>>>>> sure it >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> makes sense, but I believe something similar >>> is >>>>>>> the >>>>>>>>>>>>>>>>> rationale >>>>>>>>>>>>>>>>>>> for >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> having the partitioner option in Produced. 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> -John >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Jul 17, 2019 at 3:20 PM Levani >>> Kokhreidze >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> <levani.co...@gmail.com <mailto: >>>>>>> levani.co...@gmail.com> >>>>>>>>>>>>>>>>> <mailto:levani.co...@gmail.com <mailto: >>> levani.co...@gmail.com>> >>>>>>>>>>>>>>>>>>> <mailto:levani.co...@gmail.com <mailto: >>> levani.co...@gmail.com> >>>>>>> <mailto: >>>>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>>>> >>>>>>> wrote: >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hey John, >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Oh that’s interesting use-case. >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Do I understand this correctly, in your >>> example >>>>>>> I would >>>>>>>>>>>>>>>>>>> first issue >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> repartition(Repartitioned) with proper >>> partitioner >>>>>>> that >>>>>>>>>>>>>>>>>>> essentially >>>>>>>>>>>>>>>>>>>>>>>>>>>> would >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> be the same as the topic I want to join with >>> and >>>>>>> then do the >>>>>>>>>>>>>>>>>>>>>>>>>>>> KStream#join >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> with DSL? 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards, >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Levani >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Jul 17, 2019, at 11:11 PM, John Roesler >>> < >>>>>>>>>>>>>>>>>>> j...@confluent.io <mailto:j...@confluent.io> <mailto: >>>>>>> j...@confluent.io >>>>>>>>>>>>>>>>> <mailto:j...@confluent.io>> <mailto:j...@confluent.io >>> <mailto: >>>>>>>>>>>>>>>>> j...@confluent.io> >>>>>>>>>>>>>>>>>>> <mailto:j...@confluent.io <mailto:j...@confluent.io>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hey, all, just to chime in, >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I think it might be useful to have an >>> option to >>>>>>> specify >>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> partitioner. The case I have in mind is >>> that >>>>>>> some data may >>>>>>>>>>>>>>>>>>> get >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> repartitioned and then joined with an input >>>>>>> topic. If the >>>>>>>>>>>>>>>>>>> right-side >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> input topic uses a custom partitioning >>>>>>> strategy, then the >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> repartitioned stream also needs to be >>>>>>> partitioned with the >>>>>>>>>>>>>>>>>>> same >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> strategy. >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Does that make sense, or did I maybe miss >>>>>>> something >>>>>>>>>>>>>>>>>>> important? 
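The co-partitioning requirement John describes can be checked with a small, hypothetical model (not Kafka Streams code). The two sides of a key-based join only line up if each key lands in the same partition number on both sides, which needs both the same partition count and the same partitioning strategy. In the model, `regionPartition` is an invented custom strategy that assumes keys of the form `REGION:id`. The single-partition case also shows why testing with one partition, as Sophie noted elsewhere in the thread, can hide a mismatch.

```java
import java.util.List;
import java.util.function.ToIntBiFunction;

/**
 * Hypothetical illustration (not Kafka Streams code): a key-based join only works
 * if both sides put each key into the same partition number, which requires the
 * same partition count AND the same partitioning strategy.
 */
final class CoPartitionSketch {

    private CoPartitionSketch() {}

    /** Default-style strategy: hash of the whole key. */
    static int hashPartition(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /** Invented custom strategy: keys like "EU:42" are partitioned by region prefix only. */
    static int regionPartition(String key, int numPartitions) {
        String region = key.substring(0, key.indexOf(':')); // assumes the key contains ':'
        return Math.floorMod(region.hashCode(), numPartitions);
    }

    /** True if every key maps to the same partition number under both strategies. */
    static boolean coPartitioned(List<String> keys, int numPartitions,
                                 ToIntBiFunction<String, Integer> left,
                                 ToIntBiFunction<String, Integer> right) {
        for (String key : keys) {
            if (left.applyAsInt(key, numPartitions) != right.applyAsInt(key, numPartitions)) {
                return false;
            }
        }
        return true;
    }
}
```

With one partition, the mismatched pair `hashPartition`/`regionPartition` trivially agrees. With four partitions it does not (e.g. `"EU:1"` lands in partition 3 vs. 0). Using the right-side strategy on both sides keeps the join correct, which is the motivation for a partitioner option on the repartition step.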
>
> Thanks,
> -John
>
> On Wed, Jul 17, 2019 at 2:48 PM Levani Kokhreidze <levani.co...@gmail.com> wrote:
>
> Yes, I was thinking about it as well. To be honest, I'm not sure about it yet.
> As a Kafka Streams DSL user, I don't really think I would need control
> over the partitioner for internal topics.
> As a user, I would assume that Kafka Streams knows best how to
> partition data for internal topics.
> In this KIP I wrote that Produced should be used only for topics that
> are created by the user in advance.
> In those cases, maybe it makes sense to have the possibility to specify the
> partitioner.
> I don't have a clear answer on that yet, but I guess specifying the
> partitioner can be added as well if there's agreement on this.
>
> Regards,
> Levani
>
> On Jul 17, 2019, at 10:42 PM, Sophie Blee-Goldman <sop...@confluent.io> wrote:
>
> Thanks for clearing that up. I agree that Repartitioned would be a useful
> addition. I'm wondering if it might also need to have
> a withStreamPartitioner method/field, similar to Produced? I'm not sure how
> widely this feature is really used, but it seems it should be available for
> repartition topics.
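To make Sophie's suggestion concrete, here is a hedged sketch of an options object that carries a topic name, a partition count, and a partitioner, using a fluent `withX` style similar to `Produced`. `RepartitionSpec` and `KeyPartitioner` are hypothetical names invented for illustration; the thread's actual proposal is the `Repartitioned` class, whose final API may differ. As a design choice, each `withX` call here returns a new instance rather than mutating the receiver.

```java
import java.util.Optional;

// Hypothetical partitioner contract: maps a key to a partition index.
interface KeyPartitioner<K> {
    int partition(K key, int numPartitions);
}

// Hypothetical options object for a Streams-managed repartition topic.
// Unset fields are null and mean "let the framework decide".
final class RepartitionSpec<K> {
    private final String name;                   // null: generate a topic name
    private final Integer numberOfPartitions;    // null: inherit upstream count
    private final KeyPartitioner<K> partitioner; // null: default strategy

    private RepartitionSpec(String name, Integer numberOfPartitions,
                            KeyPartitioner<K> partitioner) {
        this.name = name;
        this.numberOfPartitions = numberOfPartitions;
        this.partitioner = partitioner;
    }

    static <K> RepartitionSpec<K> defaults() {
        return new RepartitionSpec<>(null, null, null);
    }

    // Each withX call returns a new instance, so a shared base spec is never mutated.
    RepartitionSpec<K> withName(String name) {
        return new RepartitionSpec<>(name, numberOfPartitions, partitioner);
    }

    RepartitionSpec<K> withNumberOfPartitions(int numberOfPartitions) {
        if (numberOfPartitions <= 0) {
            throw new IllegalArgumentException("numberOfPartitions must be positive");
        }
        return new RepartitionSpec<>(name, numberOfPartitions, partitioner);
    }

    RepartitionSpec<K> withStreamPartitioner(KeyPartitioner<K> partitioner) {
        return new RepartitionSpec<>(name, numberOfPartitions, partitioner);
    }

    Optional<String> name() { return Optional.ofNullable(name); }
    Optional<Integer> numberOfPartitions() { return Optional.ofNullable(numberOfPartitions); }
    Optional<KeyPartitioner<K>> partitioner() { return Optional.ofNullable(partitioner); }
}
```

A caller could, for example, write `RepartitionSpec.<String>defaults().withNumberOfPartitions(8)` and leave the partitioner unset, so the default strategy would apply; John's join case is the one where `withStreamPartitioner` would be set explicitly.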
>
> On Wed, Jul 17, 2019 at 11:26 AM Levani Kokhreidze <levani.co...@gmail.com> wrote:
>
> Hey Sophie,
>
> In both cases, KStream#repartition and KStream#repartition(Repartitioned),
> the topic will be created and managed by Kafka Streams.
> The idea of Repartitioned is to give the user more control over the topic,
> such as the number of partitions.
> I feel like the Repartitioned parameter is something that is missing in the
> current DSL design: essentially, it gives the user control over parallelism
> by configuring the number of partitions for internal topics.
>
> Hope this answers your question.
>
> Regards,
> Levani
>
> On Jul 17, 2019, at 9:02 PM, Sophie Blee-Goldman <sop...@confluent.io> wrote:
>
> Hey Levani,
>
> Thanks for the KIP! Can you clarify one thing for me -- for the
> KStream#repartition signature taking a Repartitioned, will the topic be
> auto-created by Streams (which seems to be the case for the signature
> without a Repartitioned) or does it have to be pre-created? The wording in
> the KIP makes it seem like one version of the method will auto-create
> topics while the other will not.
>
> Cheers,
> Sophie
>
> On Wed, Jul 17, 2019 at 10:15 AM Levani Kokhreidze <levani.co...@gmail.com> wrote:
>
> Hello,
>
> One more bump about KIP-221
> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint)
> so it doesn't get lost in the mailing list :)
> I would love to hear the community's opinions/concerns about this KIP.
>
> Regards,
> Levani
>
> On Jul 12, 2019, at 5:27 PM, Levani Kokhreidze <levani.co...@gmail.com> wrote:
>
> Hello,
>
> Kind reminder about this KIP:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>
> Regards,
> Levani
>
> On Jul 9, 2019, at 11:38 AM, Levani Kokhreidze <levani.co...@gmail.com> wrote:
>
> Hello,
>
> In order to move this KIP forward, I've updated the Confluence page with
> the new proposal:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>
> I've also filled in the "Rejected Alternatives" section.
>
> Looking forward to discussing this KIP :)
>
> Kind regards,
> Levani
>
> On Jul 3, 2019, at 1:08 PM, Levani Kokhreidze <levani.co...@gmail.com> wrote:
>
> Hello Matthias,
>
> Thanks for the feedback and ideas.
> I like the idea of introducing a dedicated `Topic` class for topic
> configuration for internal operators like `groupBy`.
> It would be great to hear others' opinions about this as well.
>
> Kind regards,
> Levani
>
> On Jul 3, 2019, at 7:00 AM, Matthias J. Sax <matth...@confluent.io> wrote:
>
> Levani,
>
> Thanks for picking up this KIP! And thanks for summarizing everything.
> Even if some points may have been discussed already (can't really
> remember), it's helpful to get a good summary to refresh the discussion.
>
> I think your reasoning makes sense.
> With regard to the distinction between operators that manage topics and
> operators that use user-created topics: following this argument, it might
> be good to leave `through()` as-is (as an operator that uses user-defined
> topics) and introduce a new `repartition()` operator (an operator that
> manages topics itself). Otherwise, there is one operator, `through()`, that
> sometimes manages topics and sometimes does not; a different name, i.e., a
> new operator, would make the distinction clearer.
>
> About adding `numOfPartitions` to `Grouped`: I am wondering if the same
> argument as for `Produced` applies, and adding it is semantically
> questionable? It might be good to get opinions of others on this, too.
> I am not sure myself which solution I prefer atm.
>
> So far, KS uses configuration objects that allow configuring a certain
> "entity" like a consumer, producer, or store. If we assume that a topic is
> a similar entity, I am wondering if we should have a
> `Topic#withNumberOfPartitions()` class and method instead of a plain
> integer? This would allow us to add other configs, like replication
> factor, retention time, etc., easily, without the need to change the
> "main API".
>
> Just want to give some ideas. Not sure if I like them myself.
> :)
>
> -Matthias
>
> On 7/1/19 1:04 AM, Levani Kokhreidze wrote:
>
> Actually, giving it more thought, maybe enhancing Produced with a number of
> partitions configuration is not the best approach. Let me explain why:
>
> 1) If we enhance the Produced class with this configuration, this will
> also affect the KStream#to operation. Since KStream#to is the final sink of
> the topology, it seems a reasonable assumption to me that the user needs
> to create the sink topic manually in advance. And in that case, having a
> number of partitions configuration doesn't make much sense.
>
> 2) Looking at the Produced class and its API contract, it seems that
> Produced is designed to be explicitly for the producer (key serializer,
> value serializer, and partitioner are all producer-specific
> configurations), while the number of partitions is a topic-level
> configuration. And I don't think mixing topic-level and producer-level
> configurations together in one class is a good approach.
>
> 3) Looking at the KStream interface, it seems the Produced parameter is
> for operations that work with non-internal topics (i.e., topics not created
> and managed internally by Kafka Streams), and I think we should leave it as
> it is in that case.
>
> Taking all these things into account, I think we should distinguish
> between DSL operations where Kafka Streams should create and manage
> internal topics (KStream#groupBy) and topics that should be created in
> advance (e.g., KStream#to).
>
> To sum it up, I think adding a numPartitions configuration to Produced
> would mix topic-level and producer-level configuration in one class, and
> it would break existing API semantics.
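Levani's argument separates topic-level settings (such as the partition count) from producer-level settings (serializers, partitioner), and Matthias suggested a dedicated topic configuration object instead of a plain integer. The following minimal sketch assumes a hypothetical `TopicSpec` class (not part of Kafka Streams) and shows how further topic settings could be added without changing the signatures of operators that accept such an object.

```java
import java.util.OptionalInt;
import java.util.OptionalLong;

// Hypothetical topic-level settings object (the "Topic" idea from the thread).
// It deliberately holds only topic properties; producer concerns such as
// serializers or a partitioner would stay in a separate object like Produced.
final class TopicSpec {
    private final Integer partitions;        // null: inherit from upstream
    private final Short replicationFactor;   // null: application default
    private final Long retentionMs;          // null: broker default

    private TopicSpec(Integer partitions, Short replicationFactor, Long retentionMs) {
        this.partitions = partitions;
        this.replicationFactor = replicationFactor;
        this.retentionMs = retentionMs;
    }

    static TopicSpec defaults() {
        return new TopicSpec(null, null, null);
    }

    TopicSpec withNumberOfPartitions(int n) {
        if (n <= 0) throw new IllegalArgumentException("partitions must be positive");
        return new TopicSpec(n, replicationFactor, retentionMs);
    }

    TopicSpec withReplicationFactor(short rf) {
        if (rf <= 0) throw new IllegalArgumentException("replication factor must be positive");
        return new TopicSpec(partitions, rf, retentionMs);
    }

    // Adding a setting like this later needs no change to operators that accept a TopicSpec.
    TopicSpec withRetentionMs(long ms) {
        return new TopicSpec(partitions, replicationFactor, ms);
    }

    // Explicit partition count if set, otherwise the upstream topic's count.
    int effectivePartitions(int upstreamPartitions) {
        return partitions != null ? partitions : upstreamPartitions;
    }

    OptionalInt replicationFactor() {
        return replicationFactor == null ? OptionalInt.empty() : OptionalInt.of(replicationFactor);
    }

    OptionalLong retentionMs() {
        return retentionMs == null ? OptionalLong.empty() : OptionalLong.of(retentionMs);
    }
}
```

With this shape, an operator would take one `TopicSpec` argument, and later additions (retention, replication factor) would only add methods to `TopicSpec`, which is the extensibility Matthias described.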
>
> Regarding making the topic name optional in KStream#through: I think the
> underlying idea is very useful, and giving users the possibility to specify
> the number of partitions there is even more useful :) Considering the
> arguments against adding the number of partitions to the Produced class, I
> see two options here:
> 1) Add the following method overloads:
>    * through() - the topic will be auto-generated and the number of
>      partitions will be taken from the source topic
>    * through(final int numOfPartitions) - the topic will be auto-generated
>      with the specified number of partitions
>    * through(final int numOfPartitions, final Produced<K, V> produced) - the
>      topic will be generated with the specified number of partitions and
>      configuration taken from the produced parameter
> 2) Leave KStream#through as it is and introduce a new method,
>    KStream#repartition (I think Matthias suggested this in one of the
>    threads)
>
> Considering all of the above, I propose the following plan:
>
> Option A:
> 1) Leave Produced as it is
> 2) Add a number of partitions configuration to the Grouped class (as
>    mentioned in the KIP)
> 3) Add the following method overloads to KStream#through:
>    * through() - the topic will be auto-generated and the number of
>      partitions will be taken from the source topic
>    * through(final int numOfPartitions) - the topic will be auto-generated
>      with the specified number of partitions
>    * through(final int numOfPartitions, final Produced<K, V> produced) - the
>      topic will be generated with the specified number of partitions and
>      configuration taken from the produced parameter
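The partition-count rules of the first two `through()` overloads in Option A can be modeled in a few lines. This is a hypothetical sketch, not Kafka Streams code: `StreamNode` and the `generated-repartition-N` naming scheme are invented for illustration, and it assumes "taken from the source topic" means the topic directly feeding the operator. The third overload would additionally carry the settings from `Produced`, which is omitted here.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the partition-count rules in Option A's through() overloads.
// This is not Kafka Streams code; the naming scheme for generated topics is invented.
final class StreamNode {
    private static final AtomicInteger GENERATED = new AtomicInteger();

    final String topic;
    final int partitions;

    private StreamNode(String topic, int partitions) {
        this.topic = topic;
        this.partitions = partitions;
    }

    // An existing input topic with a known partition count.
    static StreamNode source(String topic, int partitions) {
        return new StreamNode(topic, partitions);
    }

    // through(): auto-generated topic; partition count inherited from the upstream topic.
    StreamNode through() {
        return through(partitions);
    }

    // through(numOfPartitions): auto-generated topic with an explicit partition count.
    StreamNode through(int numOfPartitions) {
        if (numOfPartitions <= 0) {
            throw new IllegalArgumentException("numOfPartitions must be positive");
        }
        String generated = "generated-repartition-" + GENERATED.incrementAndGet();
        return new StreamNode(generated, numOfPartitions);
    }
}
```

In this model, `StreamNode.source("input", 3).through(12).through()` ends up with 12 partitions, because each call inherits from the topic directly upstream. The same rules would apply unchanged if the operator were named `repartition()` as in Option B.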
>
> Option B:
> 1) Leave Produced as it is
> 2) Add a number of partitions configuration to the Grouped class (as
>    mentioned in the KIP)
> 3) Add a new operator, KStream#repartition, for creating and managing
>    internal repartition topics
>
> P.S. I'm sorry if all of this was already discussed on the mailing list,
> but I kinda got lost in all the threads about this KIP :(
>
> Kind regards,
> Levani
>
> On Jul 1, 2019, at 9:56 AM, Levani Kokhreidze <levani.co...@gmail.com> wrote:
>
> Hello,
>
> I would like to resurrect the discussion around KIP-221.
Going >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> through >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the discussion thread, there’s seems >>> to >>>>>>> agreement >>>>>>>>>>>>>>>>>>> around >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> usefulness of >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> this >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> feature. >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regarding the implementation, >>> as far >>>>>>> as I >>>>>>>>>>>>>>>>>>> understood, the >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> most >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> optimal solution for me seems the >>>>>>> following: >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1) Add two method overloads to >>>>>>> KStream#through >>>>>>>>>>>>>>>>>>> method >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> (essentially >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> making topic name optional) >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2) Enhance Produced class with >>>>>>> numOfPartitions >>>>>>>>>>>>>>>>>>>>>>>>>>>> configuration >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> field. >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Those two changes will allow DSL >>>>>>> users to >>>>>>>>>>>>>>>>> control >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> parallelism and >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> trigger re-partition without doing >>> stateful >>>>>>>>>>>>>>>>> operations. >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I will update KIP with interface >>>>>>> changes around >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> KStream#through if >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> this changes sound sensible. >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Kind regards, >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Levani >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>> >>>>>>> >>>>>>> >>>>>> >>>>> >>>> >>> >>>