Hello,

Just FYI, the PR was updated and now incorporates the latest suggestions about
joins.
`CopartitionedTopicsEnforcer` will throw an exception if the number of partitions
isn’t the same on both sides when using the `repartition` operation along with `join`.
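
To illustrate the behaviour, here is a simplified, stand-alone sketch of the check. It is not the actual `CopartitionedTopicsEnforcer` code from the PR: the class name, method name, exception type, and the handling of the case where only one side specifies a count are assumptions made purely for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.OptionalInt;

// Simplified model of the join co-partitioning check described above.
// NOT the real CopartitionedTopicsEnforcer: every name here is hypothetical.
final class CopartitionCheckSketch {

    // requested: join input name -> partition count set via repartition(),
    //            or OptionalInt.empty() if the user did not set one.
    // fallback:  count used when no side set one (assumption: stands in for
    //            whatever Streams would otherwise pick for the join inputs).
    static int resolvePartitionCount(Map<String, OptionalInt> requested, int fallback) {
        OptionalInt enforced = OptionalInt.empty();
        for (Map.Entry<String, OptionalInt> side : requested.entrySet()) {
            OptionalInt count = side.getValue();
            if (!count.isPresent()) {
                continue; // this side has no user-specified count
            }
            if (enforced.isPresent() && enforced.getAsInt() != count.getAsInt()) {
                // The exception type is a stand-in for whatever the PR actually throws.
                throw new IllegalStateException(
                    "Join inputs are not co-partitioned: " + requested);
            }
            enforced = count;
        }
        return enforced.orElse(fallback);
    }

    public static void main(String[] args) {
        Map<String, OptionalInt> sides = new LinkedHashMap<>();
        sides.put("left", OptionalInt.of(100));
        sides.put("right", OptionalInt.of(10));
        try {
            resolvePartitionCount(sides, 10);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

With the left side repartitioned to 100 partitions and the right side to 10, the sketch rejects the join; setting both sides to the same count passes the check.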

For more details please take a look at the PR: 
https://github.com/apache/kafka/pull/7170/files

Regards,
Levani 


> On Nov 15, 2019, at 11:01 AM, Matthias J. Sax <matth...@confluent.io> wrote:
> 
> Thanks a lot for the input Sophie.
> 
> Your example is quite useful, and I would use it to support my claim
> that a "partition hint" for `Grouped` seems "useless" and does not
> improve the user experience.
> 
> 1) You argue that a new user would be worried about repartition topics
> with too many partitions. This would imply that a user is already
> advanced enough to understand the implication of repartitioning -- for
> this case, I would argue that a user also understands _when_ an
> auto-repartitioning would happen and thus the user understands where to
> insert a `repartition()` operation.
> 
> 2) For specifying Serdes: if a `groupByKey()` does not trigger
> auto-repartitioning it's not required to specify the serdes and if they
> are specified they would be ignored/unused (note, that `groupBy()` would
> always trigger a repartitioning). Of course, if the default Serdes from
> the config match (eg, all data types are Json anyway), a user does not
> need to worry about specifying serdes. -- For new users that play around,
> I would assume that they work a lot with primitive types and thus would
> need to specify the serdes -- hence, they would learn about
> auto-repartitioning the hard way anyhow, because each time a
> `groupByKey()` does trigger auto-repartitioning, they would need to pass
> in the correct Serdes -- this way, they would also be educated where to
> insert a `repartition()` operator if needed.
> 
> 3) If a new user really just "plays around", I don't think they use an
> input topic with 100 partitions but most likely have a local single-node
> broker with most likely single-partition topics.
> 
> 
> My main argument for my current proposal is however, that---based on
> past experience---it's better to roll out a new feature more carefully
> and see how it goes. Last, as John pointed out, we can still extend the
> feature in the future. Instead of making a judgment call up-front, being
> more conservative and less fancy, and revisiting the design based on
> actual user feedback after the first version is rolled out, seems to be
> the better option. Undoing a feature is much harder than extending it.
> 
> 
> While I advocate strongly for a simple first version of this feature, it's
> a community decision in the end, and I would not block this KIP if
> there is a broad preference to add `Grouped#withNumberOfPartitions()`
> either.
> 
> 
> -Matthias
> 
> On 11/14/19 11:35 PM, Sophie Blee-Goldman wrote:
>> It seems like we all agree at this point (please correct me if wrong!) that
>> we should NOT change
>> the existing repartitioning behavior, ie we should allow Streams to
>> continue to determine when and
>> where to repartition -- *unless* explicitly informed to by the use of a
>> .through or the new .repartition operator.
>> 
>> Regarding groupBy, the existing behavior we should not disrupt is
>> a) repartition *only* when required due to upstream key-changing operation
>> (ie don't force repartitioning
>> based on the presence of an optional config parameter), and
>> b) allow optimization of required repartitions, if any
>> 
>> Within the constraint of not breaking the existing behavior, this still
>> leaves open the question of whether we
>> want to improve the user experience by allowing users to provide groupBy with a
>> *suggestion* for numPartitions (or to
>> put it more fairly, whether that *will* improve the experience). I agree
>> with many of the arguments outlined above but
>> let me just push back on this one issue one final time, and if we can't
>> come to a consensus then I am happy to drop
>> it for now so that the KIP can proceed.
>> 
>> Specifically, my proposal would be to simply augment Grouped with an
>> optional numPartitions, understood to
>> indicate the user's desired number of partitions *if Streams decides to
>> repartition due to that groupBy*
>> 
>>> if a user cares about the number of partition, the user wants to enforce
>> a repartitioning
>> First, I think we should take a step back and examine this claim. I agree
>> 100% that *if this is true,*
>> *then we should not give groupBy an optional numPartitions.* As far as I
>> see it, there's no argument
>> to be had there if we *presuppose that claim.* But I'm not convinced of
>> that as an axiom of the user
>> experience and think we should be examining that claim itself, not the
>> consequences of it.
>> 
>> To give a simple example, let's say some new user is trying out Streams and
>> wants to just play around
>> with it to see if it might be worth looking into. They want to just write
>> up a simple app and test it out on the
>> data in some existing topics they have with a large number of partitions,
>> and a lot of data. They're just messing
>> around, trying new topologies and don't want to go through each new one
>> step by step to determine if (or where)
>> a repartition might be required. They also don't want to force a
>> repartition if it turns out to not be required, so they'd
>> like to avoid the nice new .repartition operator they saw. But given the
>> huge number of input partitions, they'd like
>> to rest assured that if a repartition does end up being required somewhere
>> during dev, it will not be created with
>> the same huge number of partitions that their input topic has -- so they
>> just pass groupBy a small numPartitions
>> suggestion.
>> 
>> I know that's a bit of a contrived example but I think it does highlight
>> how and when this might be a considerable
>> quality of life improvement, in particular for new users to Streams and/or
>> during the dev cycle. *You don't want to*
>> *force a repartition if it wasn't necessary, but you don't want to create a
>> topic with a huge partition count either.*
>> 
>> Also, while the optimization discussion took us down an interesting but
>> ultimately more distracting road, it's worth
>> pointing out that it is clearly a major win to have as few
>> repartition topics/steps as possible. Given that we
>> don't want to change existing behavior, the optimization framework can only
>> help out when the placement of
>> repartition steps is flexible, which means only those from .groupBy (and
>> not .repartition). *Users should not*
>> *have to choose between allowing Streams to optimize the repartition
>> placement, and allowing to specify a *
>> *number of partitions.*
>> 
>> Lastly, I have what may be a stupid question but for my own edification of
>> how groupBy works:
>> if you do a .groupBy and a repartition is NOT required, does it ever need
>> to serialize/deserialize
>> any of the data? In other words, if you pass a key/value serde to groupBy
>> and it doesn't trigger
>> a repartition, is the serde(s) just ignored and thus more like a suggestion
>> than a requirement?
>> 
>> So again, I don't want to hold up this KIP forever but I feel we've spent
>> some time getting slightly
>> off track (although certainly into very interesting discussions) yet never
>> really addressed or questioned
>> the basic premise: *could a user want to specify a number of partitions but
>> not enforce a repartition (at that*
>> *specific point in the topology)?*
>> 
>> 
>> 
>> On Fri, Nov 15, 2019 at 12:18 AM Matthias J. Sax <matth...@confluent.io>
>> wrote:
>> 
>>> Side remark:
>>> 
>>> If the user specifies `repartition()` on both sides of the join, we can
>>> actually throw the exception earlier, ie, when we build the topology.
>>> 
>>> Currently, we can do this check only after Kafka Streams was started,
>>> within `StreamPartitionAssignor#assign()` -- we still need to keep this
>>> check for the case that none or only one side has a user specified
>>> number of partitions though.
>>> 
>>> 
>>> -Matthias
>>> 
>>> On 11/14/19 8:15 AM, John Roesler wrote:
>>>> Thanks, all,
>>>> 
>>>> I can get behind just totally leaving out repartition-via-groupBy. If
>>>> we only introduce `repartition()` for now, we're making the minimal
>>>> change to gain the desired capability.
>>>> 
>>>> Plus, since we agree that `repartition()` should never be optimizable,
>>>> it's a future-compatible proposal. I.e., if we were to add a
>>>> non-optimizable groupBy(partitions) operation now, and want to make it
>>>> optimizable in the future, we have to worry about topology
>>>> compatibility. Better to just do non-optimizable `repartition()` now,
>>>> and add an optimizable `groupBy(partitions)` in the future (maybe).
>>>> 
>>>> About joins, yes, it's a concern, and IMO we should just do the same
>>>> thing we do now... check at runtime that the partition counts on both
>>>> sides match and throw an exception otherwise. What this means as a
>>>> user is that if you explicitly repartition the left side to 100
>>>> partitions, and then join with the right side at 10 partitions, you
>>>> get an exception, since this operation is not possible. You'd either
>>>> have to "step down" the left side again, back to 10 partitions, or you
>>>> could repartition the right side to 100 partitions before the join.
>>>> The choice has to be the user's, since it depends on their desired
>>>> execution parallelism.
>>>> 
>>>> Thanks,
>>>> -John
>>>> 
>>>> On Thu, Nov 14, 2019 at 12:55 AM Matthias J. Sax <matth...@confluent.io>
>>> wrote:
>>>>> 
>>>>> Thanks a lot John. I think the way you decompose the operators is super
>>>>> helpful for this discussion.
>>>>> 
>>>>> What you suggest with regard to using `Grouped` and enforcing
>>>>> repartitioning if the number of partitions is specified is certainly
>>>>> possible. However, I am not sure if we _should_ do this. My reasoning is
>>>>> that an enforced repartitioning as introduced via `repartition()` is an
>>>>> expensive operation, and it seems better to demand a more explicit
>>>>> user opt-in to trigger it. Just setting an optional parameter might be
>>>>> too subtle to trigger such a heavy "side effect".
>>>>> 
>>>>> While I agree about "usability" in general, I would prefer a more
>>>>> conservative approach to introduce this feature, see how it goes, and
>>>>> maybe make it more advanced later on. This also applies to what
>>>>> optimization we may or may not allow (or are able to perform at all).
>>>>> 
>>>>> @Levani: Reflecting about my suggestion about `Repartitioned extends
>>>>> Grouped`, I agree that it might not be a good idea.
>>>>> 
>>>>> Atm, I see an enforced repartitioning as non-optimizable and as a good
>>>>> first step and I would suggest to not introduce anything else for now.
>>>>> Introducing optimizable enforced repartitioning via `groupBy(...,
>>>>> Grouped)` is something we could add later.
>>>>> 
>>>>> 
>>>>> Therefore, I would not change `Grouped` but only introduce
>>>>> `repartition()`. Users that use `groupBy()` atm, and want to opt-in to
>>>>> set the number of partitions, would need to rewrite their code to
>>>>> `selectKey(...).repartition(...).groupByKey()`. It's less convenient but
>>>>> also less risky from an API and optimization point of view.
>>>>> 
>>>>> 
>>>>> @Levani: about joins -> yes, we will need to check the specified number
>>>>> of partitions (if any) and if they don't match, throw an exception. We
>>>>> can discuss this on the PR -- I am just trying to get the PR for KIP-466
>>>>> merged -- yours is next on the list :)
>>>>> 
>>>>> 
>>>>> Thoughts?
>>>>> 
>>>>> 
>>>>> -Matthias
>>>>> 
>>>>> 
>>>>> On 11/12/19 4:51 PM, Levani Kokhreidze wrote:
>>>>>> Thank you all for an interesting discussion. This is very enlightening.
>>>>>> 
>>>>>> Thank you Matthias for your explanation. Your arguments are very true.
>>>>>> It makes sense that if a user specifies the number of partitions, he/she
>>>>>> really cares that those specifications are applied to internal topics.
>>>>>> Unfortunately, in the current implementation this is not true during the
>>>>>> `join` operation. As I’ve written in the PR comment, currently, when
>>>>>> `Stream#join` is used, `CopartitionedTopicsEnforcer` chooses the max number
>>>>>> of partitions from the two source topics.
>>>>>> I’m not really sure what the other way around this situation would be.
>>>>>> Maybe fail the stream altogether and inform the user to specify the same
>>>>>> number of partitions?
>>>>>> Or should we treat join operations the same way as right now and
>>>>>> basically choose the max number of partitions even when the `repartition`
>>>>>> operation is specified, because Kafka Streams “knows the best” how to
>>>>>> handle joins?
>>>>>> You can check the integration tests to see how it’s being handled
>>>>>> currently. Open to suggestions on that part.
>>>>>> 
>>>>>> As for groupBy, I agree and John raised very interesting points. My
>>>>>> arguments for allowing users to specify the number of partitions during
>>>>>> groupBy operations mainly came from the usability perspective.
>>>>>> So building on top of what John said, maybe it makes sense to make
>>>>>> `groupBy` operations smarter, so that whenever the user specifies the
>>>>>> `numberOfPartitions` configuration, repartitioning will be enforced, wdyt?
>>>>>> I’m not going into the optimization part yet :) I think it will be part of
>>>>>> a separate PR and task, but overall it makes sense to apply optimizations
>>>>>> where the number of partitions is the same.
>>>>>> 
>>>>>> As for Repartitioned extending Grouped, I kinda feel that it won’t fit
>>>>>> nicely in the current API design.
>>>>>> In addition, in the PR review, John mentioned that there was a lot of
>>>>>> trouble in the past trying to use one operation's configuration objects on
>>>>>> other operations.
>>>>>> Also it makes sense to keep them separate in terms of compatibility.
>>>>>> In that case, we don’t have to worry, every time Grouped is changed,
>>>>>> about what the implications would be for `repartition` operations.
>>>>>> 
>>>>>> Kind regards,
>>>>>> Levani
>>>>>> 
>>>>>> 
>>>>>>> On Nov 11, 2019, at 9:13 PM, John Roesler <j...@confluent.io> wrote:
>>>>>>> 
>>>>>>> Ah, thanks for the clarification. I missed your point.
>>>>>>> 
>>>>>>> I like the framework you've presented. It does seem simpler to assume
>>>>>>> that they either care about the partition count and want to
>>>>>>> repartition to realize it, or they don't care about the number.
>>>>>>> Returning to this discussion, it does seem unlikely that they care
>>>>>>> about the number and _don't_ care if it actually gets realized.
>>>>>>> 
>>>>>>> But then, it still seems like we can just keep the option as part of
>>>>>>> Grouped. As in:
>>>>>>> 
>>>>>>> // user does not care
>>>>>>> stream.groupByKey(Grouped /*not specifying partition count*/)
>>>>>>> stream.groupBy(Grouped /*not specifying partition count*/)
>>>>>>> 
>>>>>>> // user does care
>>>>>>> stream.repartition(Repartitioned)
>>>>>>> stream.groupByKey(Grouped.numberOfPartitions(...))
>>>>>>> stream.groupBy(Grouped.numberOfPartitions(...))
>>>>>>> 
>>>>>>> ----
>>>>>>> 
>>>>>>> The above discussion got me thinking about algebra. Matthias is
>>>>>>> absolutely right that `groupByKey(numPartitions)` is equivalent to
>>>>>>> `repartition(numPartitions).groupByKey()`. I'm just not convinced that
>>>>>>> we should force people to apply that expansion themselves vs. having a
>>>>>>> more compact way to express it if they don't care where exactly the
>>>>>>> repartition occurs. However, thinking about these operators
>>>>>>> algebraically can really help *us* narrow down the number of different
>>>>>>> expressions we have to consider.
>>>>>>> 
>>>>>>> Let's consider some identities:
>>>>>>> 
>>>>>>> A: groupBy(mapper) + agg = mapKey(mapper) + groupByKey + agg
>>>>>>> B: src + ... + groupByKey + agg = src + ... + passthrough + agg
>>>>>>> C: mapKey(mapper) + ... + groupByKey + agg
>>>>>>> = mapKey(mapper) + ... + repartition + groupByKey + agg
>>>>>>> D: repartition = sink(managed) + src
>>>>>>> 
>>>>>>> In these identities, I used one special identifier (...), which means
>>>>>>> any number (0+) of operations that are not src, mapKey, groupBy[Key],
>>>>>>> repartition, or agg.
>>>>>>> 
>>>>>>> For mental clarity, I'm just going to make up a rule that groupBy
>>>>>>> operations are not executable. In other words, you have to get to a
>>>>>>> point where you can apply B to convert a groupByKey into a passthrough
>>>>>>> in order to execute the program. This is just a formal way of stating
>>>>>>> what already happens in Kafka Streams.
>>>>>>> 
>>>>>>> By applying A, we can just completely leave `groupBy` out of our
>>>>>>> analysis. It trivially decomposes into a mapKey followed by a
>>>>>>> groupByKey.
>>>>>>> 
>>>>>>> Then, we can eliminate the "repartition required" case of `groupByKey`
>>>>>>> by applying C followed by D to get to the "no repartition required"
>>>>>>> version of groupByKey, which in turn sets us up to apply B to get an
>>>>>>> executable topology.
>>>>>>> 
>>>>>>> Fundamentally, you can think about KIP-221 as proposing a modified
>>>>>>> D identity in which you can specify the partition count of the managed
>>>>>>> sink topic:
>>>>>>> D': repartition(pc) = sink(managed w/ pc) + src
>>>>>>> 
>>>>>>> Since users _could_ apply the identities above, we don't actually have
>>>>>>> to add any partition count to groupBy[Key], but we decided early on in
>>>>>>> the KIP discussion that it's more ergonomic to add it. In that case,
>>>>>>> we also have to modify A and C:
>>>>>>> A': groupBy(mapper, pc) + agg
>>>>>>> = mapKey(mapper) + groupByKey(pc) + agg
>>>>>>> C': mapKey(mapper) + ... + groupByKey(pc) + agg
>>>>>>> = mapKey(mapper) + ... + repartition(pc) + groupByKey + agg
>>>>>>> 
>>>>>>> Which sets us up still to always be able to get back to a plain
>>>>>>> `groupByKey` operation (with no `(pc)`) and then apply D' and
>>>>>>> ultimately B to get an executable topology.
>>>>>>> 
>>>>>>> What about the optimizer?
>>>>>>> The optimizer applies another set of graph-algebraic identities to
>>>>>>> minimize the number of repartition topics in a topology.
>>>>>>> 
>>>>>>> (forgive my ascii art)
>>>>>>> 
>>>>>>> E: (merging repartition nodes)
>>>>>>> (...) -> repartition -> X
>>>>>>>    \-> repartition -> Y
>>>>>>> =
>>>>>>> (... + repartition) -> X
>>>>>>>    \-> Y
>>>>>>> F: (reordering around repartition)
>>>>>>> Where SVO is any non-key-changing, stateless, operation:
>>>>>>> repartition -> SVO = SVO -> repartition
>>>>>>> 
>>>>>>> In terms of these identities, what the optimizer does is apply F
>>>>>>> repeatedly in either direction to a topology to factor out common in
>>>>>>> branches so that it can apply E to merge repartition nodes. This was
>>>>>>> especially necessary before KIP-221 because you couldn't directly
>>>>>>> express `repartition` in the DSL, only indirectly via `groupBy[Key]`,
>>>>>>> so there was no way to do the factoring manually.
>>>>>>> 
>>>>>>> We can now state very clearly that in KIP-221, explicit
>>>>>>> `repartition()` operators should create a "reordering barrier". So, F
>>>>>>> cannot be applied to an explicit `repartition()`. Also, I think we
>>>>>>> decided earlier that explicit `repartition()` operations would also be
>>>>>>> ineligible for merging, so E can't be applied to explicit
>>>>>>> `repartition()` operations either. I think we feel we _could_ apply E
>>>>>>> without harm, but we want to be conservative for now.
>>>>>>> 
>>>>>>> I think the salient point from the latter discussion has been that
>>>>>>> when you use `Grouped.numberOfPartitions`, this does _not_ constitute
>>>>>>> an explicit `repartition()` operator, and therefore, the resulting
>>>>>>> repartition node remains eligible for optimization.
>>>>>>> 
>>>>>>> To be clear, I agree with Matthias that the provided partition count
>>>>>>> _must_ be used in the resulting implicit repartition. This has some
>>>>>>> implications for E. Namely, E could only be applied to two repartition
>>>>>>> nodes that have the same partition count. This has always been
>>>>>>> trivially true before KIP-221 because the partition count has always
>>>>>>> been "unspecified", i.e., it would be determined at runtime by the
>>>>>>> user-managed-topics' partition counts. Now, it could be specified or
>>>>>>> unspecified. We can simply augment E to allow merging only repartition
>>>>>>> nodes where the partition count is EITHER "specified and the same on
>>>>>>> both sides", OR "unspecified on both sides".
>>>>>>> 
>>>>>>> Sorry for the long email, but I have a hope that it builds a solid
>>>>>>> theoretical foundation for our decisions in KIP-221, so we can have
>>>>>>> confidence that there are no edge cases for design flaws to hide.
>>>>>>> 
>>>>>>> Thanks,
>>>>>>> -John
>>>>>>> 
>>>>>>> On Sat, Nov 9, 2019 at 10:37 PM Matthias J. Sax <
>>> matth...@confluent.io> wrote:
>>>>>>>> 
>>>>>>>>> it seems like we do want to allow
>>>>>>>>>> people to optionally specify a partition count as part of this
>>>>>>>>>> operation, but we don't want that option to _force_ repartitioning
>>>>>>>> 
>>>>>>>> Correct, ie, that is my suggestion.
>>>>>>>> 
>>>>>>>>> "Use P partitions if repartitioning is necessary"
>>>>>>>> 
>>>>>>>> I disagree here, because my reasoning is that:
>>>>>>>> 
>>>>>>>> - if a user cares about the number of partitions, the user wants to
>>>>>>>> enforce a repartitioning
>>>>>>>> - if a user does not care about the number of partitions, we don't need
>>>>>>>> to provide them a way to pass in a "hint"
>>>>>>>> 
>>>>>>>> Hence, it should be sufficient to support:
>>>>>>>> 
>>>>>>>> // user does not care
>>>>>>>> 
>>>>>>>> `stream.groupByKey(Grouped)`
>>>>>>>> `stream.groupBy(..., Grouped)`
>>>>>>>> 
>>>>>>>> // user does care
>>>>>>>> 
>>>>>>>> `stream.repartition(Repartitioned).groupByKey()`
>>>>>>>> `stream.groupBy(..., Repartitioned)`
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> -Matthias
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On 11/9/19 8:10 PM, John Roesler wrote:
>>>>>>>>> Thanks for those thoughts, Matthias,
>>>>>>>>> 
>>>>>>>>> I find your reasoning about the optimization behavior compelling.
>>> The
>>>>>>>>> `through` operation is very simple and clear to reason about. It
>>> just
>>>>>>>>> passes the data exactly at the specified point in the topology
>>> exactly
>>>>>>>>> through the specified topic. Likewise, if a user invokes a
>>>>>>>>> `repartition` operator, the simplest behavior is if we just do what
>>>>>>>>> they asked for.
>>>>>>>>> 
>>>>>>>>> Stepping back to think about when optimizations are surprising and
>>>>>>>>> when they aren't, it occurs to me that we should be free to move
>>>>>>>>> around repartitions when users have asked to perform some operation
>>>>>>>>> that implies a repartition, like "change keys, then filter, then
>>>>>>>>> aggregate". This program requires a repartition, but it could be
>>>>>>>>> anywhere between the key change and the aggregation. On the other
>>>>>>>>> hand, if they say, "change keys, then filter, then repartition, then
>>>>>>>>> aggregate", it seems like they were pretty clear about their desire,
>>>>>>>>> and we should just take it at face value.
>>>>>>>>> 
>>>>>>>>> So, I'm sold on just literally doing a repartition every time they
>>>>>>>>> invoke the `repartition` operator.
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> The "partition count" modifier for `groupBy`/`groupByKey` is more
>>> nuanced.
>>>>>>>>> 
>>>>>>>>> What you said about `groupByKey` makes sense. If the key hasn't
>>>>>>>>> actually changed, then we don't need to repartition before
>>>>>>>>> aggregating. On the other hand, `groupBy` is specifically changing
>>> the
>>>>>>>>> key as part of the grouping operation, so (as you said) we
>>> definitely
>>>>>>>>> have to do a repartition.
>>>>>>>>> 
>>>>>>>>> If I'm reading the discussion right, it seems like we do want to
>>> allow
>>>>>>>>> people to optionally specify a partition count as part of this
>>>>>>>>> operation, but we don't want that option to _force_ repartitioning
>>> if
>>>>>>>>> it's not needed. That last clause is the key. "Use P partitions if
>>>>>>>>> repartitioning is necessary" is a directive that applies cleanly and
>>>>>>>>> correctly to both `groupBy` and `groupByKey`. What if we call the
>>>>>>>>> option `numberOfPartitionsHint`, which along with the "if necessary"
>>>>>>>>> javadoc, should make it clear that the option won't force a
>>>>>>>>> repartition, and also gives us enough latitude to still employ the
>>>>>>>>> optimizer on those repartition topics?
>>>>>>>>> 
>>>>>>>>> If we like the idea of expressing it as a "hint" for grouping and a
>>>>>>>>> "command" for `repartition`, then it seems like it still makes sense
>>>>>>>>> to keep Grouped and Repartitioned separate, as they would actually
>>>>>>>>> offer different methods with distinct semantics.
>>>>>>>>> 
>>>>>>>>> WDYT?
>>>>>>>>> 
>>>>>>>>> Thanks,
>>>>>>>>> -John
>>>>>>>>> 
>>>>>>>>> On Sat, Nov 9, 2019 at 8:28 PM Matthias J. Sax <
>>> matth...@confluent.io> wrote:
>>>>>>>>>> 
>>>>>>>>>> Sorry for late reply.
>>>>>>>>>> 
>>>>>>>>>> I guess, the question boils down to the intended semantics of
>>>>>>>>>> `repartition()`. My understanding is as follows:
>>>>>>>>>> 
>>>>>>>>>> - KS does auto-repartitioning for correctness reasons (using the
>>>>>>>>>> upstream topic to determine the number of partitions)
>>>>>>>>>> - KS does auto-repartitioning only for downstream DSL operators like
>>>>>>>>>> `count()` (eg, a `transform()` never triggers an auto-repartitioning
>>>>>>>>>> even if the stream is marked as `repartitioningRequired`).
>>>>>>>>>> - KS offers `through()` to enforce a repartitioning -- however,
>>> the user
>>>>>>>>>> needs to create the topic manually (with the desired number of
>>> partitions).
>>>>>>>>>> 
>>>>>>>>>> I see two main applications for `repartition()`:
>>>>>>>>>> 
>>>>>>>>>> 1) repartition data before a `transform()` but user does not want
>>> to
>>>>>>>>>> manage the topic
>>>>>>>>>> 2) scale out a downstream subtopology
>>>>>>>>>> 
>>>>>>>>>> Hence, I see `repartition()` similar to `through()`: if a user calls
>>>>>>>>>> it, a repartitioning is enforced, with the difference that KS manages the
>>>>>>>>>> topic and the user does not need to create it.
>>>>>>>>>> 
>>>>>>>>>> This behavior makes (1) and (2) possible.
>>>>>>>>>> 
>>>>>>>>>>> I think many users would prefer to just say "if there *is* a
>>> repartition
>>>>>>>>>>> required at this point in the topology, it should
>>>>>>>>>>> have N partitions"
>>>>>>>>>> 
>>>>>>>>>> Because of (2), I disagree. Either a user does not care about
>>> scaling
>>>>>>>>>> out, for which case she would not specify the number of
>>> partitions. Or a
>>>>>>>>>> user does care, and hence wants to enforce the scale out. I don't
>>> think
>>>>>>>>>> that any user would say, "maybe scale out".
>>>>>>>>>> 
>>>>>>>>>> Therefore, the optimizer should never ignore the repartition
>>> operation.
>>>>>>>>>> As a "consequence" (because repartitioning is expensive) a user
>>> should
>>>>>>>>>> make an explicit call to `repartition()` IMHO -- piggybacking an
>>>>>>>>>> enforced repartitioning into `groupByKey()` seems to be "dangerous"
>>>>>>>>>> because it might be too subtle and an "optional scaling out" as
>>> laid out
>>>>>>>>>> above does not make sense IMHO.
>>>>>>>>>> 
>>>>>>>>>> I am also not worried about "over repartitioning" because the
>>> result
>>>>>>>>>> stream would never trigger auto-repartitioning. Only if multiple
>>>>>>>>>> consecutive calls to `repartition()` are made it could be bad --
>>> but
>>>>>>>>>> that's the same with `through()`. In the end, there is always some
>>>>>>>>>> responsibility on the user.
>>>>>>>>>> 
>>>>>>>>>> Btw, for `.groupBy()` we know that repartitioning will be required,
>>>>>>>>>> however, for `groupByKey()` it depends if the KStream is marked as
>>>>>>>>>> `repartitioningRequired`.
>>>>>>>>>> 
>>>>>>>>>> Hence, for `groupByKey()` it should not be possible for a user to
>>> set
>>>>>>>>>> number of partitions IMHO. For `groupBy()` it's a different story,
>>>>>>>>>> because calling
>>>>>>>>>> 
>>>>>>>>>>  `repartition().groupBy()`
>>>>>>>>>> 
>>>>>>>>>> does not achieve what we want. Hence, allowing users to pass in the
>>>>>>>>>> number of partitions into `groupBy()` does actually make sense,
>>>>>>>>>> because repartitioning will happen anyway and thus we can piggyback a
>>>>>>>>>> scaling decision.
>>>>>>>>>> 
>>>>>>>>>> I think that John has a fair concern about the overloads, however,
>>> I am
>>>>>>>>>> not convinced that using `Grouped` to specify the number of
>>> partitions
>>>>>>>>>> is intuitive. I double checked `Grouped` and `Repartitioned` and
>>> both
>>>>>>>>>> allow to specify a `name` and `keySerde/valueSerde`. Thus, I am
>>>>>>>>>> wondering if we could bridge the gap between both, if we would make
>>>>>>>>>> `Repartitioned extends Grouped`? For this case, we only need
>>>>>>>>>> `groupBy(Grouped)` and a user can pass in both types, which seems to make
>>>>>>>>>> the API quite smooth:
>>>>>>>>>> 
>>>>>>>>>> `stream.groupBy(..., Grouped...)`
>>>>>>>>>> 
>>>>>>>>>> `stream.groupBy(..., Repartitioned...)`
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> Thoughts?
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> -Matthias
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> On 11/7/19 10:59 AM, Levani Kokhreidze wrote:
>>>>>>>>>>> Hi Sophie,
>>>>>>>>>>> 
>>>>>>>>>>> Thank you for your reply, very insightful. Looking forward to hearing
>>>>>>>>>>> others' opinions on this as well.
>>>>>>>>>>> 
>>>>>>>>>>> Kind regards,
>>>>>>>>>>> Levani
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>>> On Nov 6, 2019, at 1:30 AM, Sophie Blee-Goldman <
>>> sop...@confluent.io> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>>> Personally, I think Matthias’s concern is valid, but on the
>>> other hand
>>>>>>>>>>>> Kafka Streams has already
>>>>>>>>>>>>> optimizer in place which alters topology independently from user
>>>>>>>>>>>> 
>>>>>>>>>>>> I agree (with you) and think this is a good way to put it -- we
>>> currently
>>>>>>>>>>>> auto-repartition for the user so
>>>>>>>>>>>> that they don't have to walk through their entire topology and
>>> reason about
>>>>>>>>>>>> when and where to place a
>>>>>>>>>>>> `.through` (or the new `.repartition`), so why suddenly force
>>> this onto the
>>>>>>>>>>>> user? How certain are we that
>>>>>>>>>>>> users will always get this right? It's easy to imagine that
>>> during
>>>>>>>>>>>> development, you write your new app with
>>>>>>>>>>>> correctly placed repartitions in order to use this new feature.
>>> During the
>>>>>>>>>>>> course of development you end up
>>>>>>>>>>>> tweaking the topology, but don't remember to review or move the
>>>>>>>>>>>> repartitioning since you're used to Streams
>>>>>>>>>>>> doing this for you. If you use only single-partition topics for
>>> testing,
>>>>>>>>>>>> you might not even notice your app is
>>>>>>>>>>>> spitting out incorrect results!
>>>>>>>>>>>> 
>>>>>>>>>>>> Anyways, I feel pretty strongly that it would be weird to
>>> introduce a new
>>>>>>>>>>>> feature and say that to use it, you can't take
>>>>>>>>>>>> advantage of this other feature anymore. Also, is it possible our
>>>>>>>>>>>> optimization framework could ever include an
>>>>>>>>>>>> optimized repartitioning strategy that is better than what a
>>> user could
>>>>>>>>>>>> achieve by manually inserting repartitions?
>>>>>>>>>>>> Do we expect users to have a deep understanding of the best way
>>> to
>>>>>>>>>>>> repartition their particular topology, or is it
>>>>>>>>>>>> likely they will end up over-repartitioning either due to missed
>>>>>>>>>>>> optimizations or unnecessary extra repartitions?
>>>>>>>>>>>> I think many users would prefer to just say "if there *is* a
>>> repartition
>>>>>>>>>>>> required at this point in the topology, it should
>>>>>>>>>>>> have N partitions"
>>>>>>>>>>>> 
>>>>>>>>>>>> As to the idea of adding `numberOfPartitions` to Grouped rather
>>> than
>>>>>>>>>>>> adding a new parameter to groupBy, that does seem more in line
>>> with the
>>>>>>>>>>>> current syntax so +1 from me
>>>>>>>>>>>> 
>>>>>>>>>>>> On Tue, Nov 5, 2019 at 2:07 PM Levani Kokhreidze <
>>> levani.co...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> While https://github.com/apache/kafka/pull/7170 is under review and
>>>>>>>>>>>>> almost done, I want to resurrect the discussion about this KIP to
>>>>>>>>>>>>> address a couple of concerns raised by Matthias and John.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> As a reminder, the idea of KIP-221 was to give DSL users control over
>>>>>>>>>>>>> repartitioning and parallelism of sub-topologies by:
>>>>>>>>>>>>> 1) Introducing a new KStream#repartition operation, which is done in
>>>>>>>>>>>>> https://github.com/apache/kafka/pull/7170
>>>>>>>>>>>>> 2) Adding a new KStream#groupBy(Repartitioned) operation, which is
>>>>>>>>>>>>> planned as a separate PR.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> While all agree about the general implementation and idea behind the
>>>>>>>>>>>>> https://github.com/apache/kafka/pull/7170 PR, introducing a new
>>>>>>>>>>>>> KStream#groupBy(Repartitioned) method overload raised some questions
>>>>>>>>>>>>> during the review.
>>>>>>>>>>>>> Matthias raised the concern that there can be cases where a user calls
>>>>>>>>>>>>> `KStream#groupBy(Repartitioned)` but no actual repartitioning is
>>>>>>>>>>>>> required, so the configuration passed via `Repartitioned` would never be
>>>>>>>>>>>>> applied (Matthias, please correct me if I misinterpreted your comment).
>>>>>>>>>>>>> So instead, a user who wants to control the parallelism of
>>>>>>>>>>>>> sub-topologies should always use the `KStream#repartition` operation
>>>>>>>>>>>>> before groupBy. The full comment can be seen here:
>>>>>>>>>>>>> https://github.com/apache/kafka/pull/7170#issuecomment-519303125
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On the same topic, John pointed out that, from an API design
>>>>>>>>>>>>> perspective, we shouldn’t intertwine the configuration classes of
>>>>>>>>>>>>> different operators. So instead of introducing a new
>>>>>>>>>>>>> `KStream#groupBy(Repartitioned)` for specifying the number of partitions
>>>>>>>>>>>>> of the internal topic, we should add a `numberOfPartitions` field to the
>>>>>>>>>>>>> existing `Grouped` class.
>>>>>>>>>>>>> 
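The concern summarized above, that a partition count attached to a grouping operation may never be applied, can be sketched as follows. This is a minimal model under stated assumptions: `PartitionHint`, `resolve`, and the fallback to the upstream partition count are hypothetical and only illustrate the "hint" semantics being debated; they are not part of Kafka Streams.

```java
import java.util.OptionalInt;

// Hypothetical model of a "partition hint"; names are illustrative only.
final class PartitionHint {

    // Returns the partition count of the internal repartition topic,
    // or empty when no repartition topic is created at all.
    static OptionalInt resolve(boolean repartitionRequired, OptionalInt hint, int upstreamPartitions) {
        if (!repartitionRequired) {
            // No key-changing operation upstream: the hint is silently unused.
            return OptionalInt.empty();
        }
        // Assumption: without a hint, fall back to the upstream partition count.
        return OptionalInt.of(hint.orElse(upstreamPartitions));
    }

    public static void main(String[] args) {
        System.out.println(resolve(true, OptionalInt.of(12), 4));
        System.out.println(resolve(false, OptionalInt.of(12), 4));
    }
}
```

The second call shows the case under discussion: the user supplied a value, but since no repartition topic exists there is nothing to apply it to.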
>>>>>>>>>>>>> Personally, I think Matthias’s concern is valid, but on the other hand
>>>>>>>>>>>>> Kafka Streams already has an optimizer in place which alters the
>>>>>>>>>>>>> topology independently of the user. So maybe it makes sense for Kafka
>>>>>>>>>>>>> Streams to optimize the topology internally in the best way possible,
>>>>>>>>>>>>> even if in some cases this means ignoring some operator configurations
>>>>>>>>>>>>> passed by the user. Also, I agree with John about API design semantics.
>>>>>>>>>>>>> If we go through with the changes to the `KStream#groupBy` operation, it
>>>>>>>>>>>>> makes more sense to add a `numberOfPartitions` field to the `Grouped`
>>>>>>>>>>>>> class than to introduce a new `KStream#groupBy(Repartitioned)` method
>>>>>>>>>>>>> overload.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> I would really appreciate the community’s feedback on this.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Kind regards,
>>>>>>>>>>>>> Levani
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Oct 17, 2019, at 12:57 AM, Sophie Blee-Goldman <
>>> sop...@confluent.io>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Hey Levani,
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I think people are busy with the upcoming 2.4 release, and
>>> don't have
>>>>>>>>>>>>> much
>>>>>>>>>>>>>> spare time at the
>>>>>>>>>>>>>> moment. It's kind of a difficult time to get attention on
>>> things, but
>>>>>>>>>>>>> feel
>>>>>>>>>>>>>> free to pick up something else
>>>>>>>>>>>>>> to work on in the meantime until things have calmed down a bit!
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>> Sophie
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Wed, Oct 16, 2019 at 11:26 AM Levani Kokhreidze <levani.co...@gmail.com> wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Sorry for bringing this thread up again, but I would like to get some
>>>>>>>>>>>>>>> attention on this PR: https://github.com/apache/kafka/pull/7170
>>>>>>>>>>>>>>> It's been a while now and I would love to move on to other
>>> KIPs as well.
>>>>>>>>>>>>>>> Please let me know if you have any concerns.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> On Jul 26, 2019, at 11:25 AM, Levani Kokhreidze <levani.co...@gmail.com> wrote:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Here’s the voting thread for this KIP:
>>>>>>>>>>>>>>>> https://www.mail-archive.com/dev@kafka.apache.org/msg99680.html
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> On Jul 24, 2019, at 11:15 PM, Levani Kokhreidze <levani.co...@gmail.com> wrote:
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Hi Matthias,
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Thanks for the suggestion. I don’t have a strong opinion on
>>> that one.
>>>>>>>>>>>>>>>>> Agree that avoiding unnecessary method overloads is a good
>>> idea.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Updated KIP
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> On Jul 24, 2019, at 8:50 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> One question:
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Why do we add
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Repartitioned#with(final String name, final int
>>> numberOfPartitions)
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> It seems that `#with(String name)`,
>>> `#numberOfPartitions(int)` in
>>>>>>>>>>>>>>>>>> combination with `withName()` and
>>> `withNumberOfPartitions()` should
>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>> sufficient. Users can chain the method calls.
>>>>>>>>>>>>>>>>>> 
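The point about chaining instead of adding a two-argument overload can be sketched with a small immutable config class. `RepartitionConfig` is a hypothetical stand-in written for this illustration; it borrows the method names mentioned above but is not the `Repartitioned` class from the KIP or the PR.

```java
// Hypothetical config class illustrating static entry points plus chained "with" methods.
final class RepartitionConfig {
    private final String name;                 // null means "let the framework generate a name"
    private final Integer numberOfPartitions;  // null means "let the framework decide"

    private RepartitionConfig(String name, Integer numberOfPartitions) {
        this.name = name;
        this.numberOfPartitions = numberOfPartitions;
    }

    // Static entry points: one per field, no combined overload needed.
    static RepartitionConfig with(String name) {
        return new RepartitionConfig(name, null);
    }

    static RepartitionConfig numberOfPartitions(int numberOfPartitions) {
        return new RepartitionConfig(null, checkPositive(numberOfPartitions));
    }

    // Chained methods return a new instance so configs stay immutable.
    RepartitionConfig withName(String newName) {
        return new RepartitionConfig(newName, this.numberOfPartitions);
    }

    RepartitionConfig withNumberOfPartitions(int newNumberOfPartitions) {
        return new RepartitionConfig(this.name, checkPositive(newNumberOfPartitions));
    }

    String name() {
        return name;
    }

    Integer partitions() {
        return numberOfPartitions;
    }

    private static int checkPositive(int n) {
        if (n <= 0) {
            throw new IllegalArgumentException("number of partitions must be positive: " + n);
        }
        return n;
    }
}
```

A caller would write `RepartitionConfig.with("orders").withNumberOfPartitions(6)`, which covers the same case as a `with(name, numberOfPartitions)` overload without growing the API surface.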
>>>>>>>>>>>>>>>>>> (I think it's valuable to keep the number of overloads small if possible.)
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Otherwise LGTM.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> On 7/23/19 2:18 PM, Levani Kokhreidze wrote:
>>>>>>>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Thanks all for your feedback.
>>>>>>>>>>>>>>>>>>> I started the voting procedure for this KIP. If there are any other
>>>>>>>>>>>>>>>>>>> concerns about this KIP, please let me know.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> On Jul 20, 2019, at 8:39 PM, Levani Kokhreidze <levani.co...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Hi Matthias,
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Thanks for the suggestion, makes sense.
>>>>>>>>>>>>>>>>>>>> I’ve updated KIP (
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>>>>>>> ).
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> On Jul 20, 2019, at 3:53 AM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> Thanks for driving the KIP.
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> I agree that users need to be able to specify a
>>> partitioning
>>>>>>>>>>>>>>> strategy.
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> Sophie raises a fair point about topic configs and producer configs.
>>>>>>>>>>>>>>>>>>>>> My take is that considering `Repartitioned` as an "extension" to
>>>>>>>>>>>>>>>>>>>>> `Produced` that adds topic configuration is a good way to think about
>>>>>>>>>>>>>>>>>>>>> it and helps to keep the API "clean".
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> With regard to method names, I would prefer to avoid abbreviations.
>>>>>>>>>>>>>>>>>>>>> Can we rename:
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> `withNumOfPartitions` -> `withNumberOfPartitions`
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> Furthermore, it might be good to add some more `static`
>>> methods:
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> - Repartitioned.with(Serde<K>, Serde<V>)
>>>>>>>>>>>>>>>>>>>>> - Repartitioned.withNumberOfPartitions(int)
>>>>>>>>>>>>>>>>>>>>> - Repartitioned.streamPartitioner(StreamPartitioner)
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> On 7/19/19 3:33 PM, Levani Kokhreidze wrote:
>>>>>>>>>>>>>>>>>>>>>> Totally agree. I think in the KStream interface it makes sense to have
>>>>>>>>>>>>>>>>>>>>>> some duplicate configurations between operators in order to keep the
>>>>>>>>>>>>>>>>>>>>>> API simple and usable.
>>>>>>>>>>>>>>>>>>>>>> Also, the more surface the API has, the harder it is to maintain proper
>>>>>>>>>>>>>>>>>>>>>> backward compatibility.
>>>>>>>>>>>>>>>>>>>>>> While the initial idea of keeping topic-level configs separate was
>>>>>>>>>>>>>>>>>>>>>> exciting, having the Repartitioned class encapsulate some
>>>>>>>>>>>>>>>>>>>>>> producer-level configs makes the API more readable.
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> On Jul 20, 2019, at 1:15 AM, Sophie Blee-Goldman <sop...@confluent.io> wrote:
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> I think that is a good point about trying to keep
>>> producer level
>>>>>>>>>>>>>>>>>>>>>>> configurations and (repartition) topic level
>>> considerations
>>>>>>>>>>>>>>> separate.
>>>>>>>>>>>>>>>>>>>>>>> Number of partitions is definitely purely a topic
>>> level
>>>>>>>>>>>>>>> configuration. But
>>>>>>>>>>>>>>>>>>>>>>> on some level, serdes and partitioners are just as
>>> much a topic
>>>>>>>>>>>>>>>>>>>>>>> configuration as a producer one. You could have two
>>> producers
>>>>>>>>>>>>>>> configured
>>>>>>>>>>>>>>>>>>>>>>> with different serdes and/or partitioners, but if
>>> they are
>>>>>>>>>>>>>>> writing to the
>>>>>>>>>>>>>>>>>>>>>>> same topic the result would be very difficult to
>>> parse. So in a
>>>>>>>>>>>>>>> sense, these
>>>>>>>>>>>>>>>>>>>>>>> are configurations of topics in Streams, not just
>>> producers.
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> Another way to think of it: while the Streams API is
>>> not always
>>>>>>>>>>>>>>> true to
>>>>>>>>>>>>>>>>>>>>>>> this, ideally all the relevant configs for an
>>> operator are
>>>>>>>>>>>>>>> wrapped into a
>>>>>>>>>>>>>>>>>>>>>>> single object (in this case, Repartitioned). We could
>>> instead
>>>>>>>>>>>>>>> split out the
>>>>>>>>>>>>>>>>>>>>>>> fields in common with Produced into a separate
>>> parameter to keep
>>>>>>>>>>>>>>> topic and
>>>>>>>>>>>>>>>>>>>>>>> producer level configurations separate, but this
>>> increases the
>>>>>>>>>>>>>>> API surface
>>>>>>>>>>>>>>>>>>>>>>> area by a lot. It's much more straightforward to just
>>> say "this
>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>>> everything that this particular operator needs"
>>> without worrying
>>>>>>>>>>>>>>> about what
>>>>>>>>>>>>>>>>>>>>>>> exactly you're specifying.
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> I suppose you could alternatively make Produced a
>>> field of
>>>>>>>>>>>>>>> Repartitioned,
>>>>>>>>>>>>>>>>>>>>>>> but I don't think we do this kind of composition
>>> elsewhere in
>>>>>>>>>>>>>>> Streams at
>>>>>>>>>>>>>>>>>>>>>>> the moment
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> On Fri, Jul 19, 2019 at 1:45 PM Levani Kokhreidze <levani.co...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> Hi Bill,
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> Thanks a lot for the feedback.
>>>>>>>>>>>>>>>>>>>>>>>> Yes, that makes sense. I’ve updated the KIP with the
>>>>>>>>>>>>>>>>>>>>>>>> `Repartitioned#partitioner` configuration.
>>>>>>>>>>>>>>>>>>>>>>>> In the beginning, I wanted to introduce a class for topic-level
>>>>>>>>>>>>>>>>>>>>>>>> configuration and keep topic-level and producer-level configurations
>>>>>>>>>>>>>>>>>>>>>>>> (such as Produced) separate (see my second email in this thread).
>>>>>>>>>>>>>>>>>>>>>>>> But while looking at the semantics of the KStream interface, I couldn’t
>>>>>>>>>>>>>>>>>>>>>>>> really figure out a good operation name for a topic-level configuration
>>>>>>>>>>>>>>>>>>>>>>>> class, and just introducing a `Topic` config class was kind of breaking
>>>>>>>>>>>>>>>>>>>>>>>> the semantics.
>>>>>>>>>>>>>>>>>>>>>>>> So I think having a Repartitioned class which encapsulates topic- and
>>>>>>>>>>>>>>>>>>>>>>>> producer-level configurations for internal topics is a viable thing to do.
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> On Jul 19, 2019, at 7:47 PM, Bill Bejeck <bbej...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> Hi Levani,
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for resurrecting this KIP.
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> I'm also a +1 for adding a partition option.  In
>>> addition to
>>>>>>>>>>>>>>> the reason
>>>>>>>>>>>>>>>>>>>>>>>>> provided by John, my reasoning is:
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> 1. Users may want to use something other than hash-based partitioning
>>>>>>>>>>>>>>>>>>>>>>>>> 2. Users may wish to partition on something different than the key
>>>>>>>>>>>>>>>>>>>>>>>>>    without having to change the key.  For example:
>>>>>>>>>>>>>>>>>>>>>>>>>    1. A combination of fields in the value in conjunction with the key
>>>>>>>>>>>>>>>>>>>>>>>>>    2. Something other than the key
>>>>>>>>>>>>>>>>>>>>>>>>> 3. We allow users to specify a partitioner on Produced, hence in
>>>>>>>>>>>>>>>>>>>>>>>>>    KStream.to and KStream.through, so it makes sense for API consistency.
>>>>>>>>>>>>>>>>>>>>>>>>> 
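The two partitioning cases listed above (key combined with value fields, and something other than the key) can be sketched in plain Java. Everything here is an assumption made for illustration: `Partitioner` is a locally defined interface that only mirrors the idea of a stream partitioner, and `Order` is an invented value type; neither is the Kafka Streams API.

```java
import java.util.Objects;

final class ValueAwarePartitioning {

    // Hypothetical value type used only for this illustration.
    static final class Order {
        final String region;
        final long amountCents;

        Order(String region, long amountCents) {
            this.region = region;
            this.amountCents = amountCents;
        }
    }

    // Hypothetical interface; it mirrors the idea of a stream partitioner
    // but is not the Kafka Streams type.
    @FunctionalInterface
    interface Partitioner<K, V> {
        int partition(K key, V value, int numPartitions);
    }

    // Case 1: a field of the value combined with the key.
    static final Partitioner<String, Order> BY_KEY_AND_REGION =
            (key, value, n) -> Math.floorMod(Objects.hash(key, value.region), n);

    // Case 2: something other than the key; the key itself stays unchanged.
    static final Partitioner<String, Order> BY_REGION_ONLY =
            (key, value, n) -> Math.floorMod(value.region.hashCode(), n);
}
```

In both cases the record key is left as it is; only the choice of target partition changes, which is the point of item 2 in the list.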
>>>>>>>>>>>>>>>>>>>>>>>>> Just my  2 cents.
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>>>>> Bill
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Jul 19, 2019 at 5:46 AM Levani Kokhreidze <levani.co...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> Hi John,
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> In my mind it makes sense.
>>>>>>>>>>>>>>>>>>>>>>>>>> If we add partitioner configuration to the Repartitioned class, then in
>>>>>>>>>>>>>>>>>>>>>>>>>> combination with specifying the number of partitions for internal
>>>>>>>>>>>>>>>>>>>>>>>>>> topics, the user will have the opportunity to ensure co-partitioning
>>>>>>>>>>>>>>>>>>>>>>>>>> before a join operation.
>>>>>>>>>>>>>>>>>>>>>>>>>> I think this can be quite a powerful feature.
>>>>>>>>>>>>>>>>>>>>>>>>>> Wondering what others think about this?
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>> On Jul 18, 2019, at 1:20 AM, John Roesler <j...@confluent.io> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>> Yes, I believe that's what I had in mind. Again,
>>> not totally
>>>>>>>>>>>>>>> sure it
>>>>>>>>>>>>>>>>>>>>>>>>>>> makes sense, but I believe something similar is
>>> the
>>>>>>>>>>>>> rationale
>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>>>>>>>> having the partitioner option in Produced.
>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Jul 17, 2019 at 3:20 PM Levani Kokhreidze <levani.co...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hey John,
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Oh, that’s an interesting use case.
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Do I understand this correctly: in your example I would first issue
>>>>>>>>>>>>>>>>>>>>>>>>>>>> repartition(Repartitioned) with a proper partitioner that essentially
>>>>>>>>>>>>>>>>>>>>>>>>>>>> would be the same as that of the topic I want to join with, and then do
>>>>>>>>>>>>>>>>>>>>>>>>>>>> the KStream#join with the DSL?
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Jul 17, 2019, at 11:11 PM, John Roesler <j...@confluent.io> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hey, all, just to chime in,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I think it might be useful to have an option to
>>> specify
>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> partitioner. The case I have in mind is that
>>> some data may
>>>>>>>>>>>>>>> get
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> repartitioned and then joined with an input
>>> topic. If the
>>>>>>>>>>>>>>> right-side
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> input topic uses a custom partitioning
>>> strategy, then the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> repartitioned stream also needs to be
>>> partitioned with the
>>>>>>>>>>>>>>> same
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> strategy.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
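The join requirement described above, that both sides use the same partition count and the same partitioning strategy, can be sketched as a simple precondition check. This is an illustrative model only: `Side`, `requireCoPartitioned`, and the string `partitionerId` label are hypothetical, and a real framework cannot in general verify that two arbitrary partitioners behave identically.

```java
// Hypothetical precondition check for a join between two partitioned inputs.
final class CoPartitionCheck {

    // Describes one join input. partitionerId is an invented label standing in
    // for "which partitioning strategy wrote this topic".
    static final class Side {
        final String topic;
        final int partitions;
        final String partitionerId;

        Side(String topic, int partitions, String partitionerId) {
            this.topic = topic;
            this.partitions = partitions;
            this.partitionerId = partitionerId;
        }
    }

    // Throws if records with equal keys are not guaranteed to meet on the same partition.
    static void requireCoPartitioned(Side left, Side right) {
        if (left.partitions != right.partitions) {
            throw new IllegalStateException("partition counts differ: "
                    + left.topic + "=" + left.partitions + ", "
                    + right.topic + "=" + right.partitions);
        }
        if (!left.partitionerId.equals(right.partitionerId)) {
            throw new IllegalStateException("partitioning strategies differ: "
                    + left.partitionerId + " vs " + right.partitionerId);
        }
    }
}
```

In the scenario from the email, the repartitioned stream would be the left side and the pre-existing input topic with its custom strategy the right side; being able to set both the partition count and the partitioner on the repartition step is what lets the left side satisfy the check.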
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Does that make sense, or did I maybe miss
>>> something
>>>>>>>>>>>>>>> important?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Jul 17, 2019 at 2:48 PM Levani Kokhreidze <levani.co...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Yes, I was thinking about it as well. To be honest, I’m not sure about
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> it yet.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> As a Kafka Streams DSL user, I don’t really think I would need control
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> over the partitioner for internal topics.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> As a user, I would assume that Kafka Streams knows best how to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> partition data for internal topics.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> In this KIP I wrote that Produced should be used only for topics that
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> are created by the user in advance.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> In those cases maybe it makes sense to have the possibility to specify
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the partitioner.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I don’t have a clear answer on that yet, but I guess specifying the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> partitioner can be added as well if there’s agreement on this.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Jul 17, 2019, at 10:42 PM, Sophie Blee-Goldman <sop...@confluent.io> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for clearing that up. I agree that
>>> Repartitioned
>>>>>>>>>>>>>>> would be a
>>>>>>>>>>>>>>>>>>>>>>>>>> useful
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> addition. I'm wondering if it might also need
>>> to have
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> a withStreamPartitioner method/field, similar
>>> to
>>>>>>>>>>>>>>> Produced? I'm not
>>>>>>>>>>>>>>>>>>>>>>>>>> sure how
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> widely this feature is really used, but seems
>>> it should
>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>>>>>>> available
>>>>>>>>>>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> repartition topics.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Jul 17, 2019 at 11:26 AM Levani Kokhreidze <levani.co...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hey Sophie,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> In both cases, KStream#repartition and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> KStream#repartition(Repartitioned), the topic will be created and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> managed by Kafka Streams.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The idea of Repartitioned is to give the user more control over the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> topic, such as the number of partitions.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I feel like the Repartitioned parameter is something that is missing in
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the current DSL design.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Essentially it gives the user control over parallelism by configuring
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the number of partitions for internal topics.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hope this answers your question.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Jul 17, 2019, at 9:02 PM, Sophie Blee-Goldman <sop...@confluent.io> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hey Levani,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the KIP! Can you clarify one
>>> thing for me
>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> for the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> KStream#repartition signature taking a
>>> Repartitioned,
>>>>>>>>>>>>>>> will the
>>>>>>>>>>>>>>>>>>>>>>>>>> topic be
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> auto-created by Streams (which seems to be
>>> the case
>>>>>>>>>>>>> for
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>> signature
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> without a Repartitioned) or does it have to
>>> be
>>>>>>>>>>>>>>> pre-created? The
>>>>>>>>>>>>>>>>>>>>>>>>>> wording
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the KIP makes it seem like one version of
>>> the method
>>>>>>>>>>>>>>> will
>>>>>>>>>>>>>>>>>>>>>>>>>> auto-create
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> topics while the other will not.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Sophie
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Jul 17, 2019 at 10:15 AM Levani Kokhreidze <levani.co...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> One more bump about KIP-221 (
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> )
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> so it doesn’t get lost in mailing list :)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Would love to hear the community’s opinions/concerns about this KIP.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Jul 12, 2019, at 5:27 PM, Levani Kokhreidze <levani.co...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Kind reminder about this KIP:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Jul 9, 2019, at 11:38 AM, Levani Kokhreidze <levani.co...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> In order to move this KIP forward, I’ve updated the Confluence page
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> with the new proposal:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I’ve also filled in the “Rejected Alternatives” section.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Looking forward to discussing this KIP :)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Kind regards,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Jul 3, 2019, at 1:08 PM, Levani Kokhreidze <levani.co...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hello Matthias,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the feedback and ideas.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I like the idea of introducing a dedicated `Topic` class for topic
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> configuration for internal operators like `groupBy`.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Would be great to hear others' opinions about this as well.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Kind regards,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Jul 3, 2019, at 7:00 AM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Levani,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for picking up this KIP! And thanks for summarizing everything.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Even if some points may have been discussed already (can't really
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> remember), it's helpful to get a good summary to refresh the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> discussion.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I think your reasoning makes sense. With regard to the distinction
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> between operators that manage topics and operators that use
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> user-created topics: Following this argument, it might indicate that
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> leaving `through()` as-is (as an operator that uses user-defined
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> topics) and introducing a new `repartition()` operator (an operator
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> that manages topics itself) might be good. Otherwise, there is one
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> operator `through()` that sometimes manages topics and sometimes
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> does not; a different name, i.e., a new operator, would make the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> distinction clearer.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> About adding `numOfPartitions` to `Grouped`: I am wondering if the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> same argument as for `Produced` applies and adding it is
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> semantically questionable? Might be good to get opinions of others
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> on this, too. I am not sure myself what solution I prefer atm.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> So far, KS uses configuration objects that allow configuring a
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> certain "entity" like a consumer, producer, or store. If we assume
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> that a topic is a similar entity, I wonder if we should have a
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> `Topic#withNumberOfPartitions()` class and method instead of a plain
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> integer? This would allow us to add other configs, like replication
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> factor, retention time, etc., easily, without the need to change the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> "main API".
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Just want to give some ideas. Not sure if I like them myself. :)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 7/1/19 1:04 AM, Levani Kokhreidze wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Actually, giving it more thought - maybe enhancing Produced with a
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> num of partitions configuration is not the best approach. Let me
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> explain why:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1) If we enhance the Produced class with this configuration, it will
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> also affect the KStream#to operation. Since KStream#to is the final
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> sink of the topology, it seems a reasonable assumption to me that
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the user needs to manually create the sink topic in advance. In
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> that case, having a num of partitions configuration doesn’t make
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> much sense.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2) Looking at the Produced class, based on the API contract, it
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> seems like Produced is designed to be something that is explicitly
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> for the producer (key serializer, value serializer, partitioner:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> those are all producer-specific configurations), whereas num of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> partitions is a topic-level configuration. I don’t think mixing
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> topic-level and producer-level configurations together in one
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> class is a good approach.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 3) Looking at the KStream interface, it seems like the Produced
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> parameter is for operations that work with non-internal topics
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> (i.e. topics that are not created and managed internally by Kafka
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Streams), and I think we should leave it as it is in that case.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Taking all these things into account, I think we should distinguish
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> between DSL operations where Kafka Streams should create and manage
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> internal topics (KStream#groupBy) and operations whose topics
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> should be created in advance (e.g. KStream#to).
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> To sum it up, I think adding numPartitions configuration in Produced
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> will result in mixing topic and producer level configuration in one
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> class and it’s gonna break existing API semantics.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regarding making the topic name optional in KStream#through - I
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> think the underlying idea is very useful, and giving users the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> possibility to specify num of partitions there is even more
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> useful :) Considering the arguments against adding num of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> partitions to the Produced class, I see two options here:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1) Add the following method overloads
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> * through() - topic will be auto-generated and num of partitions
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   will be taken from the source topic
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> * through(final int numOfPartitions) - topic will be auto-generated
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   with the specified num of partitions
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> * through(final int numOfPartitions, final Produced<K, V> produced)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   - topic will be generated with the specified num of partitions
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   and configuration taken from the produced parameter.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2) Leave KStream#through as it is and introduce a new method -
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> KStream#repartition (I think Matthias suggested this in one of the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> threads)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Considering everything mentioned above, I propose the following plan:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Option A:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1) Leave Produced as it is
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2) Add num of partitions configuration to Grouped class (as
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> mentioned in the KIP)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 3) Add the following method overloads to KStream#through
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> * through() - topic will be auto-generated and num of partitions
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   will be taken from the source topic
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> * through(final int numOfPartitions) - topic will be auto-generated
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   with the specified num of partitions
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> * through(final int numOfPartitions, final Produced<K, V> produced)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   - topic will be generated with the specified num of partitions
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   and configuration taken from the produced parameter.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Option B:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1) Leave Produced as it is
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2) Add num of partitions configuration to Grouped class (as
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> mentioned in the KIP)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 3) Add new operator KStream#repartition for creating and managing
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> internal repartition topics
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> P.S. I’m sorry if all of this was already discussed on the mailing
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> list, but I kinda got lost with all the threads that were about
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> this KIP :(
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Kind regards,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Jul 1, 2019, at 9:56 AM, Levani Kokhreidze <levani.co...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I would like to resurrect the discussion around KIP-221. Going
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> through the discussion thread, there seems to be agreement around
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the usefulness of this feature.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regarding the implementation, as far as I understand, the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> following seems to me to be the best solution:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1) Add two method overloads to KStream#through method (essentially
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> making topic name optional)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2) Enhance Produced class with numOfPartitions configuration field.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Those two changes will allow DSL users to control parallelism and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> trigger re-partition without doing stateful operations.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I will update the KIP with interface changes around KStream#through
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> if these changes sound sensible.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Kind regards,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>> 
>>>>>> 
>>>>>> 
>>>>> 
>>> 
>>> 
>> 
> 
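
For readers skimming the quoted thread: the `Topic#withNumberOfPartitions()` idea raised by Matthias (a configuration object for a topic instead of a plain integer) could look roughly like the sketch below. It is plain Java with no Kafka dependency. The `Topic` class shown here, its fields, and the `withReplicationFactor` and `withRetention` methods are hypothetical names chosen for illustration; the thread only mentions `Topic#withNumberOfPartitions()` and lists replication factor and retention time as examples of configs that could be added later. None of this is part of the Kafka Streams API.

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical, immutable topic-level config object (illustration only,
// not a Kafka Streams class). Unset values are modeled as empty Optionals
// so a caller could fall back to defaults, e.g. the source topic's
// partition count.
final class Topic {
    private final Integer numberOfPartitions;
    private final Short replicationFactor;
    private final Duration retention;

    private Topic(Integer numberOfPartitions, Short replicationFactor, Duration retention) {
        this.numberOfPartitions = numberOfPartitions;
        this.replicationFactor = replicationFactor;
        this.retention = retention;
    }

    // Entry point named after the method mentioned in the thread.
    static Topic withNumberOfPartitions(int numberOfPartitions) {
        if (numberOfPartitions <= 0) {
            throw new IllegalArgumentException("numberOfPartitions must be positive");
        }
        return new Topic(numberOfPartitions, null, null);
    }

    // Assumed extension points: new configs become new methods here,
    // so operator signatures that accept a Topic would not need to change.
    Topic withReplicationFactor(short replicationFactor) {
        return new Topic(numberOfPartitions, replicationFactor, retention);
    }

    Topic withRetention(Duration retention) {
        return new Topic(numberOfPartitions, replicationFactor, retention);
    }

    Optional<Integer> numberOfPartitions() {
        return Optional.ofNullable(numberOfPartitions);
    }

    Optional<Short> replicationFactor() {
        return Optional.ofNullable(replicationFactor);
    }

    Optional<Duration> retention() {
        return Optional.ofNullable(retention);
    }
}
```

A caller would write something like `Topic.withNumberOfPartitions(4).withReplicationFactor((short) 3)` and pass the result to whichever operator manages the internal topic. The point of the design is the one made in the thread: topic-level settings stay out of producer-level classes such as Produced, and the set of settings can grow without touching the "main API".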
