Hi John,

In my mind it makes sense.
If we add partitioner configuration to the Repartitioned class, then, combined
with specifying the number of partitions for internal topics, users will have
the opportunity to ensure co-partitioning before a join operation.
I think this can be quite a powerful feature.
Wondering what others think about this?

Regards,
Levani

> On Jul 18, 2019, at 1:20 AM, John Roesler <j...@confluent.io> wrote:
> 
> Yes, I believe that's what I had in mind. Again, not totally sure it
> makes sense, but I believe something similar is the rationale for
> having the partitioner option in Produced.
> 
> Thanks,
> -John
> 
> On Wed, Jul 17, 2019 at 3:20 PM Levani Kokhreidze
> <levani.co...@gmail.com> wrote:
>> 
>> Hey John,
>> 
>> Oh, that’s an interesting use case.
>> Do I understand this correctly: in your example, I would first call
>> repartition(Repartitioned) with a partitioner that matches the partitioning
>> of the topic I want to join with, and then do the KStream#join with the
>> DSL?
>> 
>> Regards,
>> Levani
>> 
>>> On Jul 17, 2019, at 11:11 PM, John Roesler <j...@confluent.io> wrote:
>>> 
>>> Hey, all, just to chime in,
>>> 
>>> I think it might be useful to have an option to specify the
>>> partitioner. The case I have in mind is that some data may get
>>> repartitioned and then joined with an input topic. If the right-side
>>> input topic uses a custom partitioning strategy, then the
>>> repartitioned stream also needs to be partitioned with the same
>>> strategy.
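A minimal sketch of the co-partitioning requirement described above, in plain Java with no Kafka dependency. The `Partitioner` interface and the two strategies `BY_FIRST_CHAR` and `BY_LENGTH` are hypothetical stand-ins for a custom partitioning strategy. A join can only match records whose key lands on the same partition number on both sides, which holds only when both sides agree on the strategy and on the partition count.

```java
import java.util.List;

// Hypothetical model of co-partitioning; this is not Kafka Streams code.
final class CoPartitioning {
    /** Maps a key to a partition index in [0, numPartitions). */
    interface Partitioner {
        int partition(String key, int numPartitions);
    }

    // Two illustrative "custom" strategies, chosen only because they are easy to check by hand.
    static final Partitioner BY_FIRST_CHAR = (key, n) -> Math.floorMod(key.charAt(0), n);
    static final Partitioner BY_LENGTH = (key, n) -> Math.floorMod(key.length(), n);

    /**
     * Returns true if every key is routed to the same partition number on the
     * left side (the repartitioned stream) and the right side (the input topic).
     */
    static boolean coPartitioned(List<String> keys,
                                 Partitioner left, int leftPartitions,
                                 Partitioner right, int rightPartitions) {
        for (String key : keys) {
            if (left.partition(key, leftPartitions) != right.partition(key, rightPartitions)) {
                return false;
            }
        }
        return true;
    }
}
```

For the keys alice, bob, carol and dave, `coPartitioned(keys, BY_FIRST_CHAR, 4, BY_FIRST_CHAR, 4)` is true, whereas mixing strategies (`BY_FIRST_CHAR` against `BY_LENGTH`, both with 4 partitions) or partition counts (4 against 3) yields false, because bob and carol respectively end up on different partition numbers.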
>>> 
>>> Does that make sense, or did I maybe miss something important?
>>> 
>>> Thanks,
>>> -John
>>> 
>>> On Wed, Jul 17, 2019 at 2:48 PM Levani Kokhreidze
>>> <levani.co...@gmail.com> wrote:
>>>> 
>>>> Yes, I was thinking about it as well. To be honest, I’m not sure about it
>>>> yet.
>>>> As a Kafka Streams DSL user, I don’t really think I would need control over
>>>> the partitioner for internal topics.
>>>> As a user, I would assume that Kafka Streams knows best how to partition
>>>> data for internal topics.
>>>> In this KIP I wrote that Produced should be used only for topics that are
>>>> created by the user in advance.
>>>> In those cases, maybe it makes sense to have the possibility to specify the
>>>> partitioner.
>>>> I don’t have a clear answer on that yet, but I guess specifying the
>>>> partitioner can be added as well if there’s agreement on this.
>>>> 
>>>> Regards,
>>>> Levani
>>>> 
>>>>> On Jul 17, 2019, at 10:42 PM, Sophie Blee-Goldman <sop...@confluent.io> 
>>>>> wrote:
>>>>> 
>>>>> Thanks for clearing that up. I agree that Repartitioned would be a useful
>>>>> addition. I'm wondering if it might also need a withStreamPartitioner
>>>>> method/field, similar to Produced? I'm not sure how widely this feature
>>>>> is really used, but it seems it should be available for repartition topics.
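One possible shape for such an option, assuming it follows the immutable, fluent `with*` style used by `Produced`. `RepartitionOptions`, `KeyPartitioner` and `partitionFor` are hypothetical names for illustration only, not the `Repartitioned` class proposed in the KIP; the fallback partitioner is a `hashCode`-based stand-in, not Kafka's default partitioner.

```java
import java.util.Optional;

// Hypothetical options holder in the fluent, immutable style of Produced.
// RepartitionOptions and KeyPartitioner are illustrative names, not Kafka API.
final class RepartitionOptions<K> {
    /** Hypothetical partitioner contract: (key, partition count) -> partition index. */
    interface KeyPartitioner<T> {
        int partition(T key, int numPartitions);
    }

    private final Integer numberOfPartitions;    // null: inherit from upstream
    private final KeyPartitioner<K> partitioner; // null: use the fallback strategy

    private RepartitionOptions(Integer numberOfPartitions, KeyPartitioner<K> partitioner) {
        this.numberOfPartitions = numberOfPartitions;
        this.partitioner = partitioner;
    }

    static <K> RepartitionOptions<K> defaults() {
        return new RepartitionOptions<>(null, null);
    }

    // Each with* method returns a new instance; the receiver is never modified.
    RepartitionOptions<K> withNumberOfPartitions(int numberOfPartitions) {
        if (numberOfPartitions <= 0) {
            throw new IllegalArgumentException("numberOfPartitions must be positive");
        }
        return new RepartitionOptions<>(numberOfPartitions, partitioner);
    }

    RepartitionOptions<K> withStreamPartitioner(KeyPartitioner<K> partitioner) {
        return new RepartitionOptions<>(numberOfPartitions, partitioner);
    }

    Optional<Integer> numberOfPartitions() {
        return Optional.ofNullable(numberOfPartitions);
    }

    /**
     * Partition a key would be written to. Falls back to the upstream partition
     * count and to a hashCode-based stand-in when nothing is configured.
     */
    int partitionFor(K key, int upstreamPartitions) {
        int n = numberOfPartitions != null ? numberOfPartitions : upstreamPartitions;
        KeyPartitioner<K> p = partitioner != null
                ? partitioner
                : (k, count) -> Math.floorMod(k.hashCode(), count);
        return p.partition(key, n);
    }
}
```

With `withNumberOfPartitions(6)` and a length-based partitioner, `partitionFor("bob", 4)` returns 3: the configured count of 6 overrides the upstream count of 4, and the custom partitioner replaces the fallback.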
>>>>> 
>>>>> On Wed, Jul 17, 2019 at 11:26 AM Levani Kokhreidze 
>>>>> <levani.co...@gmail.com>
>>>>> wrote:
>>>>> 
>>>>>> Hey Sophie,
>>>>>> 
>>>>>> In both cases, KStream#repartition and KStream#repartition(Repartitioned),
>>>>>> the topic will be created and managed by Kafka Streams.
>>>>>> The idea of Repartitioned is to give the user more control over the topic,
>>>>>> such as the number of partitions.
>>>>>> I feel like the Repartitioned parameter is something that is missing in
>>>>>> the current DSL design: it essentially gives the user control over
>>>>>> parallelism by configuring the number of partitions for internal topics.
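A simplified model of why the partition count of an internal topic bounds parallelism. It assumes one task per partition of the repartition topic (roughly how Kafka Streams sizes tasks for a sub-topology) and, as a further simplification, deals tasks round-robin to stream threads; the real task assignor is more involved. `ParallelismModel` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model: one task per partition of the repartition topic,
// tasks dealt round-robin to stream threads. The real Kafka Streams assignor is more involved.
final class ParallelismModel {
    /** For each thread, the task ids (here equal to partition numbers) it would own. */
    static List<List<Integer>> assign(int numPartitions, int numThreads) {
        List<List<Integer>> tasksPerThread = new ArrayList<>();
        for (int t = 0; t < numThreads; t++) {
            tasksPerThread.add(new ArrayList<>());
        }
        for (int partition = 0; partition < numPartitions; partition++) {
            tasksPerThread.get(partition % numThreads).add(partition);
        }
        return tasksPerThread;
    }

    /** Number of threads that own at least one task, i.e. the effective parallelism. */
    static int busyThreads(int numPartitions, int numThreads) {
        int busy = 0;
        for (List<Integer> tasks : assign(numPartitions, numThreads)) {
            if (!tasks.isEmpty()) {
                busy++;
            }
        }
        return busy;
    }
}
```

With 4 partitions and 6 threads, `busyThreads(4, 6)` is 4, so two threads sit idle; configuring 12 partitions gives `busyThreads(12, 6)` of 6.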
>>>>>> 
>>>>>> Hope this answers your question.
>>>>>> 
>>>>>> Regards,
>>>>>> Levani
>>>>>> 
>>>>>>> On Jul 17, 2019, at 9:02 PM, Sophie Blee-Goldman <sop...@confluent.io> wrote:
>>>>>>> 
>>>>>>> Hey Levani,
>>>>>>> 
>>>>>>> Thanks for the KIP! Can you clarify one thing for me -- for the
>>>>>>> KStream#repartition signature taking a Repartitioned, will the topic be
>>>>>>> auto-created by Streams (which seems to be the case for the signature
>>>>>>> without a Repartitioned) or does it have to be pre-created? The wording in
>>>>>>> the KIP makes it seem like one version of the method will auto-create
>>>>>>> topics while the other will not.
>>>>>>> 
>>>>>>> Cheers,
>>>>>>> Sophie
>>>>>>> 
>>>>>>> On Wed, Jul 17, 2019 at 10:15 AM Levani Kokhreidze <levani.co...@gmail.com> wrote:
>>>>>>> 
>>>>>>>> Hello,
>>>>>>>> 
>>>>>>>> One more bump about KIP-221
>>>>>>>> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint)
>>>>>>>> so it doesn’t get lost in the mailing list :)
>>>>>>>> Would love to hear the community’s opinions/concerns about this KIP.
>>>>>>>> 
>>>>>>>> Regards,
>>>>>>>> Levani
>>>>>>>> 
>>>>>>>> 
>>>>>>>>> On Jul 12, 2019, at 5:27 PM, Levani Kokhreidze <levani.co...@gmail.com> wrote:
>>>>>>>>> 
>>>>>>>>> Hello,
>>>>>>>>> 
>>>>>>>>> Kind reminder about this KIP:
>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>> 
>>>>>>>>> Regards,
>>>>>>>>> Levani
>>>>>>>>> 
>>>>>>>>>> On Jul 9, 2019, at 11:38 AM, Levani Kokhreidze <levani.co...@gmail.com> wrote:
>>>>>>>>>> 
>>>>>>>>>> Hello,
>>>>>>>>>> 
>>>>>>>>>> In order to move this KIP forward, I’ve updated the Confluence page with
>>>>>>>>>> the new proposal:
>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>> I’ve also filled in the “Rejected Alternatives” section.
>>>>>>>>>> 
>>>>>>>>>> Looking forward to discussing this KIP :)
>>>>>>>>>>
>>>>>>>>>> Kind regards,
>>>>>>>>>> Levani
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>>> On Jul 3, 2019, at 1:08 PM, Levani Kokhreidze <levani.co...@gmail.com> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> Hello Matthias,
>>>>>>>>>>> 
>>>>>>>>>>> Thanks for the feedback and ideas.
>>>>>>>>>>> I like the idea of introducing a dedicated `Topic` class for topic
>>>>>>>>>>> configuration of internal operators like `groupBy`.
>>>>>>>>>>> It would be great to hear others’ opinions about this as well.
>>>>>>>>>>> 
>>>>>>>>>>> Kind regards,
>>>>>>>>>>> Levani
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>>> On Jul 3, 2019, at 7:00 AM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>> Levani,
>>>>>>>>>>>> 
>>>>>>>>>>>> Thanks for picking up this KIP! And thanks for summarizing everything.
>>>>>>>>>>>> Even if some points may have been discussed already (can't really
>>>>>>>>>>>> remember), it's helpful to get a good summary to refresh the discussion.
>>>>>>>>>>>> 
>>>>>>>>>>>> I think your reasoning makes sense. With regard to the distinction
>>>>>>>>>>>> between operators that manage topics and operators that use
>>>>>>>>>>>> user-created topics: following this argument, it might be good to
>>>>>>>>>>>> leave `through()` as-is (as an operator that uses user-defined
>>>>>>>>>>>> topics) and introduce a new `repartition()` operator (an operator
>>>>>>>>>>>> that manages topics itself). Otherwise, there is one operator,
>>>>>>>>>>>> `through()`, that sometimes manages topics and sometimes does not; a
>>>>>>>>>>>> different name, ie, a new operator, would make the distinction clearer.
>>>>>>>>>>>> 
>>>>>>>>>>>> About adding `numOfPartitions` to `Grouped`: I am wondering if the
>>>>>>>>>>>> same argument as for `Produced` applies, and adding it is
>>>>>>>>>>>> semantically questionable? Might be good to get opinions of others
>>>>>>>>>>>> on this, too. I am not sure myself which solution I prefer atm.
>>>>>>>>>>>> 
>>>>>>>>>>>> So far, KS uses configuration objects that allow configuring a
>>>>>>>>>>>> certain "entity" like a consumer, producer, or store. If we assume
>>>>>>>>>>>> that a topic is a similar entity, I am wondering if we should have a
>>>>>>>>>>>> `Topic#withNumberOfPartitions()` class and method instead of a plain
>>>>>>>>>>>> integer? This would allow us to add other configs, like replication
>>>>>>>>>>>> factor, retention time, etc., easily, without the need to change the
>>>>>>>>>>>> "main API".
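A sketch of the configuration-object idea, assuming a `Topic`-like class with fluent setters. `TopicConfig` and its methods are hypothetical and only loosely follow the `Topic#withNumberOfPartitions()` suggestion; they are not an existing Kafka Streams API. The design point is that an operator accepting one such object keeps its signature stable while settings like replication factor or retention time are added to the class.

```java
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;

// Hypothetical topic configuration object in the spirit of Topic#withNumberOfPartitions().
// Unset settings are empty Optionals, meaning "use the default".
final class TopicConfig {
    private final Integer numberOfPartitions;
    private final Short replicationFactor;
    private final Duration retention;

    private TopicConfig(Integer numberOfPartitions, Short replicationFactor, Duration retention) {
        this.numberOfPartitions = numberOfPartitions;
        this.replicationFactor = replicationFactor;
        this.retention = retention;
    }

    static TopicConfig defaults() {
        return new TopicConfig(null, null, null);
    }

    TopicConfig withNumberOfPartitions(int numberOfPartitions) {
        if (numberOfPartitions <= 0) {
            throw new IllegalArgumentException("numberOfPartitions must be positive");
        }
        return new TopicConfig(numberOfPartitions, replicationFactor, retention);
    }

    // Settings like this can be added later without touching the signature
    // of any operator that accepts a TopicConfig.
    TopicConfig withReplicationFactor(short replicationFactor) {
        if (replicationFactor <= 0) {
            throw new IllegalArgumentException("replicationFactor must be positive");
        }
        return new TopicConfig(numberOfPartitions, replicationFactor, retention);
    }

    TopicConfig withRetention(Duration retention) {
        return new TopicConfig(numberOfPartitions, replicationFactor, Objects.requireNonNull(retention));
    }

    Optional<Integer> numberOfPartitions() { return Optional.ofNullable(numberOfPartitions); }
    Optional<Short> replicationFactor() { return Optional.ofNullable(replicationFactor); }
    Optional<Duration> retention() { return Optional.ofNullable(retention); }
}
```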
>>>>>>>>>>>> 
>>>>>>>>>>>> Just want to give some ideas. Not sure if I like them myself. :)
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> -Matthias
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> On 7/1/19 1:04 AM, Levani Kokhreidze wrote:
>>>>>>>>>>>>> Actually, giving it more thought, maybe enhancing Produced with a
>>>>>>>>>>>>> number of partitions configuration is not the best approach. Let me
>>>>>>>>>>>>> explain why:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 1) If we enhance the Produced class with this configuration, this
>>>>>>>>>>>>> will also affect the KStream#to operation. Since KStream#to is the
>>>>>>>>>>>>> final sink of the topology, it seems a reasonable assumption to me
>>>>>>>>>>>>> that the user needs to create the sink topic manually in advance.
>>>>>>>>>>>>> In that case, having a number of partitions configuration doesn’t
>>>>>>>>>>>>> make much sense.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 2) Looking at the Produced class and its API contract, it seems
>>>>>>>>>>>>> Produced is designed to hold producer-specific settings (key
>>>>>>>>>>>>> serializer, value serializer, and partitioner are all
>>>>>>>>>>>>> producer-level configurations), while the number of partitions is
>>>>>>>>>>>>> a topic-level configuration. I don’t think mixing topic-level and
>>>>>>>>>>>>> producer-level configurations in one class is a good approach.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 3) Looking at the KStream interface, it seems the Produced
>>>>>>>>>>>>> parameter is for operations that work with non-internal topics
>>>>>>>>>>>>> (i.e. topics that are not created and managed internally by Kafka
>>>>>>>>>>>>> Streams), and I think we should leave it as it is in that case.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Taking all these things into account, I think we should distinguish
>>>>>>>>>>>>> between DSL operations where Kafka Streams should create and manage
>>>>>>>>>>>>> internal topics (e.g. KStream#groupBy) and operations whose topics
>>>>>>>>>>>>> should be created in advance (e.g. KStream#to).
>>>>>>>>>>>>> 
>>>>>>>>>>>>> To sum it up, I think adding a numPartitions configuration to
>>>>>>>>>>>>> Produced would mix topic-level and producer-level configuration in
>>>>>>>>>>>>> one class and would break existing API semantics.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Regarding making the topic name optional in KStream#through: I
>>>>>>>>>>>>> think the underlying idea is very useful, and giving users the
>>>>>>>>>>>>> possibility to specify the number of partitions there is even more
>>>>>>>>>>>>> useful :) Considering the arguments against adding the number of
>>>>>>>>>>>>> partitions to the Produced class, I see two options here:
>>>>>>>>>>>>> 1) Add the following method overloads:
>>>>>>>>>>>>> * through() - the topic will be auto-generated and the number of
>>>>>>>>>>>>>   partitions will be taken from the source topic
>>>>>>>>>>>>> * through(final int numOfPartitions) - the topic will be
>>>>>>>>>>>>>   auto-generated with the specified number of partitions
>>>>>>>>>>>>> * through(final int numOfPartitions, final Produced<K, V> produced) -
>>>>>>>>>>>>>   the topic will be generated with the specified number of partitions
>>>>>>>>>>>>>   and configuration taken from the produced parameter
>>>>>>>>>>>>> 2) Leave KStream#through as it is and introduce a new method,
>>>>>>>>>>>>>   KStream#repartition (I think Matthias suggested this in one of the
>>>>>>>>>>>>>   threads)
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Considering all of the above, I propose the following plan:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Option A:
>>>>>>>>>>>>> 1) Leave Produced as it is
>>>>>>>>>>>>> 2) Add a number of partitions configuration to the Grouped class (as
>>>>>>>>>>>>>   mentioned in the KIP)
>>>>>>>>>>>>> 3) Add the following method overloads to KStream#through:
>>>>>>>>>>>>> * through() - the topic will be auto-generated and the number of
>>>>>>>>>>>>>   partitions will be taken from the source topic
>>>>>>>>>>>>> * through(final int numOfPartitions) - the topic will be
>>>>>>>>>>>>>   auto-generated with the specified number of partitions
>>>>>>>>>>>>> * through(final int numOfPartitions, final Produced<K, V> produced) -
>>>>>>>>>>>>>   the topic will be generated with the specified number of partitions
>>>>>>>>>>>>>   and configuration taken from the produced parameter
>>>>>>>>>>>>>
>>>>>>>>>>>>> Option B:
>>>>>>>>>>>>> 1) Leave Produced as it is
>>>>>>>>>>>>> 2) Add a number of partitions configuration to the Grouped class (as
>>>>>>>>>>>>>   mentioned in the KIP)
>>>>>>>>>>>>> 3) Add a new operator, KStream#repartition, for creating and managing
>>>>>>>>>>>>>   internal repartition topics
>>>>>>>>>>>>> 
>>>>>>>>>>>>> P.S. I’m sorry if all of this was already discussed on the mailing
>>>>>>>>>>>>> list, but I kinda got lost in all the threads about this KIP :(
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Kind regards,
>>>>>>>>>>>>> Levani
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Jul 1, 2019, at 9:56 AM, Levani Kokhreidze <levani.co...@gmail.com> wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I would like to resurrect the discussion around KIP-221. Going
>>>>>>>>>>>>>> through the discussion thread, there seems to be agreement about the
>>>>>>>>>>>>>> usefulness of this feature.
>>>>>>>>>>>>>> Regarding the implementation, as far as I understood, the best
>>>>>>>>>>>>>> solution seems to be the following:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 1) Add two method overloads to the KStream#through method
>>>>>>>>>>>>>>   (essentially making the topic name optional)
>>>>>>>>>>>>>> 2) Enhance the Produced class with a numOfPartitions configuration
>>>>>>>>>>>>>>   field.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> These two changes will allow DSL users to control parallelism and
>>>>>>>>>>>>>> trigger repartitioning without doing stateful operations.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I will update the KIP with the interface changes around
>>>>>>>>>>>>>> KStream#through if these changes sound sensible.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Kind regards,
>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>> 
>>>>>> 
>>>> 
>> 
