Levani,

do you agree to the current proposal? It's basically a de-scoping of the
already voted KIP. If you agree, could you update the KIP wiki page
accordingly, including the "Rejected Alternative" section (and mabye a
link to a follow up Jira ticket).

Because it's a descope, and John and myself support it, there seems to
be no need to re-vote.

@Sophie,John: thanks a lot for your thoughtful input!


-Matthias

On 11/15/19 12:47 PM, John Roesler wrote:
> Thanks Sophie,
> 
> I think your concern is valid, and also that your idea to make a
> ticket is a good idea.
> 
> Creating a ticket has some very positive effects:
> * It allows us to record the thinking at this point in time so we
> don't have to dig through the mail archives later
> * It demonstrates that we did consider the use case, and do want to
> address it, but just don't feel confident to implement it right now.
> Then, if/when people do have a problem with the gap, the ticket it
> already there for them to consider, request, or even pick up.
> 
> Since one aspect of the deferral is a desire to wait for real use
> experience, we should explicitly mention that in the ticket. This is
> just good information for people browsing the Jira looking for
> interesting tickets to pick up. They could still pick it up, but they
> can ask themselves if they really understand the real-world use cases
> any better than we do right now.
> 
> Thanks, likewise, to you for the good discussion!
> -John
> 
> On Fri, Nov 15, 2019 at 2:37 PM Sophie Blee-Goldman <sop...@confluent.io> 
> wrote:
>>
>> While I'm concerned that "not augmenting groupBy as part of this KIP"
>> really translates to "will not get around to augmenting groupBy for a long
>> time if not as part of this KIP", like I said I don't want to hold up the
>> new
>> .repartition operator that it seems we do, at least, all agree on. It's a
>> fair
>> point that we can always add this in later, but undoing it is far more
>> problematic.
>>
>> Anyways, I would be happy if we at least make a ticket to consider adding a
>> "number of partitions" option/suggestion to groupBy, so that we don't lose
>> all the thought put in to this decision so far and can avoid rehashing the
>> same
>> argument word for word and have something to point to when someone
>> asks "why didn't we add this numPartitions option to groupBy".
>>
>> Beyond that, if the community isn't pushing for it at this moment then it
>> seems very
>> reasonable to shelve the idea for now so that the rest of this KIP can
>> proceed.
>> Without input one way or another it's hard to say what the right thing to
>> do is,
>> which makes the right thing to do "wait to add this feature"
>>
>> Thanks for the good discussion everyone,
>>
>> Sophie
>>
>> On Fri, Nov 15, 2019 at 12:41 PM John Roesler <j...@confluent.io> wrote:
>>
>>> Hi all,
>>>
>>> I think that Sophie is asking a good question, and I do think that
>>> such "blanket configurations" are plausible. For example, we currently
>>> support (and I would encourage) "I don't know if this is going to
>>> create a repartition topic, but if it does, then use this name instead
>>> of generating one".
>>>
>>> I'm not sure I'm convinced that specifying max parallelism falls into
>>> this category. After all, the groupByKey+aggregate will be executed
>>> with _some_ max parallelism. It's either the same as the inputs'
>>> partition count or overridden with the proposed config. It seems
>>> counterintuitive to override the specified option with the default
>>> value.
>>>
>>> I'm not sure if I can put my finger on it, but "maybe use this name"
>>> seems way more reasonable to me than "maybe execute with this degree
>>> of parallelism".
>>>
>>> I do think (and I appreciate that this is where Sophie's example is
>>> coming from) that Streams should strive to be absolutely as simple and
>>> intuitive as possible (while still maintaining correctness). Optimal
>>> performance can be at odds with API simplicity. For example, the
>>> simplest behavior is, if you ask for 5 partitions, you get 5
>>> partitions. Maybe a repartition is technically not necessary (if you
>>> didn't change the key), but at least there's no mystery to this
>>> behavior.
>>>
>>> Clearly, an (opposing) tenent of simplicity is trying to prevent
>>> people from making mistakes, which I think is what the example boils
>>> down to. Sometimes, we can prevent clear mistakes, like equi-joining
>>> two topics with different partition counts. But for this case, it
>>> doesn't seem as clear-cut to be able to assume that they _said_ 5
>>> partitions, but they didn't really _want_ 5 partitions. Maybe we can
>>> just try to be clear in the documentation, and also even log a warning
>>> when we parse the topology, "hey, I've been asked to repartition this
>>> stream, but it's not necessary".
>>>
>>> If anything, this discussion really supports to me the value in just
>>> sticking with `repartition()` for now, and deferring
>>> `groupBy[Key](partitions)` to the future.
>>>
>>>> Users should not have to choose between allowing Streams to optimize the
>>> repartition placement, and allowing to specify a number of partitions.
>>>
>>> This is a very fair point, and it may be something that we rapidly
>>> return to, but it seems safe for now to introduce the non-optimizable
>>> `reparition()` only, and then consider optimization options later.
>>> Skipping available optimizations will never break correctness, but
>>> adding optimizations can, so it makes sense to treat them with
>>> caution.
>>>
>>> In conclusion, I do think that a use _could_ want to "maybe specify"
>>> the partition count, but I also think we can afford to pass on
>>> supporting this right now.
>>>
>>> I'm open to continuing the discussion, but just to avoid ambiguity, I
>>> still feel we should _not_ change the groupBy[Key] operation at all,
>>> and we should only add `repartition()` as a non-optimizable operation.
>>>
>>> Thanks all,
>>> -John
>>>
>>> On Fri, Nov 15, 2019 at 11:26 AM Levani Kokhreidze
>>> <levani.co...@gmail.com> wrote:
>>>>
>>>> Hello,
>>>>
>>>> Just fyi, PR was updated and now it incorporates the latest suggestions
>>> about joins.
>>>> `CopartitionedTopicsEnforcer` will throw an exception if number of
>>> partitions aren’t the same when using `repartition` operation along with
>>> `join`.
>>>>
>>>> For more details please take a look at the PR:
>>> https://github.com/apache/kafka/pull/7170/files <
>>> https://github.com/apache/kafka/pull/7170/files>
>>>>
>>>> Regards,
>>>> Levani
>>>>
>>>>
>>>>> On Nov 15, 2019, at 11:01 AM, Matthias J. Sax <matth...@confluent.io>
>>> wrote:
>>>>>
>>>>> Thanks a lot for the input Sophie.
>>>>>
>>>>> Your example is quite useful, and I would use it to support my claim
>>>>> that a "partition hint" for `Grouped` seems "useless" and does not
>>>>> improve the user experience.
>>>>>
>>>>> 1) You argue that a new user would be worries about repartitions topics
>>>>> with too many paritions. This would imply that a user is already
>>>>> advanced enough to understand the implication of repartitioning -- for
>>>>> this case, I would argue that a user also understand _when_ a
>>>>> auto-repartitioning would happen and thus the users understands where
>>> to
>>>>> insert a `repartition()` operation.
>>>>>
>>>>> 2) For specifying Serdes: if a `groupByKey()` does not trigger
>>>>> auto-repartitioning it's not required to specify the serdes and if they
>>>>> are specified they would be ignored/unused (note, that `groupBy()`
>>> would
>>>>> always trigger a repartitioning). Of course, if the default Serdes from
>>>>> the config match (eg, all data types are Json anyway), a user does not
>>>>> need to worry about specifying serdes. -- For new user that play
>>> around,
>>>>> I would assume that they work a lot with primitive types and thus would
>>>>> need to specify the serdes -- hence, they would learn about
>>>>> auto-repartitioning the hard way anyhow, because each time a
>>>>> `groupByKey()` does trigger auto-repartioning, they would need to pass
>>>>> in the correct Serdes -- this way, they would also be educated where to
>>>>> insert a `repartition()` operator if needed.
>>>>>
>>>>> 3) If a new user really just "plays around", I don't think they use an
>>>>> input topic with 100 partitions but most likely have a local single
>>> node
>>>>> broker with most likely single partitions topics.
>>>>>
>>>>>
>>>>> My main argument for my current proposal is however, that---based on
>>>>> past experience---it's better to roll out a new feature more carefully
>>>>> and see how it goes. Last, as John pointed out, we can still extend the
>>>>> feature in the future. Instead of making a judgment call up-front,
>>> being
>>>>> more conservative and less fancy, and revisit the design based on
>>>>> actuall user feedback after the first version is rolled out, seems to
>>> be
>>>>> the better option. Undoing a feature is must harder than extending it.
>>>>>
>>>>>
>>>>> While I advocate strong for a simple first version of this feature,
>>> it's
>>>>> a community decission in the end, and I would not block this KIP if
>>>>> there is a broad preference to add `Grouped#withNumberOfPartitions()`
>>>>> either.
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 11/14/19 11:35 PM, Sophie Blee-Goldman wrote:
>>>>>> It seems like we all agree at this point (please correct me if
>>> wrong!) that
>>>>>> we should NOT change
>>>>>> the existing repartitioning behavior, ie we should allow Streams to
>>>>>> continue to determine when and
>>>>>> where to repartition -- *unless* explicitly informed to by the use of
>>> a
>>>>>> .through or the new .repartition operator.
>>>>>>
>>>>>> Regarding groupBy, the existing behavior we should not disrupt is
>>>>>> a) repartition *only* when required due to upstream key-changing
>>> operation
>>>>>> (ie don't force repartitioning
>>>>>> based on the presence of an optional config parameter), and
>>>>>> b) allow optimization of required repartitions, if any
>>>>>>
>>>>>> Within the constraint of not breaking the existing behavior, this
>>> still
>>>>>> leaves open the question of whether we
>>>>>> want to improve the user experience by allowing to provide groupBy
>>> with a
>>>>>> *suggestion* for numPartitions (or to
>>>>>> put it more fairly, whether that *will* improve the experience). I
>>> agree
>>>>>> with many of the arguments outlined above but
>>>>>> let me just push back on this one issue one final time, and if we
>>> can't
>>>>>> come to a consensus then I am happy to drop
>>>>>> it for now so that the KIP can proceed.
>>>>>>
>>>>>> Specifically, my proposal would be to simply augment Grouped with an
>>>>>> optional numPartitions, understood to
>>>>>> indicate the user's desired number of partitions *if Streams decides
>>> to
>>>>>> repartition due to that groupBy*
>>>>>>
>>>>>>> if a user cares about the number of partition, the user wants to
>>> enforce
>>>>>> a repartitioning
>>>>>> First, I think we should take a step back and examine this claim. I
>>> agree
>>>>>> 100% that *if this is true,*
>>>>>> *then we should not give groupBy an optional numPartitions.* As far
>>> as I
>>>>>> see it, there's no argument
>>>>>> to be had there if we *presuppose that claim.* But I'm not convinced
>>> in
>>>>>> that as an axiom of the user
>>>>>> experience and think we should be examining that claim itself, not the
>>>>>> consequences of it.
>>>>>>
>>>>>> To give a simple example, let's say some new user is trying out
>>> Streams and
>>>>>> wants to just play around
>>>>>> with it to see if it might be worth looking into. They want to just
>>> write
>>>>>> up a simple app and test it out on the
>>>>>> data in some existing topics they have with a large number of
>>> partitions,
>>>>>> and a lot of data. They're just messing
>>>>>> around, trying new topologies and don't want to go through each new
>>> one
>>>>>> step by step to determine if (or where)
>>>>>> a repartition might be required. They also don't want to force a
>>>>>> repartition if it turns out to not be required, so they'd
>>>>>> like to avoid the nice new .repartition operator they saw. But given
>>> the
>>>>>> huge number of input partitions, they'd like
>>>>>> to rest assured that if a repartition does end up being required
>>> somewhere
>>>>>> during dev, it will not be created with
>>>>>> the same huge number of partitions that their input topic has -- so
>>> they
>>>>>> just pass groupBy a small numPartitions
>>>>>> suggestion.
>>>>>>
>>>>>> I know that's a bit of a contrived example but I think it does
>>> highlight
>>>>>> how and when this might be a considerable
>>>>>> quality of life improvement, in particular for new users to Streams
>>> and/or
>>>>>> during the dev cycle. *You don't want to*
>>>>>> *force a repartition if it wasn't necessary, but you don't want to
>>> create a
>>>>>> topic with a huge partition count either.*
>>>>>>
>>>>>> Also, while the optimization discussion took us down an interesting
>>> but
>>>>>> ultimately more distracting road, it's worth
>>>>>> pointing out that it is clearly a major win to have as few
>>>>>> repartition topics/steps as possible. Given that we
>>>>>> don't want to change existing behavior, the optimization framework
>>> can only
>>>>>> help out when the placement of
>>>>>> repartition steps is flexible, which means only those from .groupBy
>>> (and
>>>>>> not .repartition). *Users should not*
>>>>>> *have to choose between allowing Streams to optimize the repartition
>>>>>> placement, and allowing to specify a *
>>>>>> *number of partitions.*
>>>>>>
>>>>>> Lastly, I have what may be a stupid question but for my own
>>> edification of
>>>>>> how groupBy works:
>>>>>> if you do a .groupBy and a repartition is NOT required, does it ever
>>> need
>>>>>> to serialize/deserialize
>>>>>> any of the data? In other words, if you pass a key/value serde to
>>> groupBy
>>>>>> and it doesn't trigger
>>>>>> a repartition, is the serde(s) just ignored and thus more like a
>>> suggestion
>>>>>> than a requirement?
>>>>>>
>>>>>> So again, I don't want to hold up this KIP forever but I feel we've
>>> spent
>>>>>> some time getting slightly
>>>>>> off track (although certainly into very interesting discussions) yet
>>> never
>>>>>> really addressed or questioned
>>>>>> the basic premise: *could a user want to specify a number of
>>> partitions but
>>>>>> not enforce a repartition (at that*
>>>>>> *specific point in the topology)?*
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Nov 15, 2019 at 12:18 AM Matthias J. Sax <
>>> matth...@confluent.io>
>>>>>> wrote:
>>>>>>
>>>>>>> Side remark:
>>>>>>>
>>>>>>> If the user specifies `repartition()` on both side of the join, we
>>> can
>>>>>>> actually throw the execption earlier, ie, when we build the topology.
>>>>>>>
>>>>>>> Current, we can do this check only after Kafka Streams was started,
>>>>>>> within `StreamPartitionAssignor#assign()` -- we still need to keep
>>> this
>>>>>>> check for the case that none or only one side has a user specified
>>>>>>> number of partitions though.
>>>>>>>
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>> On 11/14/19 8:15 AM, John Roesler wrote:
>>>>>>>> Thanks, all,
>>>>>>>>
>>>>>>>> I can get behind just totally leaving out reparation-via-groupBy. If
>>>>>>>> we only introduce `repartition()` for now, we're making the minimal
>>>>>>>> change to gain the desired capability.
>>>>>>>>
>>>>>>>> Plus, since we agree that `repartition()` should never be
>>> optimizable,
>>>>>>>> it's a future-compatible proposal. I.e., if we were to add a
>>>>>>>> non-optimizable groupBy(partitions) operation now, and want to make
>>> it
>>>>>>>> optimizable in the future, we have to worry about topology
>>>>>>>> compatibility. Better to just do non-optimizable `repartition()`
>>> now,
>>>>>>>> and add an optimizable `groupBy(partitions)` in the future (maybe).
>>>>>>>>
>>>>>>>> About joins, yes, it's a concern, and IMO we should just do the same
>>>>>>>> thing we do now... check at runtime that the partition counts on
>>> both
>>>>>>>> sides match and throw an exception otherwise. What this means as a
>>>>>>>> user is that if you explicitly repartition the left side to 100
>>>>>>>> partitions, and then join with the right side at 10 partitions, you
>>>>>>>> get an exception, since this operation is not possible. You'd either
>>>>>>>> have to "step down" the left side again, back to 10 partitions, or
>>> you
>>>>>>>> could repartition the right side to 100 partitions before the join.
>>>>>>>> The choice has to be the user's, since it depends on their desired
>>>>>>>> execution parallelism.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> -John
>>>>>>>>
>>>>>>>> On Thu, Nov 14, 2019 at 12:55 AM Matthias J. Sax <
>>> matth...@confluent.io>
>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Thanks a lot John. I think the way you decompose the operators is
>>> super
>>>>>>>>> helpful for this discussion.
>>>>>>>>>
>>>>>>>>> What you suggest with regard to using `Grouped` and enforcing
>>>>>>>>> repartitioning if the number of partitions is specified is
>>> certainly
>>>>>>>>> possible. However, I am not sure if we _should_ do this. My
>>> reasoning is
>>>>>>>>> that an enforce repartitioning as introduced via `repartition()`
>>> is an
>>>>>>>>> expensive operations, and it seems better to demand an more
>>> explicit
>>>>>>>>> user opt-in to trigger it. Just setting an optional parameter
>>> might be
>>>>>>>>> too subtle to trigger such a heavy "side effect".
>>>>>>>>>
>>>>>>>>> While I agree about "usability" in general, I would prefer a more
>>>>>>>>> conservative appraoch to introduce this feature, see how it goes,
>>> and
>>>>>>>>> maybe make it more advance later on. This also applies to what
>>>>>>>>> optimzation we may or may not allow (or are able to perform at
>>> all).
>>>>>>>>>
>>>>>>>>> @Levani: Reflecting about my suggestion about `Repartioned extends
>>>>>>>>> Grouped`, I agree that it might not be a good idea.
>>>>>>>>>
>>>>>>>>> Atm, I see an enforces repartitioning as non-optimizable and as a
>>> good
>>>>>>>>> first step and I would suggest to not intoruce anything else for
>>> now.
>>>>>>>>> Introducing optimizable enforce repartitioning via `groupBy(...,
>>>>>>>>> Grouped)` is something we could add later.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Therefore, I would not change `Grouped` but only introduce
>>>>>>>>> `repartition()`. Users that use `grouBy()` atm, and want to opt-in
>>> to
>>>>>>>>> set the number of partitions, would need to rewrite their code to
>>>>>>>>> `selectKey(...).repartition(...).groupByKey()`. It's less
>>> convinient but
>>>>>>>>> also less risky from an API and optimization point of view.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> @Levani: about joins -> yes, we will need to check the specified
>>> number
>>>>>>>>> of partitions (if any) and if they don't match, throw an
>>> exception. We
>>>>>>>>> can discuss this on the PR -- I am just trying to get the PR for
>>> KIP-466
>>>>>>>>> merged -- your is next on the list :)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thoughts?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 11/12/19 4:51 PM, Levani Kokhreidze wrote:
>>>>>>>>>> Thank you all for an interesting discussion. This is very
>>> enlightening.
>>>>>>>>>>
>>>>>>>>>> Thank you Matthias for your explanation. Your arguments are very
>>> true.
>>>>>>> It makes sense that if user specifies number of partitions he/she
>>> really
>>>>>>> cares that those specifications are applied to internal topics.
>>>>>>>>>> Unfortunately, in current implementation this is not true during
>>>>>>> `join` operation. As I’ve written in the PR comment, currently, when
>>>>>>> `Stream#join` is used, `CopartitionedTopicsEnforcer` chooses max
>>> number of
>>>>>>> partitions from the two source topics.
>>>>>>>>>> I’m not really sure what would be the other way around this
>>> situation.
>>>>>>> Maybe fail the stream altogether and inform the user to specify same
>>> number
>>>>>>> of partitions?
>>>>>>>>>> Or we should treat join operations in a same way as it is right
>>> now
>>>>>>> and basically choose max number of partitions even when `repartition`
>>>>>>> operation is specified, because Kafka Streams “knows the best” how to
>>>>>>> handle joins?
>>>>>>>>>> You can check integration tests how it’s being handled currently.
>>> Open
>>>>>>> to suggestions on that part.
>>>>>>>>>>
>>>>>>>>>> As for groupBy, I agree and John raised very interesting points.
>>> My
>>>>>>> arguments for allowing users to specify number of partitions during
>>> groupBy
>>>>>>> operations mainly was coming from the usability perspective.
>>>>>>>>>> So building on top of what John said, maybe it makes sense to make
>>>>>>> `groupBy` operations smarter and whenever user specifies
>>>>>>> `numberOfPartitions` configuration, repartitioning will be enforced,
>>> wdyt?
>>>>>>>>>> I’m not going into optimization part yet :) I think it will be
>>> part of
>>>>>>> separate PR and task, but overall it makes sense to apply
>>> optimizations
>>>>>>> where number of partitions are the same.
>>>>>>>>>>
>>>>>>>>>> As for Repartitioned extending Grouped, I kinda feel that it
>>> won’t fit
>>>>>>> nicely in current API design.
>>>>>>>>>> In addition, in the PR review, John mentioned that there were a
>>> lot of
>>>>>>> troubles in the past trying to use one operation's configuration
>>> objects on
>>>>>>> other operations.
>>>>>>>>>> Also it makes sense to keep them separate in terms of
>>> compatibility.
>>>>>>>>>> In that case, we don’t have to worry every time Grouped is
>>> changed,
>>>>>>> what would be the implications on `repartition` operations.
>>>>>>>>>>
>>>>>>>>>> Kind regards,
>>>>>>>>>> Levani
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>> On Nov 11, 2019, at 9:13 PM, John Roesler <j...@confluent.io>
>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Ah, thanks for the clarification. I missed your point.
>>>>>>>>>>>
>>>>>>>>>>> I like the framework you've presented. It does seem simpler to
>>> assume
>>>>>>>>>>> that they either care about the partition count and want to
>>>>>>>>>>> repartition to realize it, or they don't care about the number.
>>>>>>>>>>> Returning to this discussion, it does seem unlikely that they
>>> care
>>>>>>>>>>> about the number and _don't_ care if it actually gets realized.
>>>>>>>>>>>
>>>>>>>>>>> But then, it still seems like we can just keep the option as
>>> part of
>>>>>>>>>>> Grouped. As in:
>>>>>>>>>>>
>>>>>>>>>>> // user does not care
>>>>>>>>>>> stream.groupByKey(Grouped /*not specifying partition count*/)
>>>>>>>>>>> stream.groupBy(Grouped /*not specifying partition count*/)
>>>>>>>>>>>
>>>>>>>>>>> // user does care
>>>>>>>>>>> stream.repartition(Repartitioned)
>>>>>>>>>>> stream.groupByKey(Grouped.numberOfPartitions(...))
>>>>>>>>>>> stream.groupBy(Grouped.numberOfPartitions(...))
>>>>>>>>>>>
>>>>>>>>>>> ----
>>>>>>>>>>>
>>>>>>>>>>> The above discussion got me thinking about algebra. Matthias is
>>>>>>>>>>> absolutely right that `groupByKey(numPartitions)` is equivalent
>>> to
>>>>>>>>>>> `repartition(numPartitions).groupByKey()`. I'm just not
>>> convinced that
>>>>>>>>>>> we should force people to apply that expansion themselves vs.
>>> having a
>>>>>>>>>>> more compact way to express it if they don't care where exactly
>>> the
>>>>>>>>>>> repartition occurs. However, thinking about these operators
>>>>>>>>>>> algebraically can really help *us* narrow down the number of
>>> different
>>>>>>>>>>> expressions we have to consider.
>>>>>>>>>>>
>>>>>>>>>>> Let's consider some identities:
>>>>>>>>>>>
>>>>>>>>>>> A: groupBy(mapper) + agg = mapKey(mapper) + groupByKey + agg
>>>>>>>>>>> B: src + ... + groupByKey + agg = src + ... + passthough + agg
>>>>>>>>>>> C: mapKey(mapper) + ... + groupByKey + agg
>>>>>>>>>>> = mapKey(mapper) + ... + repartition + groupByKey + agg
>>>>>>>>>>> D: repartition = sink(managed) + src
>>>>>>>>>>>
>>>>>>>>>>> In these identities, I used one special identifier (...), which
>>> means
>>>>>>>>>>> any number (0+) of operations that are not src, mapKey,
>>> groupBy[Key],
>>>>>>>>>>> repartition, or agg.
>>>>>>>>>>>
>>>>>>>>>>> For mental clarity, I'm just going to make up a rule that groupBy
>>>>>>>>>>> operations are not executable. In other words, you have to get
>>> to a
>>>>>>>>>>> point where you can apply B to convert a groupByKey into a
>>> passthough
>>>>>>>>>>> in order to execute the program. This is just a formal way of
>>> stating
>>>>>>>>>>> what already happens in Kafka Streams.
>>>>>>>>>>>
>>>>>>>>>>> By applying A, we can just completely leave `groupBy` out of our
>>>>>>>>>>> analysis. It trivially decomposes into a mapKey followed by a
>>>>>>>>>>> groupByKey.
>>>>>>>>>>>
>>>>>>>>>>> Then, we can eliminate the "repartition required" case of
>>> `groupByKey`
>>>>>>>>>>> by applying C followed by D to get to the "no repartition
>>> required"
>>>>>>>>>>> version of groupByKey, which in turn sets us up to apply B to
>>> get an
>>>>>>>>>>> executable topology.
>>>>>>>>>>>
>>>>>>>>>>> Fundamentally, you can think about KIP-221 is as proposing a
>>> modified
>>>>>>>>>>> D identity in which you can specify the partition count of the
>>> managed
>>>>>>>>>>> sink topic:
>>>>>>>>>>> D': repartition(pc) = sink(managed w/ pc) + src
>>>>>>>>>>>
>>>>>>>>>>> Since users _could_ apply the identities above, we don't
>>> actually have
>>>>>>>>>>> to add any partition count to groupBy[Key], but we decided early
>>> on in
>>>>>>>>>>> the KIP discussion that it's more ergonomic to add it. In that
>>> case,
>>>>>>>>>>> we also have to modify A and C:
>>>>>>>>>>> A': groupBy(mapper, pc) + agg
>>>>>>>>>>> = mapKey(mapper) + groupByKey(pc) + agg
>>>>>>>>>>> C': mapKey(mapper) + ... + groupByKey(pc) + agg
>>>>>>>>>>> = mapKey(mapper) + ... + repartition(pc) + groupByKey + agg
>>>>>>>>>>>
>>>>>>>>>>> Which sets us up still to always be able to get back to a plain
>>>>>>>>>>> `groupByKey` operation (with no `(pc)`) and then apply D' and
>>>>>>>>>>> ultimately B to get an executable topology.
>>>>>>>>>>>
>>>>>>>>>>> What about the optimizer?
>>>>>>>>>>> The optimizer applies another set of graph-algebraic identities
>>> to
>>>>>>>>>>> minimize the number of repartition topics in a topology.
>>>>>>>>>>>
>>>>>>>>>>> (forgive my ascii art)
>>>>>>>>>>>
>>>>>>>>>>> E: (merging repartition nodes)
>>>>>>>>>>> (...) -> repartition -> X
>>>>>>>>>>> \-> repartition -> Y
>>>>>>>>>>> =
>>>>>>>>>>> (... + repartition) -> X
>>>>>>>>>>>    \-> Y
>>>>>>>>>>> F: (reordering around repartition)
>>>>>>>>>>> Where SVO is any non-key-changing, stateless, operation:
>>>>>>>>>>> repartition -> SVO = SVO -> repartition
>>>>>>>>>>>
>>>>>>>>>>> In terms of these identities, what the optimizer does is apply F
>>>>>>>>>>> repeatedly in either direction to a topology to factor out
>>> common in
>>>>>>>>>>> branches so that it can apply E to merge repartition nodes. This
>>> was
>>>>>>>>>>> especially necessary before KIP-221 because you couldn't directly
>>>>>>>>>>> express `repartition` in the DSL, only indirectly via
>>> `groupBy[Key]`,
>>>>>>>>>>> so there was no way to do the factoring manually.
>>>>>>>>>>>
>>>>>>>>>>> We can now state very clearly that in KIP-221, explicit
>>>>>>>>>>> `repartition()` operators should create a "reordering barrier".
>>> So, F
>>>>>>>>>>> cannot be applied to an explicit `repartition()`. Also, I think
>>> we
>>>>>>>>>>> decided earlier that explicit `repartition()` operations would
>>> also be
>>>>>>>>>>> ineligible for merging, so E can't be applied to explicit
>>>>>>>>>>> `repartition()` operations either. I think we feel we _could_
>>> apply E
>>>>>>>>>>> without harm, but we want to be conservative for now.
>>>>>>>>>>>
>>>>>>>>>>> I think the salient point from the latter discussion has been
>>> that
>>>>>>>>>>> when you use `Grouped.numberOfPartitions`, this does _not_
>>> constitute
>>>>>>>>>>> an explicit `repartition()` operator, and therefore, the
>>> resulting
>>>>>>>>>>> repartition node remains eligible for optimization.
>>>>>>>>>>>
>>>>>>>>>>> To be clear, I agree with Matthias that the provided partition
>>> count
>>>>>>>>>>> _must_ be used in the resulting implicit repartition. This has
>>> some
>>>>>>>>>>> implications for E. Namely, E could only be applied to two
>>> repartition
>>>>>>>>>>> nodes that have the same partition count. This has always been
>>>>>>>>>>> trivially true before KIP-221 because the partition count has
>>> always
>>>>>>>>>>> been "unspecified", i.e., it would be determined at runtime by
>>> the
>>>>>>>>>>> user-managed-topics' partition counts. Now, it could be
>>> specified or
>>>>>>>>>>> unspecified. We can simply augment E to allow merging only
>>> repartition
>>>>>>>>>>> nodes where the partition count is EITHER "specified and the
>>> same on
>>>>>>>>>>> both sides", OR "unspecified on both sides".
>>>>>>>>>>>
>>>>>>>>>>> Sorry for the long email, but I have a hope that it builds a
>>> solid
>>>>>>>>>>> theoretical foundation for our decisions in KIP-221, so we can
>>> have
>>>>>>>>>>> confidence that there are no edge cases for design flaws to hide.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> -John
>>>>>>>>>>>
>>>>>>>>>>> On Sat, Nov 9, 2019 at 10:37 PM Matthias J. Sax <
>>>>>>> matth...@confluent.io> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> it seems like we do want to allow
>>>>>>>>>>>>>> people to optionally specify a partition count as part of this
>>>>>>>>>>>>>> operation, but we don't want that option to _force_
>>> repartitioning
>>>>>>>>>>>>
>>>>>>>>>>>> Correct, ie, that is my suggestions.
>>>>>>>>>>>>
>>>>>>>>>>>>> "Use P partitions if repartitioning is necessary"
>>>>>>>>>>>>
>>>>>>>>>>>> I disagree here, because my reasoning is that:
>>>>>>>>>>>>
>>>>>>>>>>>> - if a user cares about the number of partition, the user wants
>>> to
>>>>>>>>>>>> enforce a repartitioning
>>>>>>>>>>>> - if a user does not case about the number of partitions, we
>>> don't
>>>>>>> need
>>>>>>>>>>>> to provide them a way to pass in a "hint"
>>>>>>>>>>>>
>>>>>>>>>>>> Hence, it should be sufficient to support:
>>>>>>>>>>>>
>>>>>>>>>>>> // user does not care
>>>>>>>>>>>>
>>>>>>>>>>>> `stream.groupByKey(Grouped)`
>>>>>>>>>>>> `stream.grouBy(..., Grouped)`
>>>>>>>>>>>>
>>>>>>>>>>>> // user does care
>>>>>>>>>>>>
>>>>>>>>>>>> `stream.repartition(Repartitioned).groupByKey()`
>>>>>>>>>>>> `streams.groupBy(..., Repartitioned)`
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On 11/9/19 8:10 PM, John Roesler wrote:
>>>>>>>>>>>>> Thanks for those thoughts, Matthias,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I find your reasoning about the optimization behavior
>>> compelling.
>>>>>>> The
>>>>>>>>>>>>> `through` operation is very simple and clear to reason about.
>>> It
>>>>>>> just
>>>>>>>>>>>>> passes the data exactly at the specified point in the topology
>>>>>>> exactly
>>>>>>>>>>>>> through the specified topic. Likewise, if a user invokes a
>>>>>>>>>>>>> `repartition` operator, the simplest behavior is if we just do
>>> what
>>>>>>>>>>>>> they asked for.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Stepping back to think about when optimizations are surprising
>>> and
>>>>>>>>>>>>> when they aren't, it occurs to me that we should be free to
>>> move
>>>>>>>>>>>>> around repartitions when users have asked to perform some
>>> operation
>>>>>>>>>>>>> that implies a repartition, like "change keys, then filter,
>>> then
>>>>>>>>>>>>> aggregate". This program requires a repartition, but it could
>>> be
>>>>>>>>>>>>> anywhere between the key change and the aggregation. On the
>>> other
>>>>>>>>>>>>> hand, if they say, "change keys, then filter, then
>>> repartition, then
>>>>>>>>>>>>> aggregate", it seems like they were pretty clear about their
>>> desire,
>>>>>>>>>>>>> and we should just take it at face value.
>>>>>>>>>>>>>
>>>>>>>>>>>>> So, I'm sold on just literally doing a repartition every time
>>> they
>>>>>>>>>>>>> invoke the `repartition` operator.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> The "partition count" modifier for `groupBy`/`groupByKey` is
>>> more
>>>>>>> nuanced.
>>>>>>>>>>>>>
>>>>>>>>>>>>> What you said about `groupByKey` makes sense. If they key
>>> hasn't
>>>>>>>>>>>>> actually changed, then we don't need to repartition before
>>>>>>>>>>>>> aggregating. On the other hand, `groupBy` is specifically
>>> changing
>>>>>>> the
>>>>>>>>>>>>> key as part of the grouping operation, so (as you said) we
>>>>>>> definitely
>>>>>>>>>>>>> have to do a repartition.
>>>>>>>>>>>>>
>>>>>>>>>>>>> If I'm reading the discussion right, it seems like we do want
>>> to
>>>>>>> allow
>>>>>>>>>>>>> people to optionally specify a partition count as part of this
>>>>>>>>>>>>> operation, but we don't want that option to _force_
>>> repartitioning
>>>>>>> if
>>>>>>>>>>>>> it's not needed. That last clause is the key. "Use P
>>> partitions if
>>>>>>>>>>>>> repartitioning is necessary" is a directive that applies
>>> cleanly and
>>>>>>>>>>>>> correctly to both `groupBy` and `groupByKey`. What if we call
>>> the
>>>>>>>>>>>>> option `numberOfPartitionsHint`, which along with the "if
>>> necessary"
>>>>>>>>>>>>> javadoc, should make it clear that the option won't force a
>>>>>>>>>>>>> repartition, and also gives us enough latitude to still employ
>>> the
>>>>>>>>>>>>> optimizer on those repartition topics?
>>>>>>>>>>>>>
>>>>>>>>>>>>> If we like the idea of expressing it as a "hint" for grouping
>>> and a
>>>>>>>>>>>>> "command" for `repartition`, then it seems like it still makes
>>> sense
>>>>>>>>>>>>> to keep Grouped and Repartitioned separate, as they would
>>> actually
>>>>>>>>>>>>> offer different methods with distinct semantics.
>>>>>>>>>>>>>
>>>>>>>>>>>>> WDYT?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> -John
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sat, Nov 9, 2019 at 8:28 PM Matthias J. Sax <
>>>>>>> matth...@confluent.io> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Sorry for late reply.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I guess, the question boils down to the intended semantics of
>>>>>>>>>>>>>> `repartition()`. My understanding is as follows:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> - KS does auto-repartitioning for correctness reasons (using
>>> the
>>>>>>>>>>>>>> upstream topic to determine the number of partitions)
>>>>>>>>>>>>>> - KS does auto-repartitioning only for downstream DSL
>>> operators
>>>>>>> like
>>>>>>>>>>>>>> `count()` (eg, a `transform()` does never trigger an
>>>>>>> auto-repartitioning
>>>>>>>>>>>>>> even if the stream is marked as `repartitioningRequired`).
>>>>>>>>>>>>>> - KS offers `through()` to enforce a repartitioning --
>>> however,
>>>>>>> the user
>>>>>>>>>>>>>> needs to create the topic manually (with the desired number of
>>>>>>> partitions).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I see two main applications for `repartitioning()`:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 1) repartition data before a `transform()` but user does not
>>> want
>>>>>>> to
>>>>>>>>>>>>>> manage the topic
>>>>>>>>>>>>>> 2) scale out a downstream subtopology
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hence, I see `repartition()` similar to `through()`: if a
>>> users
>>>>>>> calls
>>>>>>>>>>>>>> it, a repartitining is enforced, with the difference that KS
>>>>>>> manages the
>>>>>>>>>>>>>> topic and the user does not need to create it.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This behavior makes (1) and (2) possible.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I think many users would prefer to just say "if there *is* a
>>>>>>> repartition
>>>>>>>>>>>>>>> required at this point in the topology, it should
>>>>>>>>>>>>>>> have N partitions"
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Because of (2), I disagree. Either a user does not care about
>>>>>>> scaling
>>>>>>>>>>>>>> out, for which case she would not specify the number of
>>>>>>> partitions. Or a
>>>>>>>>>>>>>> user does care, and hence wants to enforce the scale out. I
>>> don't
>>>>>>> think
>>>>>>>>>>>>>> that any user would say, "maybe scale out".
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Therefore, the optimizer should never ignore the repartition
>>>>>>> operation.
>>>>>>>>>>>>>> As a "consequence" (because repartitioning is expensive) a
>>> user
>>>>>>> should
>>>>>>>>>>>>>> make an explicit call to `repartition()` IMHO -- piggybacking
>>> an
>>>>>>>>>>>>>> enforced repartitioning into `groupByKey()` seems to be
>>> "dangerous"
>>>>>>>>>>>>>> because it might be too subtle and an "optional scaling out"
>>> as
>>>>>>> laid out
>>>>>>>>>>>>>> above does not make sense IMHO.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I am also not worried about "over repartitioning" because the
>>>>>>> result
>>>>>>>>>>>>>> stream would never trigger auto-repartitioning. Only if
>>> multiple
>>>>>>>>>>>>>> consecutive calls to `repartition()` are made it could be bad
>>> --
>>>>>>> but
>>>>>>>>>>>>>> that's the same with `through()`. In the end, there is always
>>> some
>>>>>>>>>>>>>> responsibility on the user.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Btw, for `.groupBy()` we know that repartitioning will be
>>> required,
>>>>>>>>>>>>>> however, for `groupByKey()` it depends if the KStream is
>>> marked as
>>>>>>>>>>>>>> `repartitioningRequired`.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hence, for `groupByKey()` it should not be possible for a
>>> user to
>>>>>>> set
>>>>>>>>>>>>>> number of partitions IMHO. For `groupBy()` it's a different
>>> story,
>>>>>>>>>>>>>> because calling
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>  `repartition().groupBy()`
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> does not achieve what we want. Hence, allowing users to pass
>>> in the
>>>>>>>>>>>>>> number of users partitions into `groupBy()` does actually
>>> makes
>>>>>>> sense,
>>>>>>>>>>>>>> because repartitioning will happen anyway and thus we can
>>>>>>> piggyback a
>>>>>>>>>>>>>> scaling decision.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I think that John has a fair concern about the overloads,
>>> however,
>>>>>>> I am
>>>>>>>>>>>>>> not convinced that using `Grouped` to specify the number of
>>>>>>> partitions
>>>>>>>>>>>>>> is intuitive. I double checked `Grouped` and `Repartitioned`
>>> and
>>>>>>> both
>>>>>>>>>>>>>> allow to specify a `name` and `keySerde/valueSerde`. Thus, I
>>> am
>>>>>>>>>>>>>> wondering if we could bridge the gap between both, if we
>>> would make
>>>>>>>>>>>>>> `Repartitioned extends Grouped`? For this case, we only need
>>>>>>>>>>>>>> `groupBy(Grouped)` and a user can pass in both types what
>>> seems to
>>>>>>> make
>>>>>>>>>>>>>> the API quite smooth:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> `stream.groupBy(..., Grouped...)`
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> `stream.groupBy(..., Repartitioned...)`
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thoughts?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 11/7/19 10:59 AM, Levani Kokhreidze wrote:
>>>>>>>>>>>>>>> Hi Sophie,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thank you for your reply, very insightful. Looking forward
>>>>>>> hearing others opinion as well on this.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Kind regards,
>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Nov 6, 2019, at 1:30 AM, Sophie Blee-Goldman <
>>>>>>> sop...@confluent.io> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Personally, I think Matthias’s concern is valid, but on the
>>>>>>> other hand
>>>>>>>>>>>>>>>> Kafka Streams has already
>>>>>>>>>>>>>>>>> optimizer in place which alters topology independently
>>> from user
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I agree (with you) and think this is a good way to put it
>>> -- we
>>>>>>> currently
>>>>>>>>>>>>>>>> auto-repartition for the user so
>>>>>>>>>>>>>>>> that they don't have to walk through their entire topology
>>> and
>>>>>>> reason about
>>>>>>>>>>>>>>>> when and where to place a
>>>>>>>>>>>>>>>> `.through` (or the new `.repartition`), so why suddenly
>>> force
>>>>>>> this onto the
>>>>>>>>>>>>>>>> user? How certain are we that
>>>>>>>>>>>>>>>> users will always get this right? It's easy to imagine that
>>>>>>> during
>>>>>>>>>>>>>>>> development, you write your new app with
>>>>>>>>>>>>>>>> correctly placed repartitions in order to use this new
>>> feature.
>>>>>>> During the
>>>>>>>>>>>>>>>> course of development you end up
>>>>>>>>>>>>>>>> tweaking the topology, but don't remember to review or move
>>> the
>>>>>>>>>>>>>>>> repartitioning since you're used to Streams
>>>>>>>>>>>>>>>> doing this for you. If you use only single-partition topics
>>> for
>>>>>>> testing,
>>>>>>>>>>>>>>>> you might not even notice your app is
>>>>>>>>>>>>>>>> spitting out incorrect results!
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Anyways, I feel pretty strongly that it would be weird to
>>>>>>> introduce a new
>>>>>>>>>>>>>>>> feature and say that to use it, you can't take
>>>>>>>>>>>>>>>> advantage of this other feature anymore. Also, is it
>>> possible our
>>>>>>>>>>>>>>>> optimization framework could ever include an
>>>>>>>>>>>>>>>> optimized repartitioning strategy that is better than what a
>>>>>>> user could
>>>>>>>>>>>>>>>> achieve by manually inserting repartitions?
>>>>>>>>>>>>>>>> Do we expect users to have a deep understanding of the best
>>> way
>>>>>>> to
>>>>>>>>>>>>>>>> repartition their particular topology, or is it
>>>>>>>>>>>>>>>> likely they will end up over-repartitioning either due to
>>> missed
>>>>>>>>>>>>>>>> optimizations or unnecessary extra repartitions?
>>>>>>>>>>>>>>>> I think many users would prefer to just say "if there *is* a
>>>>>>> repartition
>>>>>>>>>>>>>>>> required at this point in the topology, it should
>>>>>>>>>>>>>>>> have N partitions"
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> As to the idea of adding `numberOfPartitions` to Grouped
>>> rather
>>>>>>> than
>>>>>>>>>>>>>>>> adding a new parameter to groupBy, that does seem more in
>>> line
>>>>>>> with the
>>>>>>>>>>>>>>>> current syntax so +1 from me
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Tue, Nov 5, 2019 at 2:07 PM Levani Kokhreidze <
>>>>>>> levani.co...@gmail.com>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> While https://github.com/apache/kafka/pull/7170 <
>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/7170> is under
>>> review and
>>>>>>> it’s
>>>>>>>>>>>>>>>>> almost done, I want to resurrect discussion about this KIP
>>> to
>>>>>>> address
>>>>>>>>>>>>>>>>> couple of concerns raised by Matthias and John.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> As a reminder, idea of the KIP-221 was to allow DSL users
>>>>>>> control over
>>>>>>>>>>>>>>>>> repartitioning and parallelism of sub-topologies by:
>>>>>>>>>>>>>>>>> 1) Introducing new KStream#repartition operation which is
>>> done
>>>>>>> in
>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/7170 <
>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/7170>
>>>>>>>>>>>>>>>>> 2) Add new KStream#groupBy(Repartitioned) operation, which
>>> is
>>>>>>> planned to
>>>>>>>>>>>>>>>>> be separate PR.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> While all agree about general implementation and idea
>>> behind
>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/7170 <
>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/7170> PR,
>>> introducing new
>>>>>>>>>>>>>>>>> KStream#groupBy(Repartitioned) method overload raised some
>>>>>>> questions during
>>>>>>>>>>>>>>>>> the review.
>>>>>>>>>>>>>>>>> Matthias raised concern that there can be cases when user
>>> uses
>>>>>>>>>>>>>>>>> `KStream#groupBy(Repartitioned)` operation, but actual
>>>>>>> repartitioning may
>>>>>>>>>>>>>>>>> not required, thus configuration passed via `Repartitioned`
>>>>>>> would never be
>>>>>>>>>>>>>>>>> applied (Matthias, please correct me if I misinterpreted
>>> your
>>>>>>> comment).
>>>>>>>>>>>>>>>>> So instead, if user wants to control parallelism of
>>>>>>> sub-topologies, he or
>>>>>>>>>>>>>>>>> she should always use `KStream#repartition` operation
>>> before
>>>>>>> groupBy. Full
>>>>>>>>>>>>>>>>> comment can be seen here:
>>>>>>>>>>>>>>>>>
>>>>>>> https://github.com/apache/kafka/pull/7170#issuecomment-519303125 <
>>>>>>>>>>>>>>>>>
>>>>>>> https://github.com/apache/kafka/pull/7170#issuecomment-519303125>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On the same topic, John pointed out that, from API design
>>>>>>> perspective, we
>>>>>>>>>>>>>>>>> shouldn’t intertwine configuration classes of different
>>>>>>> operators between
>>>>>>>>>>>>>>>>> one another. So instead of introducing new
>>>>>>> `KStream#groupBy(Repartitioned)`
>>>>>>>>>>>>>>>>> for specifying number of partitions for internal topic, we
>>>>>>> should update
>>>>>>>>>>>>>>>>> existing `Grouped` class with `numberOfPartitions` field.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Personally, I think Matthias’s concern is valid, but on the
>>>>>>> other hand
>>>>>>>>>>>>>>>>> Kafka Streams has already optimizer in place which alters
>>>>>>> topology
>>>>>>>>>>>>>>>>> independently from user. So maybe it makes sense if Kafka
>>>>>>> Streams,
>>>>>>>>>>>>>>>>> internally would optimize topology in the best way
>>> possible,
>>>>>>> even if in
>>>>>>>>>>>>>>>>> some cases this means ignoring some operator configurations
>>>>>>> passed by the
>>>>>>>>>>>>>>>>> user. Also, I agree with John about API design semantics.
>>> If we
>>>>>>> go through
>>>>>>>>>>>>>>>>> with the changes for `KStream#groupBy` operation, it makes
>>> more
>>>>>>> sense to
>>>>>>>>>>>>>>>>> add `numberOfPartitions` field to `Grouped` class instead
>>> of
>>>>>>> introducing
>>>>>>>>>>>>>>>>> new `KStream#groupBy(Repartitioned)` method overload.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I would really appreciate communities feedback on this.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Kind regards,
>>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Oct 17, 2019, at 12:57 AM, Sophie Blee-Goldman <
>>>>>>> sop...@confluent.io>
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hey Levani,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I think people are busy with the upcoming 2.4 release, and
>>>>>>> don't have
>>>>>>>>>>>>>>>>> much
>>>>>>>>>>>>>>>>>> spare time at the
>>>>>>>>>>>>>>>>>> moment. It's kind of a difficult time to get attention on
>>>>>>> things, but
>>>>>>>>>>>>>>>>> feel
>>>>>>>>>>>>>>>>>> free to pick up something else
>>>>>>>>>>>>>>>>>> to work on in the meantime until things have calmed down
>>> a bit!
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>> Sophie
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Wed, Oct 16, 2019 at 11:26 AM Levani Kokhreidze <
>>>>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>>
>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Sorry for bringing this thread again, but I would like
>>> to get
>>>>>>> some
>>>>>>>>>>>>>>>>>>> attention on this PR:
>>>>>>> https://github.com/apache/kafka/pull/7170 <
>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/7170> <
>>>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/7170 <
>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/7170>>
>>>>>>>>>>>>>>>>>>> It's been a while now and I would love to move on to
>>> other
>>>>>>> KIPs as well.
>>>>>>>>>>>>>>>>>>> Please let me know if you have any concerns.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Jul 26, 2019, at 11:25 AM, Levani Kokhreidze <
>>>>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>>
>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Here’s voting thread for this KIP:
>>>>>>>>>>>>>>>>>>>
>>>>>>> https://www.mail-archive.com/dev@kafka.apache.org/msg99680.html <
>>>>>>>>>>>>>>>>>
>>> https://www.mail-archive.com/dev@kafka.apache.org/msg99680.html>
>>>>>>> <
>>>>>>>>>>>>>>>>>>>
>>>>>>> https://www.mail-archive.com/dev@kafka.apache.org/msg99680.html <
>>>>>>>>>>>>>>>>>
>>> https://www.mail-archive.com/dev@kafka.apache.org/msg99680.html
>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Jul 24, 2019, at 11:15 PM, Levani Kokhreidze <
>>>>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>
>>>>>>>>>>>>>>>>>>> <mailto:levani.co...@gmail.com <mailto:
>>> levani.co...@gmail.com>>>
>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Hi Matthias,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thanks for the suggestion. I Don’t have strong opinion
>>> on
>>>>>>> that one.
>>>>>>>>>>>>>>>>>>>>> Agree that avoiding unnecessary method overloads is a
>>> good
>>>>>>> idea.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Updated KIP
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Jul 24, 2019, at 8:50 PM, Matthias J. Sax <
>>>>>>> matth...@confluent.io
>>>>>>>>>>>>>>>>> <mailto:matth...@confluent.io>
>>>>>>>>>>>>>>>>>>> <mailto:matth...@confluent.io <mailto:
>>> matth...@confluent.io>>>
>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> One question:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Why do we add
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Repartitioned#with(final String name, final int
>>>>>>> numberOfPartitions)
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> It seems that `#with(String name)`,
>>>>>>> `#numberOfPartitions(int)` in
>>>>>>>>>>>>>>>>>>>>>> combination with `withName()` and
>>>>>>> `withNumberOfPartitions()` should
>>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>>>>> sufficient. Users can chain the method calls.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> (I think it's valuable to keep the number of overload
>>>>>>> small if
>>>>>>>>>>>>>>>>>>> possible.)
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Otherwise LGTM.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On 7/23/19 2:18 PM, Levani Kokhreidze wrote:
>>>>>>>>>>>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Thanks all for your feedback.
>>>>>>>>>>>>>>>>>>>>>>> I started voting procedure for this KIP. If there’re
>>> any
>>>>>>> other
>>>>>>>>>>>>>>>>>>> concerns about this KIP, please let me know.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> On Jul 20, 2019, at 8:39 PM, Levani Kokhreidze <
>>>>>>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>
>>>>>>> <mailto:
>>>>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>>>
>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Hi Matthias,
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the suggestion, makes sense.
>>>>>>>>>>>>>>>>>>>>>>>> I’ve updated KIP (
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>>>>>>> <
>>>>>>>>>>>>>>>>>
>>>>>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> <
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>>>>>>> <
>>>>>>>>>>>>>>>>>
>>>>>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> <
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221:+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>>>>>>> <
>>>>>>>>>>>>>>>>>
>>>>>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221:+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> <
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221:+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>>>>>>> <
>>>>>>>>>>>>>>>>>
>>>>>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221:+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> ).
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> On Jul 20, 2019, at 3:53 AM, Matthias J. Sax <
>>>>>>>>>>>>>>>>> matth...@confluent.io <mailto:matth...@confluent.io>
>>>>>>>>>>>>>>>>>>> <mailto:matth...@confluent.io <mailto:
>>> matth...@confluent.io>>
>>>>>>> <mailto:
>>>>>>>>>>>>>>>>> matth...@confluent.io <mailto:matth...@confluent.io>
>>> <mailto:
>>>>>>>>>>>>>>>>>>> matth...@confluent.io <mailto:matth...@confluent.io>>>>
>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for driving the KIP.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> I agree that users need to be able to specify a
>>>>>>> partitioning
>>>>>>>>>>>>>>>>>>> strategy.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Sophie raises a fair point about topic configs and
>>>>>>> producer
>>>>>>>>>>>>>>>>>>> configs. My
>>>>>>>>>>>>>>>>>>>>>>>>> take is, that consider `Repartitioned` as an
>>>>>>> "extension" to
>>>>>>>>>>>>>>>>>>> `Produced`,
>>>>>>>>>>>>>>>>>>>>>>>>> that adds topic configuration, is a good way to
>>> think
>>>>>>> about it and
>>>>>>>>>>>>>>>>>>> helps
>>>>>>>>>>>>>>>>>>>>>>>>> to keep the API "clean".
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> With regard to method names. I would prefer to
>>> avoid
>>>>>>>>>>>>>>>>> abbreviations.
>>>>>>>>>>>>>>>>>>> Can
>>>>>>>>>>>>>>>>>>>>>>>>> we rename:
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> `withNumOfPartitions` -> `withNumberOfPartitions`
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Furthermore, it might be good to add some more
>>> `static`
>>>>>>> methods:
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> - Repartitioned.with(Serde<K>, Serde<V>)
>>>>>>>>>>>>>>>>>>>>>>>>> - Repartitioned.withNumberOfPartitions(int)
>>>>>>>>>>>>>>>>>>>>>>>>> -
>>> Repartitioned.streamPartitioner(StreamPartitioner)
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> On 7/19/19 3:33 PM, Levani Kokhreidze wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>> Totally agree. I think in KStream interface it
>>> makes
>>>>>>> sense to
>>>>>>>>>>>>>>>>> have
>>>>>>>>>>>>>>>>>>> some duplicate configurations between operators in order
>>> to
>>>>>>> keep API
>>>>>>>>>>>>>>>>> simple
>>>>>>>>>>>>>>>>>>> and usable.
>>>>>>>>>>>>>>>>>>>>>>>>>> Also, as more surface API has, harder it is to
>>> have
>>>>>>> proper
>>>>>>>>>>>>>>>>>>> backward compatibility.
>>>>>>>>>>>>>>>>>>>>>>>>>> While initial idea of keeping topic level configs
>>>>>>> separate was
>>>>>>>>>>>>>>>>>>> exciting, having Repartitioned class encapsulate some
>>>>>>> producer level
>>>>>>>>>>>>>>>>>>> configs makes API more readable.
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> On Jul 20, 2019, at 1:15 AM, Sophie Blee-Goldman
>>> <
>>>>>>>>>>>>>>>>>>> sop...@confluent.io <mailto:sop...@confluent.io>
>>> <mailto:
>>>>>>>>>>>>>>>>> sop...@confluent.io <mailto:sop...@confluent.io>> <mailto:
>>>>>>>>>>>>>>>>>>> sop...@confluent.io <mailto:sop...@confluent.io>
>>> <mailto:
>>>>>>>>>>>>>>>>> sop...@confluent.io <mailto:sop...@confluent.io>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> I think that is a good point about trying to keep
>>>>>>> producer level
>>>>>>>>>>>>>>>>>>>>>>>>>>> configurations and (repartition) topic level
>>>>>>> considerations
>>>>>>>>>>>>>>>>>>> separate.
>>>>>>>>>>>>>>>>>>>>>>>>>>> Number of partitions is definitely purely a topic
>>>>>>> level
>>>>>>>>>>>>>>>>>>> configuration. But
>>>>>>>>>>>>>>>>>>>>>>>>>>> on some level, serdes and partitioners are just
>>> as
>>>>>>> much a topic
>>>>>>>>>>>>>>>>>>>>>>>>>>> configuration as a producer one. You could have
>>> two
>>>>>>> producers
>>>>>>>>>>>>>>>>>>> configured
>>>>>>>>>>>>>>>>>>>>>>>>>>> with different serdes and/or partitioners, but if
>>>>>>> they are
>>>>>>>>>>>>>>>>>>> writing to the
>>>>>>>>>>>>>>>>>>>>>>>>>>> same topic the result would be very difficult to
>>>>>>> part. So in a
>>>>>>>>>>>>>>>>>>> sense, these
>>>>>>>>>>>>>>>>>>>>>>>>>>> are configurations of topics in Streams, not just
>>>>>>> producers.
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> Another way to think of it: while the Streams
>>> API is
>>>>>>> not always
>>>>>>>>>>>>>>>>>>> true to
>>>>>>>>>>>>>>>>>>>>>>>>>>> this, ideally all the relevant configs for an
>>>>>>> operator are
>>>>>>>>>>>>>>>>>>> wrapped into a
>>>>>>>>>>>>>>>>>>>>>>>>>>> single object (in this case, Repartitioned). We
>>> could
>>>>>>> instead
>>>>>>>>>>>>>>>>>>> split out the
>>>>>>>>>>>>>>>>>>>>>>>>>>> fields in common with Produced into a separate
>>>>>>> parameter to keep
>>>>>>>>>>>>>>>>>>> topic and
>>>>>>>>>>>>>>>>>>>>>>>>>>> producer level configurations separate, but this
>>>>>>> increases the
>>>>>>>>>>>>>>>>>>> API surface
>>>>>>>>>>>>>>>>>>>>>>>>>>> area by a lot. It's much more straightforward to
>>> just
>>>>>>> say "this
>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>>>>>>> everything that this particular operator needs"
>>>>>>> without worrying
>>>>>>>>>>>>>>>>>>> about what
>>>>>>>>>>>>>>>>>>>>>>>>>>> exactly you're specifying.
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> I suppose you could alternatively make Produced a
>>>>>>> field of
>>>>>>>>>>>>>>>>>>> Repartitioned,
>>>>>>>>>>>>>>>>>>>>>>>>>>> but I don't think we do this kind of composition
>>>>>>> elsewhere in
>>>>>>>>>>>>>>>>>>> Streams at
>>>>>>>>>>>>>>>>>>>>>>>>>>> the moment
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Jul 19, 2019 at 1:45 PM Levani
>>> Kokhreidze <
>>>>>>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>
>>>>>>> <mailto:
>>>>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>>
>>>>>>> <mailto:
>>>>>>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>
>>>>>>> <mailto:
>>>>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Bill,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks a lot for the feedback.
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Yes, that makes sense. I’ve updated KIP with
>>>>>>>>>>>>>>>>>>> `Repartitioned#partitioner`
>>>>>>>>>>>>>>>>>>>>>>>>>>>> configuration.
>>>>>>>>>>>>>>>>>>>>>>>>>>>> In the beginning, I wanted to introduce a class
>>> for
>>>>>>> topic level
>>>>>>>>>>>>>>>>>>>>>>>>>>>> configuration and keep topic level and producer
>>> level
>>>>>>>>>>>>>>>>>>> configurations (such
>>>>>>>>>>>>>>>>>>>>>>>>>>>> as Produced) separately (see my second email in
>>> this
>>>>>>> thread).
>>>>>>>>>>>>>>>>>>>>>>>>>>>> But while looking at the semantics of KStream
>>>>>>> interface, I
>>>>>>>>>>>>>>>>>>> couldn’t really
>>>>>>>>>>>>>>>>>>>>>>>>>>>> figure out good operation name for Topic level
>>>>>>> configuration
>>>>>>>>>>>>>>>>>>> class and just
>>>>>>>>>>>>>>>>>>>>>>>>>>>> introducing `Topic` config class was kinda
>>> breaking
>>>>>>> the
>>>>>>>>>>>>>>>>>>> semantics.
>>>>>>>>>>>>>>>>>>>>>>>>>>>> So I think having Repartitioned class which
>>>>>>> encapsulates topic
>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>>>>>>>> producer level configurations for internal
>>> topics is
>>>>>>> viable
>>>>>>>>>>>>>>>>>>> thing to do.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Jul 19, 2019, at 7:47 PM, Bill Bejeck <
>>>>>>> bbej...@gmail.com
>>>>>>>>>>>>>>>>> <mailto:bbej...@gmail.com>
>>>>>>>>>>>>>>>>>>> <mailto:bbej...@gmail.com <mailto:bbej...@gmail.com>>
>>>>>>> <mailto:
>>>>>>>>>>>>>>>>> bbej...@gmail.com <mailto:bbej...@gmail.com> <mailto:
>>>>>>>>>>>>>>>>>>> bbej...@gmail.com <mailto:bbej...@gmail.com>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Lavani,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for resurrecting this KIP.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I'm also a +1 for adding a partition option.
>>> In
>>>>>>> addition to
>>>>>>>>>>>>>>>>>>> the reason
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> provided by John, my reasoning is:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1. Users may want to use something other than
>>>>>>> hash-based
>>>>>>>>>>>>>>>>>>> partitioning
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2. Users may wish to partition on something
>>>>>>> different than the
>>>>>>>>>>>>>>>>>>> key
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> without having to change the key.  For example:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1. A combination of fields in the value in
>>>>>>> conjunction with
>>>>>>>>>>>>>>>>>>> the key
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2. Something other than the key
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 3. We allow users to specify a partitioner on
>>>>>>> Produced hence
>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> KStream.to and KStream.through, so it makes
>>> sense
>>>>>>> for API
>>>>>>>>>>>>>>>>>>> consistency.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Just my  2 cents.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Bill
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Jul 19, 2019 at 5:46 AM Levani
>>> Kokhreidze <
>>>>>>>>>>>>>>>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:
>>>>>>> levani.co...@gmail.com>
>>>>>>>>>>>>>>>>> <mailto:levani.co...@gmail.com <mailto:
>>> levani.co...@gmail.com>>
>>>>>>> <mailto:
>>>>>>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>
>>>>>>> <mailto:
>>>>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi John,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> In my mind it makes sense.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> If we add partitioner configuration to
>>>>>>> Repartitioned class,
>>>>>>>>>>>>>>>>>>> with the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> combination of specifying number of
>>> partitions for
>>>>>>> internal
>>>>>>>>>>>>>>>>>>> topics, user
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> will have opportunity to ensure
>>> co-partitioning
>>>>>>> before join
>>>>>>>>>>>>>>>>>>> operation.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I think this can be quite powerful feature.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Wondering what others think about this?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Jul 18, 2019, at 1:20 AM, John Roesler <
>>>>>>>>>>>>>>>>> j...@confluent.io <mailto:j...@confluent.io>
>>>>>>>>>>>>>>>>>>> <mailto:j...@confluent.io <mailto:j...@confluent.io>>
>>>>>>> <mailto:
>>>>>>>>>>>>>>>>> j...@confluent.io <mailto:j...@confluent.io> <mailto:
>>>>>>>>>>>>>>>>>>> j...@confluent.io <mailto:j...@confluent.io>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Yes, I believe that's what I had in mind.
>>> Again,
>>>>>>> not totally
>>>>>>>>>>>>>>>>>>> sure it
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> makes sense, but I believe something similar
>>> is
>>>>>>> the
>>>>>>>>>>>>>>>>> rationale
>>>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> having the partitioner option in Produced.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Jul 17, 2019 at 3:20 PM Levani
>>> Kokhreidze
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> <levani.co...@gmail.com <mailto:
>>>>>>> levani.co...@gmail.com>
>>>>>>>>>>>>>>>>> <mailto:levani.co...@gmail.com <mailto:
>>> levani.co...@gmail.com>>
>>>>>>>>>>>>>>>>>>> <mailto:levani.co...@gmail.com <mailto:
>>> levani.co...@gmail.com>
>>>>>>> <mailto:
>>>>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>>>>
>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hey John,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Oh that’s interesting use-case.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Do I understand this correctly, in your
>>> example
>>>>>>> I would
>>>>>>>>>>>>>>>>>>> first issue
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> repartition(Repartitioned) with proper
>>> partitioner
>>>>>>> that
>>>>>>>>>>>>>>>>>>> essentially
>>>>>>>>>>>>>>>>>>>>>>>>>>>> would
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> be the same as the topic I want to join with
>>> and
>>>>>>> then do the
>>>>>>>>>>>>>>>>>>>>>>>>>>>> KStream#join
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> with DSL?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Jul 17, 2019, at 11:11 PM, John Roesler
>>> <
>>>>>>>>>>>>>>>>>>> j...@confluent.io <mailto:j...@confluent.io> <mailto:
>>>>>>> j...@confluent.io
>>>>>>>>>>>>>>>>> <mailto:j...@confluent.io>> <mailto:j...@confluent.io
>>> <mailto:
>>>>>>>>>>>>>>>>> j...@confluent.io>
>>>>>>>>>>>>>>>>>>> <mailto:j...@confluent.io <mailto:j...@confluent.io>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hey, all, just to chime in,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I think it might be useful to have an
>>> option to
>>>>>>> specify
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> partitioner. The case I have in mind is
>>> that
>>>>>>> some data may
>>>>>>>>>>>>>>>>>>> get
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> repartitioned and then joined with an input
>>>>>>> topic. If the
>>>>>>>>>>>>>>>>>>> right-side
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> input topic uses a custom partitioning
>>>>>>> strategy, then the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> repartitioned stream also needs to be
>>>>>>> partitioned with the
>>>>>>>>>>>>>>>>>>> same
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> strategy.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Does that make sense, or did I maybe miss
>>>>>>> something
>>>>>>>>>>>>>>>>>>> important?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Jul 17, 2019 at 2:48 PM Levani
>>>>>>> Kokhreidze
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> <levani.co...@gmail.com <mailto:
>>>>>>> levani.co...@gmail.com>
>>>>>>>>>>>>>>>>> <mailto:levani.co...@gmail.com <mailto:
>>> levani.co...@gmail.com>>
>>>>>>>>>>>>>>>>>>> <mailto:levani.co...@gmail.com <mailto:
>>> levani.co...@gmail.com>
>>>>>>> <mailto:
>>>>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>>>>
>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Yes, I was thinking about it as well. To
>>> be
>>>>>>> honest I’m
>>>>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>>>> sure
>>>>>>>>>>>>>>>>>>>>>>>>>>>> about
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> it yet.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> As Kafka Streams DSL user, I don’t really
>>>>>>> think I would
>>>>>>>>>>>>>>>>>>> need control
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> over partitioner for internal topics.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> As a user, I would assume that Kafka
>>> Streams
>>>>>>> knows best
>>>>>>>>>>>>>>>>>>> how to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> partition data for internal topics.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> In this KIP I wrote that Produced should
>>> be
>>>>>>> used only for
>>>>>>>>>>>>>>>>>>> topics
>>>>>>>>>>>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> are created by user In advance.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> In those cases maybe it make sense to have
>>>>>>> possibility to
>>>>>>>>>>>>>>>>>>> specify
>>>>>>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> partitioner.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I don’t have clear answer on that yet,
>>> but I
>>>>>>> guess
>>>>>>>>>>>>>>>>>>> specifying the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> partitioner can be added as well if there’s
>>>>>>> agreement on
>>>>>>>>>>>>>>>>> this.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Jul 17, 2019, at 10:42 PM, Sophie
>>>>>>> Blee-Goldman <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> sop...@confluent.io <mailto:
>>> sop...@confluent.io>
>>>>>>> <mailto:
>>>>>>>>>>>>>>>>> sop...@confluent.io <mailto:sop...@confluent.io>> <mailto:
>>>>>>>>>>>>>>>>>>> sop...@confluent.io <mailto:sop...@confluent.io>>>
>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for clearing that up. I agree that
>>>>>>> Repartitioned
>>>>>>>>>>>>>>>>>>> would be a
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> useful
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> addition. I'm wondering if it might also
>>> need
>>>>>>> to have
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> a withStreamPartitioner method/field,
>>> similar
>>>>>>> to
>>>>>>>>>>>>>>>>>>> Produced? I'm not
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> sure how
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> widely this feature is really used, but
>>> seems
>>>>>>> it should
>>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>>>>>>>>>>> available
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> repartition topics.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Jul 17, 2019 at 11:26 AM Levani
>>>>>>> Kokhreidze <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:
>>>>>>> levani.co...@gmail.com>
>>>>>>>>>>>>>>>>>>> <mailto:levani.co...@gmail.com <mailto:
>>> levani.co...@gmail.com>
>>>>>>> <mailto:
>>>>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hey Sophie,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> In both cases KStream#repartition and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> KStream#repartition(Repartitioned)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> topic will be created and managed by
>>> Kafka
>>>>>>> Streams.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Idea of Repartitioned is to give user
>>> more
>>>>>>> control over
>>>>>>>>>>>>>>>>>>> the topic
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> such as
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> num of partitions.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I feel like Repartitioned parameter is
>>>>>>> something that
>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>> missing
>>>>>>>>>>>>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> current DSL design.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Essentially giving user control over
>>>>>>> parallelism by
>>>>>>>>>>>>>>>>>>> configuring
>>>>>>>>>>>>>>>>>>>>>>>>>>>> num
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> partitions for internal topics.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hope this answers your question.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Jul 17, 2019, at 9:02 PM, Sophie
>>>>>>> Blee-Goldman <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> sop...@confluent.io <mailto:
>>> sop...@confluent.io>
>>>>>>> <mailto:
>>>>>>>>>>>>>>>>> sop...@confluent.io <mailto:sop...@confluent.io>> <mailto:
>>>>>>>>>>>>>>>>>>> sop...@confluent.io <mailto:sop...@confluent.io>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hey Levani,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the KIP! Can you clarify one
>>>>>>> thing for me
>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>>> for the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> KStream#repartition signature taking a
>>>>>>> Repartitioned,
>>>>>>>>>>>>>>>>>>> will the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> topic be
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> auto-created by Streams (which seems
>>> to be
>>>>>>> the case
>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> signature
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> without a Repartitioned) or does it
>>> have to
>>>>>>> be
>>>>>>>>>>>>>>>>>>> pre-created? The
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wording
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the KIP makes it seem like one version
>>> of
>>>>>>> the method
>>>>>>>>>>>>>>>>>>> will
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> auto-create
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> topics while the other will not.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Sophie
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Jul 17, 2019 at 10:15 AM Levani
>>>>>>> Kokhreidze <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:
>>>>>>> levani.co...@gmail.com>
>>>>>>>>>>>>>>>>>>> <mailto:levani.co...@gmail.com <mailto:
>>> levani.co...@gmail.com>
>>>>>>> <mailto:
>>>>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> One more bump about KIP-221 (
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>>>>>>> <
>>>>>>>>>>>>>>>>>
>>>>>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> <
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>>>>>>> <
>>>>>>>>>>>>>>>>>
>>>>>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> <
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>>>>>>> <
>>>>>>>>>>>>>>>>>
>>>>>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> <
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>>>>>>> <
>>>>>>>>>>>>>>>>>
>>>>>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221:+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>>>>>>> <
>>>>>>>>>>>>>>>>>
>>>>>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221:+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> <
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221:+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> )
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> so it doesn’t get lost in mailing
>>> list :)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Would love to hear communities
>>>>>>> opinions/concerns
>>>>>>>>>>>>>>>>> about
>>>>>>>>>>>>>>>>>>> this KIP.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Jul 12, 2019, at 5:27 PM, Levani
>>>>>>> Kokhreidze <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> levani.co...@gmail.com
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Kind reminder about this KIP:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221:+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Jul 9, 2019, at 11:38 AM, Levani
>>>>>>> Kokhreidze <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> levani.co...@gmail.com
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> <mailto:levani.co...@gmail.com>>
>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> In order to move this KIP forward,
>>> I’ve
>>>>>>> updated
>>>>>>>>>>>>>>>>>>> confluence
>>>>>>>>>>>>>>>>>>>>>>>>>>>> page
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the new proposal
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221:+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I’ve also filled “Rejected
>>> Alternatives”
>>>>>>> section.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Looking forward to discuss this KIP
>>> :)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> King regards,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Jul 3, 2019, at 1:08 PM, Levani
>>>>>>> Kokhreidze <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> levani.co...@gmail.com
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> <mailto:levani.co...@gmail.com>>
>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hello Matthias,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the feedback and ideas.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I like the idea of introducing
>>>>>>> dedicated `Topic`
>>>>>>>>>>>>>>>>>>> class for
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> topic
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> configuration for internal operators
>>> like
>>>>>>>>>>>>>>>>> `groupedBy`.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Would be great to hear others
>>> opinion
>>>>>>> about this
>>>>>>>>>>>>>>>>> as
>>>>>>>>>>>>>>>>>>> well.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Kind regards,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Jul 3, 2019, at 7:00 AM,
>>> Matthias
>>>>>>> J. Sax <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> matth...@confluent.io
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> <mailto:matth...@confluent.io>>
>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Levani,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for picking up this KIP!
>>> And
>>>>>>> thanks for
>>>>>>>>>>>>>>>>>>> summarizing
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> everything.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Even if some points may have been
>>>>>>> discussed
>>>>>>>>>>>>>>>>>>> already (can't
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> really
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> remember), it's helpful to get a
>>> good
>>>>>>> summary to
>>>>>>>>>>>>>>>>>>> refresh the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> discussion.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I think your reasoning makes
>>> sense.
>>>>>>> With regard
>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> distinction
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> between operators that manage
>>> topics
>>>>>>> and
>>>>>>>>>>>>>>>>> operators
>>>>>>>>>>>>>>>>>>> that use
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> user-created
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> topics: Following this argument,
>>> it
>>>>>>> might
>>>>>>>>>>>>>>>>> indicate
>>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> leaving
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> `through()` as-is (as an operator
>>> that
>>>>>>> uses
>>>>>>>>>>>>>>>>>>> use-defined
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> topics) and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> introducing a new `repartition()`
>>>>>>> operator (an
>>>>>>>>>>>>>>>>>>> operator that
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> manages
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> topics itself) might be good.
>>>>>>> Otherwise, there is
>>>>>>>>>>>>>>>>>>> one
>>>>>>>>>>>>>>>>>>>>>>>>>>>> operator
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> `through()` that sometimes manages
>>>>>>> topics but
>>>>>>>>>>>>>>>>>>> sometimes
>>>>>>>>>>>>>>>>>>>>>>>>>>>> not; a
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> different
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> name, ie, new operator would make
>>> the
>>>>>>> distinction
>>>>>>>>>>>>>>>>>>> clearer.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> About adding `numOfPartitions` to
>>>>>>> `Grouped`. I am
>>>>>>>>>>>>>>>>>>> wondering
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> if the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> same
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> argument as for `Produced` does
>>> apply
>>>>>>> and adding
>>>>>>>>>>>>>>>>>>> it is
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> semantically
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> questionable? Might be good to get
>>>>>>> opinions of
>>>>>>>>>>>>>>>>>>> others on
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> this, too.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> am
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> not sure myself what solution I
>>> prefer
>>>>>>> atm.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> So far, KS uses configuration
>>> objects
>>>>>>> that allow
>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>>>>>>> configure
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> certain
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> "entity" like a consumer,
>>> producer,
>>>>>>> store. If we
>>>>>>>>>>>>>>>>>>> assume that
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> a topic
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> a similar entity, I am wonder if
>>> we
>>>>>>> should have a
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> `Topic#withNumberOfPartitions()`
>>> class
>>>>>>> and method
>>>>>>>>>>>>>>>>>>> instead of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> a plain
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> integer? This would allow us to
>>> add
>>>>>>> other
>>>>>>>>>>>>>>>>> configs,
>>>>>>>>>>>>>>>>>>> like
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> replication
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> factor, retention-time etc,
>>> easily,
>>>>>>> without the
>>>>>>>>>>>>>>>>>>> need to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> change the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> "main
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> API".
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Just want to give some ideas. Not
>>> sure
>>>>>>> if I like
>>>>>>>>>>>>>>>>>>> them
>>>>>>>>>>>>>>>>>>>>>>>>>>>> myself.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> :)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 7/1/19 1:04 AM, Levani
>>> Kokhreidze
>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Actually, giving it more though -
>>>>>>> maybe
>>>>>>>>>>>>>>>>> enhancing
>>>>>>>>>>>>>>>>>>> Produced
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> with num
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> of partitions configuration is not the
>>>>>>> best approach.
>>>>>>>>>>>>>>>>>>> Let me
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> explain
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> why:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1) If we enhance Produced class
>>> with
>>>>>>> this
>>>>>>>>>>>>>>>>>>> configuration,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> this will
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> also affect KStream#to operation.
>>> Since
>>>>>>> KStream#to is
>>>>>>>>>>>>>>>>>>> the final
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> sink of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> topology, for me, it seems to be
>>> reasonable
>>>>>>>>>>>>>>>>> assumption
>>>>>>>>>>>>>>>>>>> that user
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> needs
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> manually create sink topic in
>>> advance. And
>>>>>>> in that
>>>>>>>>>>>>>>>>>>> case, having
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> num of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> partitions configuration doesn’t make
>>> much
>>>>>>> sense.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2) Looking at Produced class,
>>> based
>>>>>>> on API
>>>>>>>>>>>>>>>>>>> contract, seems
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> like
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Produced is designed to be something
>>> that
>>>>>>> is
>>>>>>>>>>>>>>>>>>> explicitly for
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> producer
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> (key
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> serializer, value serializer,
>>> partitioner
>>>>>>> those all
>>>>>>>>>>>>>>>>>>> are producer
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> specific
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> configurations) and num of partitions
>>> is
>>>>>>> topic level
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> configuration. And
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> don’t think mixing topic and producer
>>> level
>>>>>>>>>>>>>>>>>>> configurations
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> together in
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> one
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> class is the good approach.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 3) Looking at KStream interface,
>>>>>>> seems like
>>>>>>>>>>>>>>>>>>> Produced
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> parameter is
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> for operations that work with
>>> non-internal
>>>>>>> (e.g
>>>>>>>>>>>>>>>>> topics
>>>>>>>>>>>>>>>>>>> created
>>>>>>>>>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> managed
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> internally by Kafka Streams) topics
>>> and I
>>>>>>> think we
>>>>>>>>>>>>>>>>>>> should leave
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> it as
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> it is
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> in that case.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Taking all this things into
>>> account,
>>>>>>> I think we
>>>>>>>>>>>>>>>>>>> should
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> distinguish
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> between DSL operations, where Kafka
>>>>>>> Streams should
>>>>>>>>>>>>>>>>>>> create and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> manage
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> internal topics (KStream#groupBy) vs
>>>>>>> topics that
>>>>>>>>>>>>>>>>>>> should be
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> created in
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> advance (e.g KStream#to).
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> To sum it up, I think adding
>>>>>>> numPartitions
>>>>>>>>>>>>>>>>>>> configuration in
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Produced
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> will result in mixing topic and
>>> producer
>>>>>>> level
>>>>>>>>>>>>>>>>>>> configuration in
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> one
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> class
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> and it’s gonna break existing API
>>>>>>> semantics.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regarding making topic name
>>> optional
>>>>>>> in
>>>>>>>>>>>>>>>>>>> KStream#through - I
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> think
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> underline idea is very useful and
>>> giving
>>>>>>> users
>>>>>>>>>>>>>>>>>>> possibility to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> specify
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> num
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> of partitions there is even more
>>> useful :)
>>>>>>>>>>>>>>>>> Considering
>>>>>>>>>>>>>>>>>>> arguments
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> against
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> adding num of partitions in Produced
>>>>>>> class, I see two
>>>>>>>>>>>>>>>>>>> options
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> here:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1) Add following method overloads
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> * through() - topic will be
>>>>>>> auto-generated and
>>>>>>>>>>>>>>>>>>> num of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> partitions
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> will be taken from source topic
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> * through(final int
>>> numOfPartitions)
>>>>>>> - topic
>>>>>>>>>>>>>>>>> will
>>>>>>>>>>>>>>>>>>> be auto
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> generated with specified num of
>>> partitions
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> * through(final int
>>> numOfPartitions,
>>>>>>> final
>>>>>>>>>>>>>>>>>>> Produced<K, V>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> produced) - topic will be with
>>> generated
>>>>>>> with
>>>>>>>>>>>>>>>>>>> specified num of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> partitions
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> and configuration taken from produced
>>>>>>> parameter.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2) Leave KStream#through as it
>>> is and
>>>>>>> introduce
>>>>>>>>>>>>>>>>>>> new method
>>>>>>>>>>>>>>>>>>>>>>>>>>>> -
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> KStream#repartition (I think Matthias
>>>>>>> suggested this
>>>>>>>>>>>>>>>>>>> in one of
>>>>>>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> threads)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Considering all mentioned above I
>>>>>>> propose the
>>>>>>>>>>>>>>>>>>> following
>>>>>>>>>>>>>>>>>>>>>>>>>>>> plan:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Option A:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1) Leave Produced as it is
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2) Add num of partitions
>>>>>>> configuration to
>>>>>>>>>>>>>>>>> Grouped
>>>>>>>>>>>>>>>>>>> class (as
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> mentioned in the KIP)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 3) Add following method
>>> overloads to
>>>>>>>>>>>>>>>>>>> KStream#through
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> * through() - topic will be
>>>>>>> auto-generated and
>>>>>>>>>>>>>>>>>>> num of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> partitions
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> will be taken from source topic
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> * through(final int
>>> numOfPartitions)
>>>>>>> - topic
>>>>>>>>>>>>>>>>> will
>>>>>>>>>>>>>>>>>>> be auto
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> generated with specified num of
>>> partitions
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> * through(final int
>>> numOfPartitions,
>>>>>>> final
>>>>>>>>>>>>>>>>>>> Produced<K, V>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> produced) - topic will be with
>>> generated
>>>>>>> with
>>>>>>>>>>>>>>>>>>> specified num of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> partitions
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> and configuration taken from produced
>>>>>>> parameter.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Option B:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1) Leave Produced as it is
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2) Add num of partitions
>>>>>>> configuration to
>>>>>>>>>>>>>>>>> Grouped
>>>>>>>>>>>>>>>>>>> class (as
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> mentioned in the KIP)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 3) Add new operator
>>>>>>> KStream#repartition for
>>>>>>>>>>>>>>>>>>> creating and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> managing
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> internal repartition topics
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> P.S. I’m sorry if all of this was
>>>>>>> already
>>>>>>>>>>>>>>>>>>> discussed in the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> mailing
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> list, but I kinda got with all the
>>> threads
>>>>>>> that were
>>>>>>>>>>>>>>>>>>> about this
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> KIP :(
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Kind regards,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Jul 1, 2019, at 9:56 AM,
>>> Levani
>>>>>>> Kokhreidze <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> levani.co...@gmail.com <mailto:
>>>>>>>>>>>>>>>>> levani.co...@gmail.com>>
>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I would like to resurrect
>>> discussion
>>>>>>> around
>>>>>>>>>>>>>>>>>>> KIP-221. Going
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> through
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the discussion thread, there’s seems
>>> to
>>>>>>> agreement
>>>>>>>>>>>>>>>>>>> around
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> usefulness of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> feature.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regarding the implementation,
>>> as far
>>>>>>> as I
>>>>>>>>>>>>>>>>>>> understood, the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> most
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> optimal solution for me seems the
>>>>>>> following:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1) Add two method overloads to
>>>>>>> KStream#through
>>>>>>>>>>>>>>>>>>> method
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> (essentially
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> making topic name optional)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2) Enhance Produced class with
>>>>>>> numOfPartitions
>>>>>>>>>>>>>>>>>>>>>>>>>>>> configuration
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> field.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Those two changes will allow DSL
>>>>>>> users to
>>>>>>>>>>>>>>>>> control
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> parallelism and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> trigger re-partition without doing
>>> stateful
>>>>>>>>>>>>>>>>> operations.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I will update KIP with interface
>>>>>>> changes around
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> KStream#through if
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> this changes sound sensible.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Kind regards,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>>

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to