Hi Levani,

Thanks for resurrecting this KIP.
I'm also a +1 for adding a partitioner option. In addition to the reason
provided by John, my reasoning is:

1. Users may want to use something other than hash-based partitioning.
2. Users may wish to partition on something different than the key
   without having to change the key. For example:
   1. A combination of fields in the value in conjunction with the key
   2. Something other than the key
3. We allow users to specify a partitioner on Produced, and hence in
   KStream.to and KStream.through, so it makes sense for API consistency.

Just my 2 cents.

Thanks,
Bill

On Fri, Jul 19, 2019 at 5:46 AM Levani Kokhreidze <levani.co...@gmail.com>
wrote:

> Hi John,
>
> In my mind it makes sense.
> If we add a partitioner configuration to the Repartitioned class, combined
> with specifying the number of partitions for internal topics, the user
> will have the opportunity to ensure co-partitioning before a join operation.
> I think this can be quite a powerful feature.
> Wondering what others think about this?
>
> Regards,
> Levani
>
>
> On Jul 18, 2019, at 1:20 AM, John Roesler <j...@confluent.io> wrote:
> >
> > Yes, I believe that's what I had in mind. Again, not totally sure it
> > makes sense, but I believe something similar is the rationale for
> > having the partitioner option in Produced.
> >
> > Thanks,
> > -John
> >
> > On Wed, Jul 17, 2019 at 3:20 PM Levani Kokhreidze
> > <levani.co...@gmail.com> wrote:
> >>
> >> Hey John,
> >>
> >> Oh that’s an interesting use-case.
> >> Do I understand this correctly: in your example I would first issue
> >> repartition(Repartitioned) with a proper partitioner that essentially
> >> would be the same as that of the topic I want to join with, and then do
> >> the KStream#join with the DSL?
> >>
> >> Regards,
> >> Levani
> >>
> >>> On Jul 17, 2019, at 11:11 PM, John Roesler <j...@confluent.io> wrote:
> >>>
> >>> Hey, all, just to chime in,
> >>>
> >>> I think it might be useful to have an option to specify the
> >>> partitioner.
> >>> The case I have in mind is that some data may get
> >>> repartitioned and then joined with an input topic. If the right-side
> >>> input topic uses a custom partitioning strategy, then the
> >>> repartitioned stream also needs to be partitioned with the same
> >>> strategy.
> >>>
> >>> Does that make sense, or did I maybe miss something important?
> >>>
> >>> Thanks,
> >>> -John
> >>>
> >>> On Wed, Jul 17, 2019 at 2:48 PM Levani Kokhreidze
> >>> <levani.co...@gmail.com> wrote:
> >>>>
> >>>> Yes, I was thinking about it as well. To be honest I’m not sure about
> >>>> it yet.
> >>>> As a Kafka Streams DSL user, I don’t really think I would need control
> >>>> over the partitioner for internal topics.
> >>>> As a user, I would assume that Kafka Streams knows best how to
> >>>> partition data for internal topics.
> >>>> In this KIP I wrote that Produced should be used only for topics that
> >>>> are created by the user in advance.
> >>>> In those cases maybe it makes sense to have the possibility to specify
> >>>> the partitioner.
> >>>> I don’t have a clear answer on that yet, but I guess specifying the
> >>>> partitioner can be added as well if there’s agreement on this.
> >>>>
> >>>> Regards,
> >>>> Levani
> >>>>
> >>>>> On Jul 17, 2019, at 10:42 PM, Sophie Blee-Goldman <sop...@confluent.io> wrote:
> >>>>>
> >>>>> Thanks for clearing that up. I agree that Repartitioned would be a
> >>>>> useful addition. I'm wondering if it might also need to have
> >>>>> a withStreamPartitioner method/field, similar to Produced? I'm not
> >>>>> sure how widely this feature is really used, but it seems it should
> >>>>> be available for repartition topics.
> >>>>>
> >>>>> On Wed, Jul 17, 2019 at 11:26 AM Levani Kokhreidze <levani.co...@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Hey Sophie,
> >>>>>>
> >>>>>> In both cases, KStream#repartition and
> >>>>>> KStream#repartition(Repartitioned), the topic will be created and
> >>>>>> managed by Kafka Streams.
> >>>>>> The idea of Repartitioned is to give the user more control over the
> >>>>>> topic, such as the number of partitions.
> >>>>>> I feel like the Repartitioned parameter is something that is missing
> >>>>>> in the current DSL design.
> >>>>>> Essentially, it gives the user control over parallelism by
> >>>>>> configuring the number of partitions for internal topics.
> >>>>>>
> >>>>>> Hope this answers your question.
> >>>>>>
> >>>>>> Regards,
> >>>>>> Levani
> >>>>>>
> >>>>>>> On Jul 17, 2019, at 9:02 PM, Sophie Blee-Goldman <sop...@confluent.io>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>> Hey Levani,
> >>>>>>>
> >>>>>>> Thanks for the KIP! Can you clarify one thing for me -- for the
> >>>>>>> KStream#repartition signature taking a Repartitioned, will the
> >>>>>>> topic be auto-created by Streams (which seems to be the case for
> >>>>>>> the signature without a Repartitioned) or does it have to be
> >>>>>>> pre-created? The wording in the KIP makes it seem like one version
> >>>>>>> of the method will auto-create topics while the other will not.
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>> Sophie
> >>>>>>>
> >>>>>>> On Wed, Jul 17, 2019 at 10:15 AM Levani Kokhreidze <levani.co...@gmail.com>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hello,
> >>>>>>>>
> >>>>>>>> One more bump about KIP-221
> >>>>>>>> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint)
> >>>>>>>> so it doesn’t get lost in the mailing list :)
> >>>>>>>> Would love to hear the community's opinions/concerns about this KIP.
> >>>>>>>>
> >>>>>>>> Regards,
> >>>>>>>> Levani
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>> On Jul 12, 2019, at 5:27 PM, Levani Kokhreidze <levani.co...@gmail.com>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>> Hello,
> >>>>>>>>>
> >>>>>>>>> Kind reminder about this KIP:
> >>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
> >>>>>>>>>
> >>>>>>>>> Regards,
> >>>>>>>>> Levani
> >>>>>>>>>
> >>>>>>>>>> On Jul 9, 2019, at 11:38 AM, Levani Kokhreidze <levani.co...@gmail.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Hello,
> >>>>>>>>>>
> >>>>>>>>>> In order to move this KIP forward, I’ve updated the Confluence
> >>>>>>>>>> page with the new proposal:
> >>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
> >>>>>>>>>> I’ve also filled in the “Rejected Alternatives” section.
> >>>>>>>>>>
> >>>>>>>>>> Looking forward to discussing this KIP :)
> >>>>>>>>>>
> >>>>>>>>>> Kind regards,
> >>>>>>>>>> Levani
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>> On Jul 3, 2019, at 1:08 PM, Levani Kokhreidze <levani.co...@gmail.com> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Hello Matthias,
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks for the feedback and ideas.
> >>>>>>>>>>> I like the idea of introducing a dedicated `Topic` class for
> >>>>>>>>>>> topic configuration for internal operators like `groupBy`.
> >>>>>>>>>>> Would be great to hear others' opinions about this as well.
> >>>>>>>>>>>
> >>>>>>>>>>> Kind regards,
> >>>>>>>>>>> Levani
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>> On Jul 3, 2019, at 7:00 AM, Matthias J. Sax <matth...@confluent.io> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>> Levani,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks for picking up this KIP! And thanks for summarizing
> >>>>>>>>>>>> everything. Even if some points may have been discussed already
> >>>>>>>>>>>> (can't really remember), it's helpful to get a good summary to
> >>>>>>>>>>>> refresh the discussion.
> >>>>>>>>>>>>
> >>>>>>>>>>>> I think your reasoning makes sense. With regard to the
> >>>>>>>>>>>> distinction between operators that manage topics and operators
> >>>>>>>>>>>> that use user-created topics: Following this argument, it might
> >>>>>>>>>>>> indicate that leaving `through()` as-is (as an operator that
> >>>>>>>>>>>> uses user-defined topics) and introducing a new `repartition()`
> >>>>>>>>>>>> operator (an operator that manages topics itself) might be
> >>>>>>>>>>>> good. Otherwise, there is one operator `through()` that
> >>>>>>>>>>>> sometimes manages topics but sometimes not; a different name,
> >>>>>>>>>>>> i.e., a new operator, would make the distinction clearer.
> >>>>>>>>>>>>
> >>>>>>>>>>>> About adding `numOfPartitions` to `Grouped`: I am wondering if
> >>>>>>>>>>>> the same argument as for `Produced` applies and adding it is
> >>>>>>>>>>>> semantically questionable? Might be good to get opinions of
> >>>>>>>>>>>> others on this, too. I am not sure myself what solution I
> >>>>>>>>>>>> prefer atm.
> >>>>>>>>>>>>
> >>>>>>>>>>>> So far, KS uses configuration objects that allow configuring a
> >>>>>>>>>>>> certain "entity" like a consumer, producer, store.
> >>>>>>>>>>>> If we assume that a topic is a similar entity, I am wondering
> >>>>>>>>>>>> if we should have a `Topic#withNumberOfPartitions()` class and
> >>>>>>>>>>>> method instead of a plain integer? This would allow us to add
> >>>>>>>>>>>> other configs, like replication factor, retention-time etc,
> >>>>>>>>>>>> easily, without the need to change the "main API".
> >>>>>>>>>>>>
> >>>>>>>>>>>> Just want to give some ideas. Not sure if I like them myself. :)
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 7/1/19 1:04 AM, Levani Kokhreidze wrote:
> >>>>>>>>>>>>> Actually, giving it more thought - maybe enhancing Produced
> >>>>>>>>>>>>> with a num of partitions configuration is not the best
> >>>>>>>>>>>>> approach. Let me explain why:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 1) If we enhance the Produced class with this configuration,
> >>>>>>>>>>>>> this will also affect the KStream#to operation. Since
> >>>>>>>>>>>>> KStream#to is the final sink of the topology, for me, it seems
> >>>>>>>>>>>>> to be a reasonable assumption that the user needs to manually
> >>>>>>>>>>>>> create the sink topic in advance. And in that case, having a
> >>>>>>>>>>>>> num of partitions configuration doesn’t make much sense.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2) Looking at the Produced class, based on the API contract,
> >>>>>>>>>>>>> it seems like Produced is designed to be something that is
> >>>>>>>>>>>>> explicitly for the producer (key serializer, value serializer,
> >>>>>>>>>>>>> partitioner: those are all producer-specific configurations),
> >>>>>>>>>>>>> while num of partitions is a topic-level configuration. And I
> >>>>>>>>>>>>> don’t think mixing topic- and producer-level configurations
> >>>>>>>>>>>>> together in one class is a good approach.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 3) Looking at the KStream interface, it seems like the Produced
> >>>>>>>>>>>>> parameter is for operations that work with non-internal topics
> >>>>>>>>>>>>> (i.e. topics not created and managed internally by Kafka
> >>>>>>>>>>>>> Streams), and I think we should leave it as it is in that case.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Taking all these things into account, I think we should
> >>>>>>>>>>>>> distinguish between DSL operations where Kafka Streams should
> >>>>>>>>>>>>> create and manage internal topics (KStream#groupBy) vs topics
> >>>>>>>>>>>>> that should be created in advance (e.g. KStream#to).
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> To sum it up, I think adding a numPartitions configuration in
> >>>>>>>>>>>>> Produced will result in mixing topic- and producer-level
> >>>>>>>>>>>>> configuration in one class, and it’s gonna break existing API
> >>>>>>>>>>>>> semantics.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Regarding making the topic name optional in KStream#through - I
> >>>>>>>>>>>>> think the underlying idea is very useful, and giving users the
> >>>>>>>>>>>>> possibility to specify num of partitions there is even more
> >>>>>>>>>>>>> useful :) Considering the arguments against adding num of
> >>>>>>>>>>>>> partitions in the Produced class, I see two options here:
> >>>>>>>>>>>>> 1) Add the following method overloads
> >>>>>>>>>>>>>   * through() - topic will be auto-generated and num of
> >>>>>>>>>>>>>     partitions will be taken from the source topic
> >>>>>>>>>>>>>   * through(final int numOfPartitions) - topic will be
> >>>>>>>>>>>>>     auto-generated with the specified num of partitions
> >>>>>>>>>>>>>   * through(final int numOfPartitions, final Produced<K, V>
> >>>>>>>>>>>>>     produced) - topic will be generated with the specified num
> >>>>>>>>>>>>>     of partitions and configuration taken from the produced
> >>>>>>>>>>>>>     parameter.
> >>>>>>>>>>>>> 2) Leave KStream#through as it is and introduce a new method -
> >>>>>>>>>>>>> KStream#repartition (I think Matthias suggested this in one of
> >>>>>>>>>>>>> the threads)
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Considering everything mentioned above, I propose the following
> >>>>>>>>>>>>> plan:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Option A:
> >>>>>>>>>>>>> 1) Leave Produced as it is
> >>>>>>>>>>>>> 2) Add a num of partitions configuration to the Grouped class
> >>>>>>>>>>>>> (as mentioned in the KIP)
> >>>>>>>>>>>>> 3) Add the following method overloads to KStream#through
> >>>>>>>>>>>>>   * through() - topic will be auto-generated and num of
> >>>>>>>>>>>>>     partitions will be taken from the source topic
> >>>>>>>>>>>>>   * through(final int numOfPartitions) - topic will be
> >>>>>>>>>>>>>     auto-generated with the specified num of partitions
> >>>>>>>>>>>>>   * through(final int numOfPartitions, final Produced<K, V>
> >>>>>>>>>>>>>     produced) - topic will be generated with the specified num
> >>>>>>>>>>>>>     of partitions and configuration taken from the produced
> >>>>>>>>>>>>>     parameter.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Option B:
> >>>>>>>>>>>>> 1) Leave Produced as it is
> >>>>>>>>>>>>> 2) Add a num of partitions configuration to the Grouped class
> >>>>>>>>>>>>> (as mentioned in the KIP)
> >>>>>>>>>>>>> 3) Add a new operator KStream#repartition for creating and
> >>>>>>>>>>>>> managing internal repartition topics
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> P.S. I’m sorry if all of this was already discussed in the
> >>>>>>>>>>>>> mailing list, but I kinda got lost with all the threads that
> >>>>>>>>>>>>> were about this KIP :(
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Kind regards,
> >>>>>>>>>>>>> Levani
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Jul 1, 2019, at 9:56 AM, Levani Kokhreidze <levani.co...@gmail.com> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hello,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I would like to resurrect the discussion around KIP-221. Going
> >>>>>>>>>>>>>> through the discussion thread, there seems to be agreement
> >>>>>>>>>>>>>> around the usefulness of this feature.
> >>>>>>>>>>>>>> Regarding the implementation, as far as I understood, the
> >>>>>>>>>>>>>> best solution seems to me to be the following:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 1) Add two method overloads to the KStream#through method
> >>>>>>>>>>>>>> (essentially making the topic name optional)
> >>>>>>>>>>>>>> 2) Enhance the Produced class with a numOfPartitions
> >>>>>>>>>>>>>> configuration field.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Those two changes will allow DSL users to control parallelism
> >>>>>>>>>>>>>> and trigger re-partition without doing stateful operations.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I will update the KIP with interface changes around
> >>>>>>>>>>>>>> KStream#through if these changes sound sensible.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Kind regards,
> >>>>>>>>>>>>>> Levani
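
To make the co-partitioning argument from this thread concrete (John's join case, and the reason for a partitioner option next to the number of partitions), here is a minimal sketch in plain Python. It is not Kafka Streams code: the function names, the "tenant:user" key layout and the toy hash are all assumptions made up for illustration. It only shows that two sides of a join line up when they share both the partition count and the partitioning strategy.

```python
from typing import Callable, Iterable

# A partitioner maps (key, number of partitions) to a partition index.
Partitioner = Callable[[str, int], int]


def toy_hash(text: str) -> int:
    """Toy deterministic hash (sum of UTF-8 bytes); not the hash Kafka uses."""
    return sum(text.encode("utf-8"))


def key_hash_partitioner(key: str, num_partitions: int) -> int:
    """Stand-in for default hash-based partitioning on the whole key."""
    return toy_hash(key) % num_partitions


def tenant_partitioner(key: str, num_partitions: int) -> int:
    """Hypothetical custom strategy: hash only the tenant part of 'tenant:user'."""
    tenant = key.split(":", 1)[0]
    return toy_hash(tenant) % num_partitions


def co_partitioned(
    keys: Iterable[str],
    left: Partitioner,
    left_partitions: int,
    right: Partitioner,
    right_partitions: int,
) -> bool:
    """True if every key lands in the same partition index on both sides."""
    if left_partitions != right_partitions:
        return False
    return all(
        left(key, left_partitions) == right(key, right_partitions)
        for key in keys
    )


keys = ["a:1", "a:2", "b:1"]

# Repartitioned side uses the default strategy, input topic a custom one.
print(co_partitioned(keys, key_hash_partitioner, 4, tenant_partitioner, 4))  # False

# Same custom strategy and same partition count on both sides.
print(co_partitioned(keys, tenant_partitioner, 4, tenant_partitioner, 4))  # True

# Same strategy, but the partition counts differ.
print(co_partitioned(keys, tenant_partitioner, 8, tenant_partitioner, 4))  # False
```

In this sketch, setting only the number of partitions is not enough when the other topic uses a custom strategy, which is the case the partitioner option on Repartitioned is meant to cover.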