Thanks a lot for the input Sophie.

Your example is quite useful, and I would use it to support my claim
that a "partition hint" for `Grouped` seems "useless" and does not
improve the user experience.

1) You argue that a new user would be worries about repartitions topics
with too many paritions. This would imply that a user is already
advanced enough to understand the implication of repartitioning -- for
this case, I would argue that a user also understand _when_ a
auto-repartitioning would happen and thus the users understands where to
insert a `repartition()` operation.

2) For specifying Serdes: if a `groupByKey()` does not trigger
auto-repartitioning it's not required to specify the serdes and if they
are specified they would be ignored/unused (note, that `groupBy()` would
always trigger a repartitioning). Of course, if the default Serdes from
the config match (eg, all data types are Json anyway), a user does not
need to worry about specifying serdes. -- For new user that play around,
I would assume that they work a lot with primitive types and thus would
need to specify the serdes -- hence, they would learn about
auto-repartitioning the hard way anyhow, because each time a
`groupByKey()` does trigger auto-repartioning, they would need to pass
in the correct Serdes -- this way, they would also be educated where to
insert a `repartition()` operator if needed.

3) If a new user really just "plays around", I don't think they use an
input topic with 100 partitions but most likely have a local single node
broker with most likely single partitions topics.


My main argument for my current proposal is however, that---based on
past experience---it's better to roll out a new feature more carefully
and see how it goes. Last, as John pointed out, we can still extend the
feature in the future. Instead of making a judgment call up-front, being
more conservative and less fancy, and revisit the design based on
actuall user feedback after the first version is rolled out, seems to be
the better option. Undoing a feature is must harder than extending it.


While I advocate strong for a simple first version of this feature, it's
a community decission in the end, and I would not block this KIP if
there is a broad preference to add `Grouped#withNumberOfPartitions()`
either.


-Matthias

On 11/14/19 11:35 PM, Sophie Blee-Goldman wrote:
> It seems like we all agree at this point (please correct me if wrong!) that
> we should NOT change
> the existing repartitioning behavior, ie we should allow Streams to
> continue to determine when and
> where to repartition -- *unless* explicitly informed to by the use of a
> .through or the new .repartition operator.
> 
> Regarding groupBy, the existing behavior we should not disrupt is
> a) repartition *only* when required due to upstream key-changing operation
> (ie don't force repartitioning
> based on the presence of an optional config parameter), and
> b) allow optimization of required repartitions, if any
> 
> Within the constraint of not breaking the existing behavior, this still
> leaves open the question of whether we
> want to improve the user experience by allowing to provide groupBy with a
> *suggestion* for numPartitions (or to
> put it more fairly, whether that *will* improve the experience). I agree
> with many of the arguments outlined above but
> let me just push back on this one issue one final time, and if we can't
> come to a consensus then I am happy to drop
> it for now so that the KIP can proceed.
> 
> Specifically, my proposal would be to simply augment Grouped with an
> optional numPartitions, understood to
> indicate the user's desired number of partitions *if Streams decides to
> repartition due to that groupBy*
> 
>> if a user cares about the number of partition, the user wants to enforce
> a repartitioning
> First, I think we should take a step back and examine this claim. I agree
> 100% that *if this is true,*
> *then we should not give groupBy an optional numPartitions.* As far as I
> see it, there's no argument
> to be had there if we *presuppose that claim.* But I'm not convinced in
> that as an axiom of the user
> experience and think we should be examining that claim itself, not the
> consequences of it.
> 
> To give a simple example, let's say some new user is trying out Streams and
> wants to just play around
> with it to see if it might be worth looking into. They want to just write
> up a simple app and test it out on the
> data in some existing topics they have with a large number of partitions,
> and a lot of data. They're just messing
> around, trying new topologies and don't want to go through each new one
> step by step to determine if (or where)
> a repartition might be required. They also don't want to force a
> repartition if it turns out to not be required, so they'd
> like to avoid the nice new .repartition operator they saw. But given the
> huge number of input partitions, they'd like
> to rest assured that if a repartition does end up being required somewhere
> during dev, it will not be created with
> the same huge number of partitions that their input topic has -- so they
> just pass groupBy a small numPartitions
> suggestion.
> 
> I know that's a bit of a contrived example but I think it does highlight
> how and when this might be a considerable
> quality of life improvement, in particular for new users to Streams and/or
> during the dev cycle. *You don't want to*
> *force a repartition if it wasn't necessary, but you don't want to create a
> topic with a huge partition count either.*
> 
> Also, while the optimization discussion took us down an interesting but
> ultimately more distracting road, it's worth
> pointing out that it is clearly a major win to have as few
> repartition topics/steps as possible. Given that we
> don't want to change existing behavior, the optimization framework can only
> help out when the placement of
> repartition steps is flexible, which means only those from .groupBy (and
> not .repartition). *Users should not*
> *have to choose between allowing Streams to optimize the repartition
> placement, and allowing to specify a *
> *number of partitions.*
> 
> Lastly, I have what may be a stupid question but for my own edification of
> how groupBy works:
> if you do a .groupBy and a repartition is NOT required, does it ever need
> to serialize/deserialize
> any of the data? In other words, if you pass a key/value serde to groupBy
> and it doesn't trigger
> a repartition, is the serde(s) just ignored and thus more like a suggestion
> than a requirement?
> 
> So again, I don't want to hold up this KIP forever but I feel we've spent
> some time getting slightly
> off track (although certainly into very interesting discussions) yet never
> really addressed or questioned
> the basic premise: *could a user want to specify a number of partitions but
> not enforce a repartition (at that*
> *specific point in the topology)?*
> 
> 
> 
> On Fri, Nov 15, 2019 at 12:18 AM Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>> Side remark:
>>
>> If the user specifies `repartition()` on both side of the join, we can
>> actually throw the execption earlier, ie, when we build the topology.
>>
>> Current, we can do this check only after Kafka Streams was started,
>> within `StreamPartitionAssignor#assign()` -- we still need to keep this
>> check for the case that none or only one side has a user specified
>> number of partitions though.
>>
>>
>> -Matthias
>>
>> On 11/14/19 8:15 AM, John Roesler wrote:
>>> Thanks, all,
>>>
>>> I can get behind just totally leaving out reparation-via-groupBy. If
>>> we only introduce `repartition()` for now, we're making the minimal
>>> change to gain the desired capability.
>>>
>>> Plus, since we agree that `repartition()` should never be optimizable,
>>> it's a future-compatible proposal. I.e., if we were to add a
>>> non-optimizable groupBy(partitions) operation now, and want to make it
>>> optimizable in the future, we have to worry about topology
>>> compatibility. Better to just do non-optimizable `repartition()` now,
>>> and add an optimizable `groupBy(partitions)` in the future (maybe).
>>>
>>> About joins, yes, it's a concern, and IMO we should just do the same
>>> thing we do now... check at runtime that the partition counts on both
>>> sides match and throw an exception otherwise. What this means as a
>>> user is that if you explicitly repartition the left side to 100
>>> partitions, and then join with the right side at 10 partitions, you
>>> get an exception, since this operation is not possible. You'd either
>>> have to "step down" the left side again, back to 10 partitions, or you
>>> could repartition the right side to 100 partitions before the join.
>>> The choice has to be the user's, since it depends on their desired
>>> execution parallelism.
>>>
>>> Thanks,
>>> -John
>>>
>>> On Thu, Nov 14, 2019 at 12:55 AM Matthias J. Sax <matth...@confluent.io>
>> wrote:
>>>>
>>>> Thanks a lot John. I think the way you decompose the operators is super
>>>> helpful for this discussion.
>>>>
>>>> What you suggest with regard to using `Grouped` and enforcing
>>>> repartitioning if the number of partitions is specified is certainly
>>>> possible. However, I am not sure if we _should_ do this. My reasoning is
>>>> that an enforce repartitioning as introduced via `repartition()` is an
>>>> expensive operations, and it seems better to demand an more explicit
>>>> user opt-in to trigger it. Just setting an optional parameter might be
>>>> too subtle to trigger such a heavy "side effect".
>>>>
>>>> While I agree about "usability" in general, I would prefer a more
>>>> conservative appraoch to introduce this feature, see how it goes, and
>>>> maybe make it more advance later on. This also applies to what
>>>> optimzation we may or may not allow (or are able to perform at all).
>>>>
>>>> @Levani: Reflecting about my suggestion about `Repartioned extends
>>>> Grouped`, I agree that it might not be a good idea.
>>>>
>>>> Atm, I see an enforces repartitioning as non-optimizable and as a good
>>>> first step and I would suggest to not intoruce anything else for now.
>>>> Introducing optimizable enforce repartitioning via `groupBy(...,
>>>> Grouped)` is something we could add later.
>>>>
>>>>
>>>> Therefore, I would not change `Grouped` but only introduce
>>>> `repartition()`. Users that use `grouBy()` atm, and want to opt-in to
>>>> set the number of partitions, would need to rewrite their code to
>>>> `selectKey(...).repartition(...).groupByKey()`. It's less convinient but
>>>> also less risky from an API and optimization point of view.
>>>>
>>>>
>>>> @Levani: about joins -> yes, we will need to check the specified number
>>>> of partitions (if any) and if they don't match, throw an exception. We
>>>> can discuss this on the PR -- I am just trying to get the PR for KIP-466
>>>> merged -- your is next on the list :)
>>>>
>>>>
>>>> Thoughts?
>>>>
>>>>
>>>> -Matthias
>>>>
>>>>
>>>> On 11/12/19 4:51 PM, Levani Kokhreidze wrote:
>>>>> Thank you all for an interesting discussion. This is very enlightening.
>>>>>
>>>>> Thank you Matthias for your explanation. Your arguments are very true.
>> It makes sense that if user specifies number of partitions he/she really
>> cares that those specifications are applied to internal topics.
>>>>> Unfortunately, in current implementation this is not true during
>> `join` operation. As I’ve written in the PR comment, currently, when
>> `Stream#join` is used, `CopartitionedTopicsEnforcer` chooses max number of
>> partitions from the two source topics.
>>>>> I’m not really sure what would be the other way around this situation.
>> Maybe fail the stream altogether and inform the user to specify same number
>> of partitions?
>>>>> Or we should treat join operations in a same way as it is right now
>> and basically choose max number of partitions even when `repartition`
>> operation is specified, because Kafka Streams “knows the best” how to
>> handle joins?
>>>>> You can check integration tests how it’s being handled currently. Open
>> to suggestions on that part.
>>>>>
>>>>> As for groupBy, I agree and John raised very interesting points. My
>> arguments for allowing users to specify number of partitions during groupBy
>> operations mainly was coming from the usability perspective.
>>>>> So building on top of what John said, maybe it makes sense to make
>> `groupBy` operations smarter and whenever user specifies
>> `numberOfPartitions` configuration, repartitioning will be enforced, wdyt?
>>>>> I’m not going into optimization part yet :) I think it will be part of
>> separate PR and task, but overall it makes sense to apply optimizations
>> where number of partitions are the same.
>>>>>
>>>>> As for Repartitioned extending Grouped, I kinda feel that it won’t fit
>> nicely in current API design.
>>>>> In addition, in the PR review, John mentioned that there were a lot of
>> troubles in the past trying to use one operation's configuration objects on
>> other operations.
>>>>> Also it makes sense to keep them separate in terms of compatibility.
>>>>> In that case, we don’t have to worry every time Grouped is changed,
>> what would be the implications on `repartition` operations.
>>>>>
>>>>> Kind regards,
>>>>> Levani
>>>>>
>>>>>
>>>>>> On Nov 11, 2019, at 9:13 PM, John Roesler <j...@confluent.io> wrote:
>>>>>>
>>>>>> Ah, thanks for the clarification. I missed your point.
>>>>>>
>>>>>> I like the framework you've presented. It does seem simpler to assume
>>>>>> that they either care about the partition count and want to
>>>>>> repartition to realize it, or they don't care about the number.
>>>>>> Returning to this discussion, it does seem unlikely that they care
>>>>>> about the number and _don't_ care if it actually gets realized.
>>>>>>
>>>>>> But then, it still seems like we can just keep the option as part of
>>>>>> Grouped. As in:
>>>>>>
>>>>>> // user does not care
>>>>>> stream.groupByKey(Grouped /*not specifying partition count*/)
>>>>>> stream.groupBy(Grouped /*not specifying partition count*/)
>>>>>>
>>>>>> // user does care
>>>>>> stream.repartition(Repartitioned)
>>>>>> stream.groupByKey(Grouped.numberOfPartitions(...))
>>>>>> stream.groupBy(Grouped.numberOfPartitions(...))
>>>>>>
>>>>>> ----
>>>>>>
>>>>>> The above discussion got me thinking about algebra. Matthias is
>>>>>> absolutely right that `groupByKey(numPartitions)` is equivalent to
>>>>>> `repartition(numPartitions).groupByKey()`. I'm just not convinced that
>>>>>> we should force people to apply that expansion themselves vs. having a
>>>>>> more compact way to express it if they don't care where exactly the
>>>>>> repartition occurs. However, thinking about these operators
>>>>>> algebraically can really help *us* narrow down the number of different
>>>>>> expressions we have to consider.
>>>>>>
>>>>>> Let's consider some identities:
>>>>>>
>>>>>> A: groupBy(mapper) + agg = mapKey(mapper) + groupByKey + agg
>>>>>> B: src + ... + groupByKey + agg = src + ... + passthough + agg
>>>>>> C: mapKey(mapper) + ... + groupByKey + agg
>>>>>> = mapKey(mapper) + ... + repartition + groupByKey + agg
>>>>>> D: repartition = sink(managed) + src
>>>>>>
>>>>>> In these identities, I used one special identifier (...), which means
>>>>>> any number (0+) of operations that are not src, mapKey, groupBy[Key],
>>>>>> repartition, or agg.
>>>>>>
>>>>>> For mental clarity, I'm just going to make up a rule that groupBy
>>>>>> operations are not executable. In other words, you have to get to a
>>>>>> point where you can apply B to convert a groupByKey into a passthough
>>>>>> in order to execute the program. This is just a formal way of stating
>>>>>> what already happens in Kafka Streams.
>>>>>>
>>>>>> By applying A, we can just completely leave `groupBy` out of our
>>>>>> analysis. It trivially decomposes into a mapKey followed by a
>>>>>> groupByKey.
>>>>>>
>>>>>> Then, we can eliminate the "repartition required" case of `groupByKey`
>>>>>> by applying C followed by D to get to the "no repartition required"
>>>>>> version of groupByKey, which in turn sets us up to apply B to get an
>>>>>> executable topology.
>>>>>>
>>>>>> Fundamentally, you can think about KIP-221 is as proposing a modified
>>>>>> D identity in which you can specify the partition count of the managed
>>>>>> sink topic:
>>>>>> D': repartition(pc) = sink(managed w/ pc) + src
>>>>>>
>>>>>> Since users _could_ apply the identities above, we don't actually have
>>>>>> to add any partition count to groupBy[Key], but we decided early on in
>>>>>> the KIP discussion that it's more ergonomic to add it. In that case,
>>>>>> we also have to modify A and C:
>>>>>> A': groupBy(mapper, pc) + agg
>>>>>> = mapKey(mapper) + groupByKey(pc) + agg
>>>>>> C': mapKey(mapper) + ... + groupByKey(pc) + agg
>>>>>> = mapKey(mapper) + ... + repartition(pc) + groupByKey + agg
>>>>>>
>>>>>> Which sets us up still to always be able to get back to a plain
>>>>>> `groupByKey` operation (with no `(pc)`) and then apply D' and
>>>>>> ultimately B to get an executable topology.
>>>>>>
>>>>>> What about the optimizer?
>>>>>> The optimizer applies another set of graph-algebraic identities to
>>>>>> minimize the number of repartition topics in a topology.
>>>>>>
>>>>>> (forgive my ascii art)
>>>>>>
>>>>>> E: (merging repartition nodes)
>>>>>> (...) -> repartition -> X
>>>>>>  \-> repartition -> Y
>>>>>> =
>>>>>> (... + repartition) -> X
>>>>>>     \-> Y
>>>>>> F: (reordering around repartition)
>>>>>> Where SVO is any non-key-changing, stateless, operation:
>>>>>> repartition -> SVO = SVO -> repartition
>>>>>>
>>>>>> In terms of these identities, what the optimizer does is apply F
>>>>>> repeatedly in either direction to a topology to factor out common in
>>>>>> branches so that it can apply E to merge repartition nodes. This was
>>>>>> especially necessary before KIP-221 because you couldn't directly
>>>>>> express `repartition` in the DSL, only indirectly via `groupBy[Key]`,
>>>>>> so there was no way to do the factoring manually.
>>>>>>
>>>>>> We can now state very clearly that in KIP-221, explicit
>>>>>> `repartition()` operators should create a "reordering barrier". So, F
>>>>>> cannot be applied to an explicit `repartition()`. Also, I think we
>>>>>> decided earlier that explicit `repartition()` operations would also be
>>>>>> ineligible for merging, so E can't be applied to explicit
>>>>>> `repartition()` operations either. I think we feel we _could_ apply E
>>>>>> without harm, but we want to be conservative for now.
>>>>>>
>>>>>> I think the salient point from the latter discussion has been that
>>>>>> when you use `Grouped.numberOfPartitions`, this does _not_ constitute
>>>>>> an explicit `repartition()` operator, and therefore, the resulting
>>>>>> repartition node remains eligible for optimization.
>>>>>>
>>>>>> To be clear, I agree with Matthias that the provided partition count
>>>>>> _must_ be used in the resulting implicit repartition. This has some
>>>>>> implications for E. Namely, E could only be applied to two repartition
>>>>>> nodes that have the same partition count. This has always been
>>>>>> trivially true before KIP-221 because the partition count has always
>>>>>> been "unspecified", i.e., it would be determined at runtime by the
>>>>>> user-managed-topics' partition counts. Now, it could be specified or
>>>>>> unspecified. We can simply augment E to allow merging only repartition
>>>>>> nodes where the partition count is EITHER "specified and the same on
>>>>>> both sides", OR "unspecified on both sides".
>>>>>>
>>>>>> Sorry for the long email, but I have a hope that it builds a solid
>>>>>> theoretical foundation for our decisions in KIP-221, so we can have
>>>>>> confidence that there are no edge cases for design flaws to hide.
>>>>>>
>>>>>> Thanks,
>>>>>> -John
>>>>>>
>>>>>> On Sat, Nov 9, 2019 at 10:37 PM Matthias J. Sax <
>> matth...@confluent.io> wrote:
>>>>>>>
>>>>>>>> it seems like we do want to allow
>>>>>>>>> people to optionally specify a partition count as part of this
>>>>>>>>> operation, but we don't want that option to _force_ repartitioning
>>>>>>>
>>>>>>> Correct, ie, that is my suggestions.
>>>>>>>
>>>>>>>> "Use P partitions if repartitioning is necessary"
>>>>>>>
>>>>>>> I disagree here, because my reasoning is that:
>>>>>>>
>>>>>>> - if a user cares about the number of partition, the user wants to
>>>>>>> enforce a repartitioning
>>>>>>> - if a user does not case about the number of partitions, we don't
>> need
>>>>>>> to provide them a way to pass in a "hint"
>>>>>>>
>>>>>>> Hence, it should be sufficient to support:
>>>>>>>
>>>>>>> // user does not care
>>>>>>>
>>>>>>>  `stream.groupByKey(Grouped)`
>>>>>>>  `stream.grouBy(..., Grouped)`
>>>>>>>
>>>>>>> // user does care
>>>>>>>
>>>>>>>  `stream.repartition(Repartitioned).groupByKey()`
>>>>>>>  `streams.groupBy(..., Repartitioned)`
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>>
>>>>>>> On 11/9/19 8:10 PM, John Roesler wrote:
>>>>>>>> Thanks for those thoughts, Matthias,
>>>>>>>>
>>>>>>>> I find your reasoning about the optimization behavior compelling.
>> The
>>>>>>>> `through` operation is very simple and clear to reason about. It
>> just
>>>>>>>> passes the data exactly at the specified point in the topology
>> exactly
>>>>>>>> through the specified topic. Likewise, if a user invokes a
>>>>>>>> `repartition` operator, the simplest behavior is if we just do what
>>>>>>>> they asked for.
>>>>>>>>
>>>>>>>> Stepping back to think about when optimizations are surprising and
>>>>>>>> when they aren't, it occurs to me that we should be free to move
>>>>>>>> around repartitions when users have asked to perform some operation
>>>>>>>> that implies a repartition, like "change keys, then filter, then
>>>>>>>> aggregate". This program requires a repartition, but it could be
>>>>>>>> anywhere between the key change and the aggregation. On the other
>>>>>>>> hand, if they say, "change keys, then filter, then repartition, then
>>>>>>>> aggregate", it seems like they were pretty clear about their desire,
>>>>>>>> and we should just take it at face value.
>>>>>>>>
>>>>>>>> So, I'm sold on just literally doing a repartition every time they
>>>>>>>> invoke the `repartition` operator.
>>>>>>>>
>>>>>>>>
>>>>>>>> The "partition count" modifier for `groupBy`/`groupByKey` is more
>> nuanced.
>>>>>>>>
>>>>>>>> What you said about `groupByKey` makes sense. If they key hasn't
>>>>>>>> actually changed, then we don't need to repartition before
>>>>>>>> aggregating. On the other hand, `groupBy` is specifically changing
>> the
>>>>>>>> key as part of the grouping operation, so (as you said) we
>> definitely
>>>>>>>> have to do a repartition.
>>>>>>>>
>>>>>>>> If I'm reading the discussion right, it seems like we do want to
>> allow
>>>>>>>> people to optionally specify a partition count as part of this
>>>>>>>> operation, but we don't want that option to _force_ repartitioning
>> if
>>>>>>>> it's not needed. That last clause is the key. "Use P partitions if
>>>>>>>> repartitioning is necessary" is a directive that applies cleanly and
>>>>>>>> correctly to both `groupBy` and `groupByKey`. What if we call the
>>>>>>>> option `numberOfPartitionsHint`, which along with the "if necessary"
>>>>>>>> javadoc, should make it clear that the option won't force a
>>>>>>>> repartition, and also gives us enough latitude to still employ the
>>>>>>>> optimizer on those repartition topics?
>>>>>>>>
>>>>>>>> If we like the idea of expressing it as a "hint" for grouping and a
>>>>>>>> "command" for `repartition`, then it seems like it still makes sense
>>>>>>>> to keep Grouped and Repartitioned separate, as they would actually
>>>>>>>> offer different methods with distinct semantics.
>>>>>>>>
>>>>>>>> WDYT?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> -John
>>>>>>>>
>>>>>>>> On Sat, Nov 9, 2019 at 8:28 PM Matthias J. Sax <
>> matth...@confluent.io> wrote:
>>>>>>>>>
>>>>>>>>> Sorry for late reply.
>>>>>>>>>
>>>>>>>>> I guess, the question boils down to the intended semantics of
>>>>>>>>> `repartition()`. My understanding is as follows:
>>>>>>>>>
>>>>>>>>> - KS does auto-repartitioning for correctness reasons (using the
>>>>>>>>> upstream topic to determine the number of partitions)
>>>>>>>>> - KS does auto-repartitioning only for downstream DSL operators
>> like
>>>>>>>>> `count()` (eg, a `transform()` does never trigger an
>> auto-repartitioning
>>>>>>>>> even if the stream is marked as `repartitioningRequired`).
>>>>>>>>> - KS offers `through()` to enforce a repartitioning -- however,
>> the user
>>>>>>>>> needs to create the topic manually (with the desired number of
>> partitions).
>>>>>>>>>
>>>>>>>>> I see two main applications for `repartitioning()`:
>>>>>>>>>
>>>>>>>>> 1) repartition data before a `transform()` but user does not want
>> to
>>>>>>>>> manage the topic
>>>>>>>>> 2) scale out a downstream subtopology
>>>>>>>>>
>>>>>>>>> Hence, I see `repartition()` similar to `through()`: if a users
>> calls
>>>>>>>>> it, a repartitining is enforced, with the difference that KS
>> manages the
>>>>>>>>> topic and the user does not need to create it.
>>>>>>>>>
>>>>>>>>> This behavior makes (1) and (2) possible.
>>>>>>>>>
>>>>>>>>>> I think many users would prefer to just say "if there *is* a
>> repartition
>>>>>>>>>> required at this point in the topology, it should
>>>>>>>>>> have N partitions"
>>>>>>>>>
>>>>>>>>> Because of (2), I disagree. Either a user does not care about
>> scaling
>>>>>>>>> out, for which case she would not specify the number of
>> partitions. Or a
>>>>>>>>> user does care, and hence wants to enforce the scale out. I don't
>> think
>>>>>>>>> that any user would say, "maybe scale out".
>>>>>>>>>
>>>>>>>>> Therefore, the optimizer should never ignore the repartition
>> operation.
>>>>>>>>> As a "consequence" (because repartitioning is expensive) a user
>> should
>>>>>>>>> make an explicit call to `repartition()` IMHO -- piggybacking an
>>>>>>>>> enforced repartitioning into `groupByKey()` seems to be "dangerous"
>>>>>>>>> because it might be too subtle and an "optional scaling out" as
>> laid out
>>>>>>>>> above does not make sense IMHO.
>>>>>>>>>
>>>>>>>>> I am also not worried about "over repartitioning" because the
>> result
>>>>>>>>> stream would never trigger auto-repartitioning. Only if multiple
>>>>>>>>> consecutive calls to `repartition()` are made it could be bad --
>> but
>>>>>>>>> that's the same with `through()`. In the end, there is always some
>>>>>>>>> responsibility on the user.
>>>>>>>>>
>>>>>>>>> Btw, for `.groupBy()` we know that repartitioning will be required,
>>>>>>>>> however, for `groupByKey()` it depends if the KStream is marked as
>>>>>>>>> `repartitioningRequired`.
>>>>>>>>>
>>>>>>>>> Hence, for `groupByKey()` it should not be possible for a user to
>> set
>>>>>>>>> number of partitions IMHO. For `groupBy()` it's a different story,
>>>>>>>>> because calling
>>>>>>>>>
>>>>>>>>>   `repartition().groupBy()`
>>>>>>>>>
>>>>>>>>> does not achieve what we want. Hence, allowing users to pass in the
>>>>>>>>> number of users partitions into `groupBy()` does actually makes
>> sense,
>>>>>>>>> because repartitioning will happen anyway and thus we can
>> piggyback a
>>>>>>>>> scaling decision.
>>>>>>>>>
>>>>>>>>> I think that John has a fair concern about the overloads, however,
>> I am
>>>>>>>>> not convinced that using `Grouped` to specify the number of
>> partitions
>>>>>>>>> is intuitive. I double checked `Grouped` and `Repartitioned` and
>> both
>>>>>>>>> allow to specify a `name` and `keySerde/valueSerde`. Thus, I am
>>>>>>>>> wondering if we could bridge the gap between both, if we would make
>>>>>>>>> `Repartitioned extends Grouped`? For this case, we only need
>>>>>>>>> `groupBy(Grouped)` and a user can pass in both types what seems to
>> make
>>>>>>>>> the API quite smooth:
>>>>>>>>>
>>>>>>>>>  `stream.groupBy(..., Grouped...)`
>>>>>>>>>
>>>>>>>>>  `stream.groupBy(..., Repartitioned...)`
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thoughts?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 11/7/19 10:59 AM, Levani Kokhreidze wrote:
>>>>>>>>>> Hi Sophie,
>>>>>>>>>>
>>>>>>>>>> Thank you for your reply, very insightful. Looking forward
>> hearing others opinion as well on this.
>>>>>>>>>>
>>>>>>>>>> Kind regards,
>>>>>>>>>> Levani
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>> On Nov 6, 2019, at 1:30 AM, Sophie Blee-Goldman <
>> sop...@confluent.io> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Personally, I think Matthias’s concern is valid, but on the
>> other hand
>>>>>>>>>>> Kafka Streams has already
>>>>>>>>>>>> optimizer in place which alters topology independently from user
>>>>>>>>>>>
>>>>>>>>>>> I agree (with you) and think this is a good way to put it -- we
>> currently
>>>>>>>>>>> auto-repartition for the user so
>>>>>>>>>>> that they don't have to walk through their entire topology and
>> reason about
>>>>>>>>>>> when and where to place a
>>>>>>>>>>> `.through` (or the new `.repartition`), so why suddenly force
>> this onto the
>>>>>>>>>>> user? How certain are we that
>>>>>>>>>>> users will always get this right? It's easy to imagine that
>> during
>>>>>>>>>>> development, you write your new app with
>>>>>>>>>>> correctly placed repartitions in order to use this new feature.
>> During the
>>>>>>>>>>> course of development you end up
>>>>>>>>>>> tweaking the topology, but don't remember to review or move the
>>>>>>>>>>> repartitioning since you're used to Streams
>>>>>>>>>>> doing this for you. If you use only single-partition topics for
>> testing,
>>>>>>>>>>> you might not even notice your app is
>>>>>>>>>>> spitting out incorrect results!
>>>>>>>>>>>
>>>>>>>>>>> Anyways, I feel pretty strongly that it would be weird to
>> introduce a new
>>>>>>>>>>> feature and say that to use it, you can't take
>>>>>>>>>>> advantage of this other feature anymore. Also, is it possible our
>>>>>>>>>>> optimization framework could ever include an
>>>>>>>>>>> optimized repartitioning strategy that is better than what a
>> user could
>>>>>>>>>>> achieve by manually inserting repartitions?
>>>>>>>>>>> Do we expect users to have a deep understanding of the best way
>> to
>>>>>>>>>>> repartition their particular topology, or is it
>>>>>>>>>>> likely they will end up over-repartitioning either due to missed
>>>>>>>>>>> optimizations or unnecessary extra repartitions?
>>>>>>>>>>> I think many users would prefer to just say "if there *is* a
>> repartition
>>>>>>>>>>> required at this point in the topology, it should
>>>>>>>>>>> have N partitions"
>>>>>>>>>>>
>>>>>>>>>>> As to the idea of adding `numberOfPartitions` to Grouped rather
>> than
>>>>>>>>>>> adding a new parameter to groupBy, that does seem more in line
>> with the
>>>>>>>>>>> current syntax so +1 from me
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Nov 5, 2019 at 2:07 PM Levani Kokhreidze <
>> levani.co...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>
>>>>>>>>>>>> While https://github.com/apache/kafka/pull/7170 <
>>>>>>>>>>>> https://github.com/apache/kafka/pull/7170> is under review and
>> it’s
>>>>>>>>>>>> almost done, I want to resurrect discussion about this KIP to
>> address
>>>>>>>>>>>> couple of concerns raised by Matthias and John.
>>>>>>>>>>>>
>>>>>>>>>>>> As a reminder, idea of the KIP-221 was to allow DSL users
>> control over
>>>>>>>>>>>> repartitioning and parallelism of sub-topologies by:
>>>>>>>>>>>> 1) Introducing new KStream#repartition operation which is done
>> in
>>>>>>>>>>>> https://github.com/apache/kafka/pull/7170 <
>>>>>>>>>>>> https://github.com/apache/kafka/pull/7170>
>>>>>>>>>>>> 2) Add new KStream#groupBy(Repartitioned) operation, which is
>> planned to
>>>>>>>>>>>> be separate PR.
>>>>>>>>>>>>
>>>>>>>>>>>> While all agree about general implementation and idea behind
>>>>>>>>>>>> https://github.com/apache/kafka/pull/7170 <
>>>>>>>>>>>> https://github.com/apache/kafka/pull/7170> PR, introducing new
>>>>>>>>>>>> KStream#groupBy(Repartitioned) method overload raised some
>> questions during
>>>>>>>>>>>> the review.
>>>>>>>>>>>> Matthias raised concern that there can be cases when user uses
>>>>>>>>>>>> `KStream#groupBy(Repartitioned)` operation, but actual
>> repartitioning may
>>>>>>>>>>>> not required, thus configuration passed via `Repartitioned`
>> would never be
>>>>>>>>>>>> applied (Matthias, please correct me if I misinterpreted your
>> comment).
>>>>>>>>>>>> So instead, if user wants to control parallelism of
>> sub-topologies, he or
>>>>>>>>>>>> she should always use `KStream#repartition` operation before
>> groupBy. Full
>>>>>>>>>>>> comment can be seen here:
>>>>>>>>>>>>
>> https://github.com/apache/kafka/pull/7170#issuecomment-519303125 <
>>>>>>>>>>>>
>> https://github.com/apache/kafka/pull/7170#issuecomment-519303125>
>>>>>>>>>>>>
>>>>>>>>>>>> On the same topic, John pointed out that, from API design
>> perspective, we
>>>>>>>>>>>> shouldn’t intertwine configuration classes of different
>> operators between
>>>>>>>>>>>> one another. So instead of introducing new
>> `KStream#groupBy(Repartitioned)`
>>>>>>>>>>>> for specifying number of partitions for internal topic, we
>> should update
>>>>>>>>>>>> existing `Grouped` class with `numberOfPartitions` field.
>>>>>>>>>>>>
>>>>>>>>>>>> Personally, I think Matthias’s concern is valid, but on the
>> other hand
>>>>>>>>>>>> Kafka Streams has already optimizer in place which alters
>> topology
>>>>>>>>>>>> independently from user. So maybe it makes sense if Kafka
>> Streams,
>>>>>>>>>>>> internally would optimize topology in the best way possible,
>> even if in
>>>>>>>>>>>> some cases this means ignoring some operator configurations
>> passed by the
>>>>>>>>>>>> user. Also, I agree with John about API design semantics. If we
>> go through
>>>>>>>>>>>> with the changes for `KStream#groupBy` operation, it makes more
>> sense to
>>>>>>>>>>>> add `numberOfPartitions` field to `Grouped` class instead of
>> introducing
>>>>>>>>>>>> new `KStream#groupBy(Repartitioned)` method overload.
>>>>>>>>>>>>
>>>>>>>>>>>> I would really appreciate communities feedback on this.
>>>>>>>>>>>>
>>>>>>>>>>>> Kind regards,
>>>>>>>>>>>> Levani
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>> On Oct 17, 2019, at 12:57 AM, Sophie Blee-Goldman <
>> sop...@confluent.io>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hey Levani,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I think people are busy with the upcoming 2.4 release, and
>> don't have
>>>>>>>>>>>> much
>>>>>>>>>>>>> spare time at the
>>>>>>>>>>>>> moment. It's kind of a difficult time to get attention on
>> things, but
>>>>>>>>>>>> feel
>>>>>>>>>>>>> free to pick up something else
>>>>>>>>>>>>> to work on in the meantime until things have calmed down a bit!
>>>>>>>>>>>>>
>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>> Sophie
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Oct 16, 2019 at 11:26 AM Levani Kokhreidze <
>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Sorry for bringing this thread again, but I would like to get
>> some
>>>>>>>>>>>>>> attention on this PR:
>> https://github.com/apache/kafka/pull/7170 <
>>>>>>>>>>>> https://github.com/apache/kafka/pull/7170> <
>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/7170 <
>>>>>>>>>>>> https://github.com/apache/kafka/pull/7170>>
>>>>>>>>>>>>>> It's been a while now and I would love to move on to other
>> KIPs as well.
>>>>>>>>>>>>>> Please let me know if you have any concerns.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Jul 26, 2019, at 11:25 AM, Levani Kokhreidze <
>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Here’s voting thread for this KIP:
>>>>>>>>>>>>>>
>> https://www.mail-archive.com/dev@kafka.apache.org/msg99680.html <
>>>>>>>>>>>> https://www.mail-archive.com/dev@kafka.apache.org/msg99680.html>
>> <
>>>>>>>>>>>>>>
>> https://www.mail-archive.com/dev@kafka.apache.org/msg99680.html <
>>>>>>>>>>>> https://www.mail-archive.com/dev@kafka.apache.org/msg99680.html
>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Jul 24, 2019, at 11:15 PM, Levani Kokhreidze <
>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>
>>>>>>>>>>>>>> <mailto:levani.co...@gmail.com <mailto:levani.co...@gmail.com>>>
>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Matthias,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks for the suggestion. I Don’t have strong opinion on
>> that one.
>>>>>>>>>>>>>>>> Agree that avoiding unnecessary method overloads is a good
>> idea.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Updated KIP
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Jul 24, 2019, at 8:50 PM, Matthias J. Sax <
>> matth...@confluent.io
>>>>>>>>>>>> <mailto:matth...@confluent.io>
>>>>>>>>>>>>>> <mailto:matth...@confluent.io <mailto:matth...@confluent.io>>>
>> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> One question:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Why do we add
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Repartitioned#with(final String name, final int
>> numberOfPartitions)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> It seems that `#with(String name)`,
>> `#numberOfPartitions(int)` in
>>>>>>>>>>>>>>>>> combination with `withName()` and
>> `withNumberOfPartitions()` should
>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>> sufficient. Users can chain the method calls.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> (I think it's valuable to keep the number of overload
>> small if
>>>>>>>>>>>>>> possible.)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Otherwise LGTM.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On 7/23/19 2:18 PM, Levani Kokhreidze wrote:
>>>>>>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks all for your feedback.
>>>>>>>>>>>>>>>>>> I started voting procedure for this KIP. If there’re any
>> other
>>>>>>>>>>>>>> concerns about this KIP, please let me know.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Jul 20, 2019, at 8:39 PM, Levani Kokhreidze <
>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>
>> <mailto:
>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>>> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi Matthias,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks for the suggestion, makes sense.
>>>>>>>>>>>>>>>>>>> I’ve updated KIP (
>>>>>>>>>>>>>>
>>>>>>>>>>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>> <
>>>>>>>>>>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>>>
>>>>>>>>>>>>>> <
>>>>>>>>>>>>>>
>>>>>>>>>>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>> <
>>>>>>>>>>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> <
>>>>>>>>>>>>>>
>>>>>>>>>>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221:+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>> <
>>>>>>>>>>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221:+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>>>
>>>>>>>>>>>>>> <
>>>>>>>>>>>>>>
>>>>>>>>>>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221:+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>> <
>>>>>>>>>>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221:+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> ).
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Jul 20, 2019, at 3:53 AM, Matthias J. Sax <
>>>>>>>>>>>> matth...@confluent.io <mailto:matth...@confluent.io>
>>>>>>>>>>>>>> <mailto:matth...@confluent.io <mailto:matth...@confluent.io>>
>> <mailto:
>>>>>>>>>>>> matth...@confluent.io <mailto:matth...@confluent.io> <mailto:
>>>>>>>>>>>>>> matth...@confluent.io <mailto:matth...@confluent.io>>>>
>> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks for driving the KIP.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I agree that users need to be able to specify a
>> partitioning
>>>>>>>>>>>>>> strategy.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Sophie raises a fair point about topic configs and
>> producer
>>>>>>>>>>>>>> configs. My
>>>>>>>>>>>>>>>>>>>> take is, that consider `Repartitioned` as an
>> "extension" to
>>>>>>>>>>>>>> `Produced`,
>>>>>>>>>>>>>>>>>>>> that adds topic configuration, is a good way to think
>> about it and
>>>>>>>>>>>>>> helps
>>>>>>>>>>>>>>>>>>>> to keep the API "clean".
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> With regard to method names. I would prefer to avoid
>>>>>>>>>>>> abbreviations.
>>>>>>>>>>>>>> Can
>>>>>>>>>>>>>>>>>>>> we rename:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> `withNumOfPartitions` -> `withNumberOfPartitions`
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Furthermore, it might be good to add some more `static`
>> methods:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> - Repartitioned.with(Serde<K>, Serde<V>)
>>>>>>>>>>>>>>>>>>>> - Repartitioned.withNumberOfPartitions(int)
>>>>>>>>>>>>>>>>>>>> - Repartitioned.streamPartitioner(StreamPartitioner)
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On 7/19/19 3:33 PM, Levani Kokhreidze wrote:
>>>>>>>>>>>>>>>>>>>>> Totally agree. I think in KStream interface it makes
>> sense to
>>>>>>>>>>>> have
>>>>>>>>>>>>>> some duplicate configurations between operators in order to
>> keep API
>>>>>>>>>>>> simple
>>>>>>>>>>>>>> and usable.
>>>>>>>>>>>>>>>>>>>>> Also, as more surface API has, harder it is to have
>> proper
>>>>>>>>>>>>>> backward compatibility.
>>>>>>>>>>>>>>>>>>>>> While initial idea of keeping topic level configs
>> separate was
>>>>>>>>>>>>>> exciting, having Repartitioned class encapsulate some
>> producer level
>>>>>>>>>>>>>> configs makes API more readable.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Jul 20, 2019, at 1:15 AM, Sophie Blee-Goldman <
>>>>>>>>>>>>>> sop...@confluent.io <mailto:sop...@confluent.io> <mailto:
>>>>>>>>>>>> sop...@confluent.io <mailto:sop...@confluent.io>> <mailto:
>>>>>>>>>>>>>> sop...@confluent.io <mailto:sop...@confluent.io> <mailto:
>>>>>>>>>>>> sop...@confluent.io <mailto:sop...@confluent.io>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> I think that is a good point about trying to keep
>> producer level
>>>>>>>>>>>>>>>>>>>>>> configurations and (repartition) topic level
>> considerations
>>>>>>>>>>>>>> separate.
>>>>>>>>>>>>>>>>>>>>>> Number of partitions is definitely purely a topic
>> level
>>>>>>>>>>>>>> configuration. But
>>>>>>>>>>>>>>>>>>>>>> on some level, serdes and partitioners are just as
>> much a topic
>>>>>>>>>>>>>>>>>>>>>> configuration as a producer one. You could have two
>> producers
>>>>>>>>>>>>>> configured
>>>>>>>>>>>>>>>>>>>>>> with different serdes and/or partitioners, but if
>> they are
>>>>>>>>>>>>>> writing to the
>>>>>>>>>>>>>>>>>>>>>> same topic the result would be very difficult to
>> part. So in a
>>>>>>>>>>>>>> sense, these
>>>>>>>>>>>>>>>>>>>>>> are configurations of topics in Streams, not just
>> producers.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Another way to think of it: while the Streams API is
>> not always
>>>>>>>>>>>>>> true to
>>>>>>>>>>>>>>>>>>>>>> this, ideally all the relevant configs for an
>> operator are
>>>>>>>>>>>>>> wrapped into a
>>>>>>>>>>>>>>>>>>>>>> single object (in this case, Repartitioned). We could
>> instead
>>>>>>>>>>>>>> split out the
>>>>>>>>>>>>>>>>>>>>>> fields in common with Produced into a separate
>> parameter to keep
>>>>>>>>>>>>>> topic and
>>>>>>>>>>>>>>>>>>>>>> producer level configurations separate, but this
>> increases the
>>>>>>>>>>>>>> API surface
>>>>>>>>>>>>>>>>>>>>>> area by a lot. It's much more straightforward to just
>> say "this
>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>> everything that this particular operator needs"
>> without worrying
>>>>>>>>>>>>>> about what
>>>>>>>>>>>>>>>>>>>>>> exactly you're specifying.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> I suppose you could alternatively make Produced a
>> field of
>>>>>>>>>>>>>> Repartitioned,
>>>>>>>>>>>>>>>>>>>>>> but I don't think we do this kind of composition
>> elsewhere in
>>>>>>>>>>>>>> Streams at
>>>>>>>>>>>>>>>>>>>>>> the moment
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Fri, Jul 19, 2019 at 1:45 PM Levani Kokhreidze <
>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>
>> <mailto:
>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>>
>> <mailto:
>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>
>> <mailto:
>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>>>>
>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Hi Bill,
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Thanks a lot for the feedback.
>>>>>>>>>>>>>>>>>>>>>>> Yes, that makes sense. I’ve updated KIP with
>>>>>>>>>>>>>> `Repartitioned#partitioner`
>>>>>>>>>>>>>>>>>>>>>>> configuration.
>>>>>>>>>>>>>>>>>>>>>>> In the beginning, I wanted to introduce a class for
>> topic level
>>>>>>>>>>>>>>>>>>>>>>> configuration and keep topic level and producer level
>>>>>>>>>>>>>> configurations (such
>>>>>>>>>>>>>>>>>>>>>>> as Produced) separately (see my second email in this
>> thread).
>>>>>>>>>>>>>>>>>>>>>>> But while looking at the semantics of KStream
>> interface, I
>>>>>>>>>>>>>> couldn’t really
>>>>>>>>>>>>>>>>>>>>>>> figure out good operation name for Topic level
>> configuration
>>>>>>>>>>>>>> class and just
>>>>>>>>>>>>>>>>>>>>>>> introducing `Topic` config class was kinda breaking
>> the
>>>>>>>>>>>>>> semantics.
>>>>>>>>>>>>>>>>>>>>>>> So I think having Repartitioned class which
>> encapsulates topic
>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>>> producer level configurations for internal topics is
>> viable
>>>>>>>>>>>>>> thing to do.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> On Jul 19, 2019, at 7:47 PM, Bill Bejeck <
>> bbej...@gmail.com
>>>>>>>>>>>> <mailto:bbej...@gmail.com>
>>>>>>>>>>>>>> <mailto:bbej...@gmail.com <mailto:bbej...@gmail.com>>
>> <mailto:
>>>>>>>>>>>> bbej...@gmail.com <mailto:bbej...@gmail.com> <mailto:
>>>>>>>>>>>>>> bbej...@gmail.com <mailto:bbej...@gmail.com>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Hi Lavani,
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Thanks for resurrecting this KIP.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> I'm also a +1 for adding a partition option.  In
>> addition to
>>>>>>>>>>>>>> the reason
>>>>>>>>>>>>>>>>>>>>>>>> provided by John, my reasoning is:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> 1. Users may want to use something other than
>> hash-based
>>>>>>>>>>>>>> partitioning
>>>>>>>>>>>>>>>>>>>>>>>> 2. Users may wish to partition on something
>> different than the
>>>>>>>>>>>>>> key
>>>>>>>>>>>>>>>>>>>>>>>> without having to change the key.  For example:
>>>>>>>>>>>>>>>>>>>>>>>> 1. A combination of fields in the value in
>> conjunction with
>>>>>>>>>>>>>> the key
>>>>>>>>>>>>>>>>>>>>>>>> 2. Something other than the key
>>>>>>>>>>>>>>>>>>>>>>>> 3. We allow users to specify a partitioner on
>> Produced hence
>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>>>>>>> KStream.to and KStream.through, so it makes sense
>> for API
>>>>>>>>>>>>>> consistency.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Just my  2 cents.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>>>> Bill
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Jul 19, 2019 at 5:46 AM Levani Kokhreidze <
>>>>>>>>>>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:
>> levani.co...@gmail.com>
>>>>>>>>>>>> <mailto:levani.co...@gmail.com <mailto:levani.co...@gmail.com>>
>> <mailto:
>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>
>> <mailto:
>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>>>>
>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Hi John,
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> In my mind it makes sense.
>>>>>>>>>>>>>>>>>>>>>>>>> If we add partitioner configuration to
>> Repartitioned class,
>>>>>>>>>>>>>> with the
>>>>>>>>>>>>>>>>>>>>>>>>> combination of specifying number of partitions for
>> internal
>>>>>>>>>>>>>> topics, user
>>>>>>>>>>>>>>>>>>>>>>>>> will have opportunity to ensure co-partitioning
>> before join
>>>>>>>>>>>>>> operation.
>>>>>>>>>>>>>>>>>>>>>>>>> I think this can be quite powerful feature.
>>>>>>>>>>>>>>>>>>>>>>>>> Wondering what others think about this?
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> On Jul 18, 2019, at 1:20 AM, John Roesler <
>>>>>>>>>>>> j...@confluent.io <mailto:j...@confluent.io>
>>>>>>>>>>>>>> <mailto:j...@confluent.io <mailto:j...@confluent.io>>
>> <mailto:
>>>>>>>>>>>> j...@confluent.io <mailto:j...@confluent.io> <mailto:
>>>>>>>>>>>>>> j...@confluent.io <mailto:j...@confluent.io>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Yes, I believe that's what I had in mind. Again,
>> not totally
>>>>>>>>>>>>>> sure it
>>>>>>>>>>>>>>>>>>>>>>>>>> makes sense, but I believe something similar is
>> the
>>>>>>>>>>>> rationale
>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>>>>>>> having the partitioner option in Produced.
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Jul 17, 2019 at 3:20 PM Levani Kokhreidze
>>>>>>>>>>>>>>>>>>>>>>>>>> <levani.co...@gmail.com <mailto:
>> levani.co...@gmail.com>
>>>>>>>>>>>> <mailto:levani.co...@gmail.com <mailto:levani.co...@gmail.com>>
>>>>>>>>>>>>>> <mailto:levani.co...@gmail.com <mailto:levani.co...@gmail.com>
>> <mailto:
>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>>>>
>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> Hey John,
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> Oh that’s interesting use-case.
>>>>>>>>>>>>>>>>>>>>>>>>>>> Do I understand this correctly, in your example
>> I would
>>>>>>>>>>>>>> first issue
>>>>>>>>>>>>>>>>>>>>>>>>> repartition(Repartitioned) with proper partitioner
>> that
>>>>>>>>>>>>>> essentially
>>>>>>>>>>>>>>>>>>>>>>> would
>>>>>>>>>>>>>>>>>>>>>>>>> be the same as the topic I want to join with and
>> then do the
>>>>>>>>>>>>>>>>>>>>>>> KStream#join
>>>>>>>>>>>>>>>>>>>>>>>>> with DSL?
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Jul 17, 2019, at 11:11 PM, John Roesler <
>>>>>>>>>>>>>> j...@confluent.io <mailto:j...@confluent.io> <mailto:
>> j...@confluent.io
>>>>>>>>>>>> <mailto:j...@confluent.io>> <mailto:j...@confluent.io <mailto:
>>>>>>>>>>>> j...@confluent.io>
>>>>>>>>>>>>>> <mailto:j...@confluent.io <mailto:j...@confluent.io>>>>
>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hey, all, just to chime in,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> I think it might be useful to have an option to
>> specify
>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>>> partitioner. The case I have in mind is that
>> some data may
>>>>>>>>>>>>>> get
>>>>>>>>>>>>>>>>>>>>>>>>>>>> repartitioned and then joined with an input
>> topic. If the
>>>>>>>>>>>>>> right-side
>>>>>>>>>>>>>>>>>>>>>>>>>>>> input topic uses a custom partitioning
>> strategy, then the
>>>>>>>>>>>>>>>>>>>>>>>>>>>> repartitioned stream also needs to be
>> partitioned with the
>>>>>>>>>>>>>> same
>>>>>>>>>>>>>>>>>>>>>>>>>>>> strategy.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Does that make sense, or did I maybe miss
>> something
>>>>>>>>>>>>>> important?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Jul 17, 2019 at 2:48 PM Levani
>> Kokhreidze
>>>>>>>>>>>>>>>>>>>>>>>>>>>> <levani.co...@gmail.com <mailto:
>> levani.co...@gmail.com>
>>>>>>>>>>>> <mailto:levani.co...@gmail.com <mailto:levani.co...@gmail.com>>
>>>>>>>>>>>>>> <mailto:levani.co...@gmail.com <mailto:levani.co...@gmail.com>
>> <mailto:
>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>>>>
>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Yes, I was thinking about it as well. To be
>> honest I’m
>>>>>>>>>>>> not
>>>>>>>>>>>>>> sure
>>>>>>>>>>>>>>>>>>>>>>> about
>>>>>>>>>>>>>>>>>>>>>>>>> it yet.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> As Kafka Streams DSL user, I don’t really
>> think I would
>>>>>>>>>>>>>> need control
>>>>>>>>>>>>>>>>>>>>>>>>> over partitioner for internal topics.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> As a user, I would assume that Kafka Streams
>> knows best
>>>>>>>>>>>>>> how to
>>>>>>>>>>>>>>>>>>>>>>>>> partition data for internal topics.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> In this KIP I wrote that Produced should be
>> used only for
>>>>>>>>>>>>>> topics
>>>>>>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>>>>>>> are created by user In advance.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> In those cases maybe it make sense to have
>> possibility to
>>>>>>>>>>>>>> specify
>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>> partitioner.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I don’t have clear answer on that yet, but I
>> guess
>>>>>>>>>>>>>> specifying the
>>>>>>>>>>>>>>>>>>>>>>>>> partitioner can be added as well if there’s
>> agreement on
>>>>>>>>>>>> this.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Jul 17, 2019, at 10:42 PM, Sophie
>> Blee-Goldman <
>>>>>>>>>>>>>>>>>>>>>>>>> sop...@confluent.io <mailto:sop...@confluent.io>
>> <mailto:
>>>>>>>>>>>> sop...@confluent.io <mailto:sop...@confluent.io>> <mailto:
>>>>>>>>>>>>>> sop...@confluent.io <mailto:sop...@confluent.io>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for clearing that up. I agree that
>> Repartitioned
>>>>>>>>>>>>>> would be a
>>>>>>>>>>>>>>>>>>>>>>>>> useful
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> addition. I'm wondering if it might also need
>> to have
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> a withStreamPartitioner method/field, similar
>> to
>>>>>>>>>>>>>> Produced? I'm not
>>>>>>>>>>>>>>>>>>>>>>>>> sure how
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> widely this feature is really used, but seems
>> it should
>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>>>>>> available
>>>>>>>>>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> repartition topics.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Jul 17, 2019 at 11:26 AM Levani
>> Kokhreidze <
>>>>>>>>>>>>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:
>> levani.co...@gmail.com>
>>>>>>>>>>>>>> <mailto:levani.co...@gmail.com <mailto:levani.co...@gmail.com>
>> <mailto:
>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hey Sophie,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> In both cases KStream#repartition and
>>>>>>>>>>>>>>>>>>>>>>>>> KStream#repartition(Repartitioned)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> topic will be created and managed by Kafka
>> Streams.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Idea of Repartitioned is to give user more
>> control over
>>>>>>>>>>>>>> the topic
>>>>>>>>>>>>>>>>>>>>>>>>> such as
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> num of partitions.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I feel like Repartitioned parameter is
>> something that
>>>>>>>>>>>> is
>>>>>>>>>>>>>> missing
>>>>>>>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> current DSL design.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Essentially giving user control over
>> parallelism by
>>>>>>>>>>>>>> configuring
>>>>>>>>>>>>>>>>>>>>>>> num
>>>>>>>>>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> partitions for internal topics.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hope this answers your question.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Jul 17, 2019, at 9:02 PM, Sophie
>> Blee-Goldman <
>>>>>>>>>>>>>>>>>>>>>>>>> sop...@confluent.io <mailto:sop...@confluent.io>
>> <mailto:
>>>>>>>>>>>> sop...@confluent.io <mailto:sop...@confluent.io>> <mailto:
>>>>>>>>>>>>>> sop...@confluent.io <mailto:sop...@confluent.io>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hey Levani,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the KIP! Can you clarify one
>> thing for me
>>>>>>>>>>>> --
>>>>>>>>>>>>>> for the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> KStream#repartition signature taking a
>> Repartitioned,
>>>>>>>>>>>>>> will the
>>>>>>>>>>>>>>>>>>>>>>>>> topic be
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> auto-created by Streams (which seems to be
>> the case
>>>>>>>>>>>> for
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>> signature
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> without a Repartitioned) or does it have to
>> be
>>>>>>>>>>>>>> pre-created? The
>>>>>>>>>>>>>>>>>>>>>>>>> wording
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the KIP makes it seem like one version of
>> the method
>>>>>>>>>>>>>> will
>>>>>>>>>>>>>>>>>>>>>>>>> auto-create
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> topics while the other will not.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Sophie
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Jul 17, 2019 at 10:15 AM Levani
>> Kokhreidze <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:
>> levani.co...@gmail.com>
>>>>>>>>>>>>>> <mailto:levani.co...@gmail.com <mailto:levani.co...@gmail.com>
>> <mailto:
>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> One more bump about KIP-221 (
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>> <
>>>>>>>>>>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>>>
>>>>>>>>>>>>>> <
>>>>>>>>>>>>>>
>>>>>>>>>>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>> <
>>>>>>>>>>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> <
>>>>>>>>>>>>>>
>>>>>>>>>>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>> <
>>>>>>>>>>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>>>
>>>>>>>>>>>>>> <
>>>>>>>>>>>>>>
>>>>>>>>>>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>> <
>>>>>>>>>>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221:+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>> <
>>>>>>>>>>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221:+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>>>
>>>>>>>>>>>>>> <
>>>>>>>>>>>>>>
>>>>>>>>>>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221:+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> )
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> so it doesn’t get lost in mailing list :)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Would love to hear communities
>> opinions/concerns
>>>>>>>>>>>> about
>>>>>>>>>>>>>> this KIP.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Jul 12, 2019, at 5:27 PM, Levani
>> Kokhreidze <
>>>>>>>>>>>>>>>>>>>>>>>>> levani.co...@gmail.com
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Kind reminder about this KIP:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221:+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Jul 9, 2019, at 11:38 AM, Levani
>> Kokhreidze <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> levani.co...@gmail.com
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> <mailto:levani.co...@gmail.com>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> In order to move this KIP forward, I’ve
>> updated
>>>>>>>>>>>>>> confluence
>>>>>>>>>>>>>>>>>>>>>>> page
>>>>>>>>>>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the new proposal
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221:+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I’ve also filled “Rejected Alternatives”
>> section.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Looking forward to discuss this KIP :)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> King regards,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Jul 3, 2019, at 1:08 PM, Levani
>> Kokhreidze <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> levani.co...@gmail.com
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> <mailto:levani.co...@gmail.com>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hello Matthias,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the feedback and ideas.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I like the idea of introducing
>> dedicated `Topic`
>>>>>>>>>>>>>> class for
>>>>>>>>>>>>>>>>>>>>>>>>> topic
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> configuration for internal operators like
>>>>>>>>>>>> `groupedBy`.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Would be great to hear others opinion
>> about this
>>>>>>>>>>>> as
>>>>>>>>>>>>>> well.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Kind regards,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Jul 3, 2019, at 7:00 AM, Matthias
>> J. Sax <
>>>>>>>>>>>>>>>>>>>>>>>>> matth...@confluent.io
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> <mailto:matth...@confluent.io>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Levani,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for picking up this KIP! And
>> thanks for
>>>>>>>>>>>>>> summarizing
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> everything.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Even if some points may have been
>> discussed
>>>>>>>>>>>>>> already (can't
>>>>>>>>>>>>>>>>>>>>>>>>> really
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> remember), it's helpful to get a good
>> summary to
>>>>>>>>>>>>>> refresh the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> discussion.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I think your reasoning makes sense.
>> With regard
>>>>>>>>>>>> to
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>> distinction
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> between operators that manage topics
>> and
>>>>>>>>>>>> operators
>>>>>>>>>>>>>> that use
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> user-created
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> topics: Following this argument, it
>> might
>>>>>>>>>>>> indicate
>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>>>>>>> leaving
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> `through()` as-is (as an operator that
>> uses
>>>>>>>>>>>>>> use-defined
>>>>>>>>>>>>>>>>>>>>>>>>> topics) and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> introducing a new `repartition()`
>> operator (an
>>>>>>>>>>>>>> operator that
>>>>>>>>>>>>>>>>>>>>>>>>> manages
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> topics itself) might be good.
>> Otherwise, there is
>>>>>>>>>>>>>> one
>>>>>>>>>>>>>>>>>>>>>>> operator
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> `through()` that sometimes manages
>> topics but
>>>>>>>>>>>>>> sometimes
>>>>>>>>>>>>>>>>>>>>>>> not; a
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> different
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> name, ie, new operator would make the
>> distinction
>>>>>>>>>>>>>> clearer.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> About adding `numOfPartitions` to
>> `Grouped`. I am
>>>>>>>>>>>>>> wondering
>>>>>>>>>>>>>>>>>>>>>>>>> if the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> same
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> argument as for `Produced` does apply
>> and adding
>>>>>>>>>>>>>> it is
>>>>>>>>>>>>>>>>>>>>>>>>> semantically
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> questionable? Might be good to get
>> opinions of
>>>>>>>>>>>>>> others on
>>>>>>>>>>>>>>>>>>>>>>>>> this, too.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> am
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> not sure myself what solution I prefer
>> atm.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> So far, KS uses configuration objects
>> that allow
>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>> configure
>>>>>>>>>>>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> certain
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> "entity" like a consumer, producer,
>> store. If we
>>>>>>>>>>>>>> assume that
>>>>>>>>>>>>>>>>>>>>>>>>> a topic
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> a similar entity, I am wonder if we
>> should have a
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> `Topic#withNumberOfPartitions()` class
>> and method
>>>>>>>>>>>>>> instead of
>>>>>>>>>>>>>>>>>>>>>>>>> a plain
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> integer? This would allow us to add
>> other
>>>>>>>>>>>> configs,
>>>>>>>>>>>>>> like
>>>>>>>>>>>>>>>>>>>>>>>>> replication
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> factor, retention-time etc, easily,
>> without the
>>>>>>>>>>>>>> need to
>>>>>>>>>>>>>>>>>>>>>>>>> change the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> "main
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> API".
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Just want to give some ideas. Not sure
>> if I like
>>>>>>>>>>>>>> them
>>>>>>>>>>>>>>>>>>>>>>> myself.
>>>>>>>>>>>>>>>>>>>>>>>>> :)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 7/1/19 1:04 AM, Levani Kokhreidze
>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Actually, giving it more though -
>> maybe
>>>>>>>>>>>> enhancing
>>>>>>>>>>>>>> Produced
>>>>>>>>>>>>>>>>>>>>>>>>> with num
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> of partitions configuration is not the
>> best approach.
>>>>>>>>>>>>>> Let me
>>>>>>>>>>>>>>>>>>>>>>>>> explain
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> why:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1) If we enhance Produced class with
>> this
>>>>>>>>>>>>>> configuration,
>>>>>>>>>>>>>>>>>>>>>>>>> this will
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> also affect KStream#to operation. Since
>> KStream#to is
>>>>>>>>>>>>>> the final
>>>>>>>>>>>>>>>>>>>>>>>>> sink of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> topology, for me, it seems to be reasonable
>>>>>>>>>>>> assumption
>>>>>>>>>>>>>> that user
>>>>>>>>>>>>>>>>>>>>>>>>> needs
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> manually create sink topic in advance. And
>> in that
>>>>>>>>>>>>>> case, having
>>>>>>>>>>>>>>>>>>>>>>>>> num of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> partitions configuration doesn’t make much
>> sense.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2) Looking at Produced class, based
>> on API
>>>>>>>>>>>>>> contract, seems
>>>>>>>>>>>>>>>>>>>>>>>>> like
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Produced is designed to be something that
>> is
>>>>>>>>>>>>>> explicitly for
>>>>>>>>>>>>>>>>>>>>>>>>> producer
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> (key
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> serializer, value serializer, partitioner
>> those all
>>>>>>>>>>>>>> are producer
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> specific
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> configurations) and num of partitions is
>> topic level
>>>>>>>>>>>>>>>>>>>>>>>>> configuration. And
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> don’t think mixing topic and producer level
>>>>>>>>>>>>>> configurations
>>>>>>>>>>>>>>>>>>>>>>>>> together in
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> one
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> class is the good approach.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 3) Looking at KStream interface,
>> seems like
>>>>>>>>>>>>>> Produced
>>>>>>>>>>>>>>>>>>>>>>>>> parameter is
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> for operations that work with non-internal
>> (e.g
>>>>>>>>>>>> topics
>>>>>>>>>>>>>> created
>>>>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> managed
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> internally by Kafka Streams) topics and I
>> think we
>>>>>>>>>>>>>> should leave
>>>>>>>>>>>>>>>>>>>>>>>>> it as
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> it is
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> in that case.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Taking all this things into account,
>> I think we
>>>>>>>>>>>>>> should
>>>>>>>>>>>>>>>>>>>>>>>>> distinguish
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> between DSL operations, where Kafka
>> Streams should
>>>>>>>>>>>>>> create and
>>>>>>>>>>>>>>>>>>>>>>>>> manage
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> internal topics (KStream#groupBy) vs
>> topics that
>>>>>>>>>>>>>> should be
>>>>>>>>>>>>>>>>>>>>>>>>> created in
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> advance (e.g KStream#to).
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> To sum it up, I think adding
>> numPartitions
>>>>>>>>>>>>>> configuration in
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Produced
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> will result in mixing topic and producer
>> level
>>>>>>>>>>>>>> configuration in
>>>>>>>>>>>>>>>>>>>>>>>>> one
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> class
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> and it’s gonna break existing API
>> semantics.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regarding making topic name optional
>> in
>>>>>>>>>>>>>> KStream#through - I
>>>>>>>>>>>>>>>>>>>>>>>>> think
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> underline idea is very useful and giving
>> users
>>>>>>>>>>>>>> possibility to
>>>>>>>>>>>>>>>>>>>>>>>>> specify
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> num
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> of partitions there is even more useful :)
>>>>>>>>>>>> Considering
>>>>>>>>>>>>>> arguments
>>>>>>>>>>>>>>>>>>>>>>>>> against
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> adding num of partitions in Produced
>> class, I see two
>>>>>>>>>>>>>> options
>>>>>>>>>>>>>>>>>>>>>>>>> here:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1) Add following method overloads
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> * through() - topic will be
>> auto-generated and
>>>>>>>>>>>>>> num of
>>>>>>>>>>>>>>>>>>>>>>>>> partitions
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> will be taken from source topic
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> * through(final int numOfPartitions)
>> - topic
>>>>>>>>>>>> will
>>>>>>>>>>>>>> be auto
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> generated with specified num of partitions
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> * through(final int numOfPartitions,
>> final
>>>>>>>>>>>>>> Produced<K, V>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> produced) - topic will be with generated
>> with
>>>>>>>>>>>>>> specified num of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> partitions
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> and configuration taken from produced
>> parameter.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2) Leave KStream#through as it is and
>> introduce
>>>>>>>>>>>>>> new method
>>>>>>>>>>>>>>>>>>>>>>> -
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> KStream#repartition (I think Matthias
>> suggested this
>>>>>>>>>>>>>> in one of
>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> threads)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Considering all mentioned above I
>> propose the
>>>>>>>>>>>>>> following
>>>>>>>>>>>>>>>>>>>>>>> plan:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Option A:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1) Leave Produced as it is
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2) Add num of partitions
>> configuration to
>>>>>>>>>>>> Grouped
>>>>>>>>>>>>>> class (as
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> mentioned in the KIP)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 3) Add following method overloads to
>>>>>>>>>>>>>> KStream#through
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> * through() - topic will be
>> auto-generated and
>>>>>>>>>>>>>> num of
>>>>>>>>>>>>>>>>>>>>>>>>> partitions
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> will be taken from source topic
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> * through(final int numOfPartitions)
>> - topic
>>>>>>>>>>>> will
>>>>>>>>>>>>>> be auto
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> generated with specified num of partitions
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> * through(final int numOfPartitions,
>> final
>>>>>>>>>>>>>> Produced<K, V>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> produced) - topic will be with generated
>> with
>>>>>>>>>>>>>> specified num of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> partitions
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> and configuration taken from produced
>> parameter.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Option B:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1) Leave Produced as it is
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2) Add num of partitions
>> configuration to
>>>>>>>>>>>> Grouped
>>>>>>>>>>>>>> class (as
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> mentioned in the KIP)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 3) Add new operator
>> KStream#repartition for
>>>>>>>>>>>>>> creating and
>>>>>>>>>>>>>>>>>>>>>>>>> managing
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> internal repartition topics
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> P.S. I’m sorry if all of this was
>> already
>>>>>>>>>>>>>> discussed in the
>>>>>>>>>>>>>>>>>>>>>>>>> mailing
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> list, but I kinda got with all the threads
>> that were
>>>>>>>>>>>>>> about this
>>>>>>>>>>>>>>>>>>>>>>>>> KIP :(
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Kind regards,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Jul 1, 2019, at 9:56 AM, Levani
>> Kokhreidze <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:
>>>>>>>>>>>> levani.co...@gmail.com>>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I would like to resurrect discussion
>> around
>>>>>>>>>>>>>> KIP-221. Going
>>>>>>>>>>>>>>>>>>>>>>>>> through
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the discussion thread, there’s seems to
>> agreement
>>>>>>>>>>>>>> around
>>>>>>>>>>>>>>>>>>>>>>>>> usefulness of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> feature.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regarding the implementation, as far
>> as I
>>>>>>>>>>>>>> understood, the
>>>>>>>>>>>>>>>>>>>>>>>>> most
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> optimal solution for me seems the
>> following:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1) Add two method overloads to
>> KStream#through
>>>>>>>>>>>>>> method
>>>>>>>>>>>>>>>>>>>>>>>>> (essentially
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> making topic name optional)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2) Enhance Produced class with
>> numOfPartitions
>>>>>>>>>>>>>>>>>>>>>>> configuration
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> field.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Those two changes will allow DSL
>> users to
>>>>>>>>>>>> control
>>>>>>>>>>>>>>>>>>>>>>>>> parallelism and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> trigger re-partition without doing stateful
>>>>>>>>>>>> operations.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I will update KIP with interface
>> changes around
>>>>>>>>>>>>>>>>>>>>>>>>> KStream#through if
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> this changes sound sensible.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Kind regards,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>
>>>>>
>>>>
>>
>>
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to