Hi Bill,

Thanks a lot for the feedback.
Yes, that makes sense. I’ve updated the KIP with the `Repartitioned#partitioner` 
configuration.
In the beginning, I wanted to introduce a class for topic-level configuration 
and keep topic-level and producer-level configurations (such as Produced) 
separate (see my second email in this thread).
But while looking at the semantics of the KStream interface, I couldn’t really 
figure out a good operation name for a topic-level configuration class, and just 
introducing a `Topic` config class was kind of breaking the semantics.
So I think having a Repartitioned class that encapsulates topic-level and 
producer-level configurations for internal topics is a viable thing to do.
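
To make the idea a bit more concrete, here is a rough, self-contained sketch in plain Java (no Kafka dependency). `RepartitionedSketch`, `SketchPartitioner`, `partitionCount` and `partitionFor` are names made up for illustration and are not the KIP's final API; the default hashing is also a simplification of what the producer really does. It only shows one config object carrying a topic-level setting (number of partitions) next to a producer-level one (the partitioner), and how sharing a partitioner with the right-side topic lines up the partitions before a join.

```java
import java.util.Objects;

class RepartitionedDemo {
    public static void main(String[] args) {
        // Assumed custom strategy of the pre-existing right-side topic:
        // partition on the value rather than on the key.
        SketchPartitioner<String, String> byValue =
                (key, value, n) -> Math.floorMod(value.hashCode(), n);

        // Same partition count and same partitioner for the repartitioned stream.
        RepartitionedSketch<String, String> repartitioned =
                RepartitionedSketch.<String, String>numberOfPartitions(4)
                        .withPartitioner(byValue);

        int left = repartitioned.partitionFor("order-1", "eu");
        int right = byValue.partition("customer-9", "eu", 4);
        System.out.println("co-partitioned: " + (left == right));
    }
}

/** Hypothetical partitioner shape: maps a record to a partition in [0, numPartitions). */
interface SketchPartitioner<K, V> {
    int partition(K key, V value, int numPartitions);
}

/** Illustration only: immutable holder for topic-level and producer-level settings. */
final class RepartitionedSketch<K, V> {
    private final int numberOfPartitions;               // topic-level setting
    private final SketchPartitioner<K, V> partitioner;  // producer-level, null = default

    private RepartitionedSketch(int numberOfPartitions, SketchPartitioner<K, V> partitioner) {
        if (numberOfPartitions <= 0) {
            throw new IllegalArgumentException("numberOfPartitions must be positive");
        }
        this.numberOfPartitions = numberOfPartitions;
        this.partitioner = partitioner;
    }

    static <K, V> RepartitionedSketch<K, V> numberOfPartitions(int n) {
        return new RepartitionedSketch<>(n, null);
    }

    RepartitionedSketch<K, V> withPartitioner(SketchPartitioner<K, V> p) {
        return new RepartitionedSketch<>(numberOfPartitions, Objects.requireNonNull(p));
    }

    int partitionCount() {
        return numberOfPartitions;
    }

    int partitionFor(K key, V value) {
        if (partitioner != null) {
            return partitioner.partition(key, value, numberOfPartitions);
        }
        // Simplified default: hash of the key object, not the real producer hashing.
        return Math.floorMod(Objects.hashCode(key), numberOfPartitions);
    }
}
```

Because both sides use the same partitioner and the same partition count, records with the same value land on the same partition number, which is the co-partitioning property the join needs.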

Regards,
Levani

> On Jul 19, 2019, at 7:47 PM, Bill Bejeck <bbej...@gmail.com> wrote:
> 
> Hi Levani,
> 
> Thanks for resurrecting this KIP.
> 
> I'm also a +1 for adding a partition option.  In addition to the reason
> provided by John, my reasoning is:
> 
>   1. Users may want to use something other than hash-based partitioning
>   2. Users may wish to partition on something different than the key
>   without having to change the key.  For example:
>      1. A combination of fields in the value in conjunction with the key
>      2. Something other than the key
>   3. We allow users to specify a partitioner on Produced, and hence in
>   KStream.to and KStream.through, so it makes sense for API consistency.
> 
> Just my 2 cents.
> 
> Thanks,
> Bill
> 
> 
> 
> On Fri, Jul 19, 2019 at 5:46 AM Levani Kokhreidze <levani.co...@gmail.com>
> wrote:
> 
>> Hi John,
>> 
>> In my mind it makes sense.
>> If we add a partitioner configuration to the Repartitioned class, then in
>> combination with specifying the number of partitions for internal topics, the
>> user will have the opportunity to ensure co-partitioning before a join operation.
>> I think this can be quite a powerful feature.
>> I wonder what others think about this.
>> 
>> Regards,
>> Levani
>> 
>>> On Jul 18, 2019, at 1:20 AM, John Roesler <j...@confluent.io> wrote:
>>> 
>>> Yes, I believe that's what I had in mind. Again, not totally sure it
>>> makes sense, but I believe something similar is the rationale for
>>> having the partitioner option in Produced.
>>> 
>>> Thanks,
>>> -John
>>> 
>>> On Wed, Jul 17, 2019 at 3:20 PM Levani Kokhreidze
>>> <levani.co...@gmail.com> wrote:
>>>> 
>>>> Hey John,
>>>> 
>>>> Oh, that’s an interesting use case.
>>>> Do I understand this correctly: in your example I would first issue
>> repartition(Repartitioned) with a partitioner that is essentially
>> the same as the one used by the topic I want to join with, and then do the
>> KStream#join with the DSL?
>>>> 
>>>> Regards,
>>>> Levani
>>>> 
>>>>> On Jul 17, 2019, at 11:11 PM, John Roesler <j...@confluent.io> wrote:
>>>>> 
>>>>> Hey, all, just to chime in,
>>>>> 
>>>>> I think it might be useful to have an option to specify the
>>>>> partitioner. The case I have in mind is that some data may get
>>>>> repartitioned and then joined with an input topic. If the right-side
>>>>> input topic uses a custom partitioning strategy, then the
>>>>> repartitioned stream also needs to be partitioned with the same
>>>>> strategy.
>>>>> 
>>>>> Does that make sense, or did I maybe miss something important?
>>>>> 
>>>>> Thanks,
>>>>> -John
>>>>> 
>>>>> On Wed, Jul 17, 2019 at 2:48 PM Levani Kokhreidze
>>>>> <levani.co...@gmail.com> wrote:
>>>>>> 
>>>>>> Yes, I was thinking about it as well. To be honest, I’m not sure about
>> it yet.
>>>>>> As a Kafka Streams DSL user, I don’t really think I would need control
>> over the partitioner for internal topics.
>>>>>> As a user, I would assume that Kafka Streams knows best how to
>> partition data for internal topics.
>>>>>> In this KIP I wrote that Produced should be used only for topics that
>> are created by the user in advance.
>>>>>> In those cases maybe it makes sense to have the possibility to specify the
>> partitioner.
>>>>>> I don’t have a clear answer on that yet, but I guess specifying the
>> partitioner can be added as well if there’s agreement on this.
>>>>>> 
>>>>>> Regards,
>>>>>> Levani
>>>>>> 
>>>>>>> On Jul 17, 2019, at 10:42 PM, Sophie Blee-Goldman <
>> sop...@confluent.io> wrote:
>>>>>>> 
>>>>>>> Thanks for clearing that up. I agree that Repartitioned would be a
>> useful
>>>>>>> addition. I'm wondering if it might also need to have
>>>>>>> a withStreamPartitioner method/field, similar to Produced? I'm not
>> sure how
>>>>>>> widely this feature is really used, but seems it should be available
>> for
>>>>>>> repartition topics.
>>>>>>> 
>>>>>>> On Wed, Jul 17, 2019 at 11:26 AM Levani Kokhreidze <
>> levani.co...@gmail.com>
>>>>>>> wrote:
>>>>>>> 
>>>>>>>> Hey Sophie,
>>>>>>>> 
>>>>>>>> In both cases, KStream#repartition and
>> KStream#repartition(Repartitioned),
>>>>>>>> the topic will be created and managed by Kafka Streams.
>>>>>>>> The idea of Repartitioned is to give the user more control over the topic,
>> such as
>>>>>>>> the number of partitions.
>>>>>>>> I feel like the Repartitioned parameter is something that is missing in
>>>>>>>> the current DSL design.
>>>>>>>> It essentially gives the user control over parallelism by configuring the number
>> of
>>>>>>>> partitions for internal topics.
>>>>>>>> 
>>>>>>>> Hope this answers your question.
>>>>>>>> 
>>>>>>>> Regards,
>>>>>>>> Levani
>>>>>>>> 
>>>>>>>>> On Jul 17, 2019, at 9:02 PM, Sophie Blee-Goldman <
>> sop...@confluent.io>
>>>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>> Hey Levani,
>>>>>>>>> 
>>>>>>>>> Thanks for the KIP! Can you clarify one thing for me -- for the
>>>>>>>>> KStream#repartition signature taking a Repartitioned, will the
>> topic be
>>>>>>>>> auto-created by Streams (which seems to be the case for the
>> signature
>>>>>>>>> without a Repartitioned) or does it have to be pre-created? The
>> wording
>>>>>>>> in
>>>>>>>>> the KIP makes it seem like one version of the method will
>> auto-create
>>>>>>>>> topics while the other will not.
>>>>>>>>> 
>>>>>>>>> Cheers,
>>>>>>>>> Sophie
>>>>>>>>> 
>>>>>>>>> On Wed, Jul 17, 2019 at 10:15 AM Levani Kokhreidze <
>>>>>>>> levani.co...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>>> Hello,
>>>>>>>>>> 
>>>>>>>>>> One more bump about KIP-221 (
>>>>>>>>>> 
>>>>>>>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>> <
>>>>>>>>>> 
>>>>>>>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221:+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>> )
>>>>>>>>>> so it doesn’t get lost in the mailing list :)
>>>>>>>>>> Would love to hear the community’s opinions/concerns about this KIP.
>>>>>>>>>> 
>>>>>>>>>> Regards,
>>>>>>>>>> Levani
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>>> On Jul 12, 2019, at 5:27 PM, Levani Kokhreidze <
>> levani.co...@gmail.com
>>>>>>>>> 
>>>>>>>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> Hello,
>>>>>>>>>>> 
>>>>>>>>>>> Kind reminder about this KIP:
>>>>>>>>>> 
>>>>>>>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>> <
>>>>>>>>>> 
>>>>>>>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221:+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> Regards,
>>>>>>>>>>> Levani
>>>>>>>>>>> 
>>>>>>>>>>>> On Jul 9, 2019, at 11:38 AM, Levani Kokhreidze <
>>>>>>>> levani.co...@gmail.com
>>>>>>>>>> <mailto:levani.co...@gmail.com>> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>> Hello,
>>>>>>>>>>>> 
>>>>>>>>>>>> In order to move this KIP forward, I’ve updated the Confluence page
>> with
>>>>>>>>>> the new proposal
>>>>>>>>>> 
>>>>>>>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>> <
>>>>>>>>>> 
>>>>>>>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221:+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>>> 
>>>>>>>>>>>> I’ve also filled in the “Rejected Alternatives” section.
>>>>>>>>>>>> 
>>>>>>>>>>>> Looking forward to discussing this KIP :)
>>>>>>>>>>>> 
>>>>>>>>>>>> Kind regards,
>>>>>>>>>>>> Levani
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>>> On Jul 3, 2019, at 1:08 PM, Levani Kokhreidze <
>>>>>>>> levani.co...@gmail.com
>>>>>>>>>> <mailto:levani.co...@gmail.com>> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Hello Matthias,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks for the feedback and ideas.
>>>>>>>>>>>>> I like the idea of introducing a dedicated `Topic` class for
>> topic
>>>>>>>>>> configuration for internal operators like `groupBy`.
>>>>>>>>>>>>> Would be great to hear others’ opinions about this as well.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Kind regards,
>>>>>>>>>>>>> Levani
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Jul 3, 2019, at 7:00 AM, Matthias J. Sax <
>> matth...@confluent.io
>>>>>>>>>> <mailto:matth...@confluent.io>> wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Levani,
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Thanks for picking up this KIP! And thanks for summarizing
>>>>>>>> everything.
>>>>>>>>>>>>>> Even if some points may have been discussed already (can't
>> really
>>>>>>>>>>>>>> remember), it's helpful to get a good summary to refresh the
>>>>>>>>>> discussion.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I think your reasoning makes sense. With regard to the
>> distinction
>>>>>>>>>>>>>> between operators that manage topics and operators that use
>>>>>>>>>> user-created
>>>>>>>>>>>>>> topics: Following this argument, it might indicate that
>> leaving
>>>>>>>>>>>>>> `through()` as-is (as an operator that uses user-defined
>> topics) and
>>>>>>>>>>>>>> introducing a new `repartition()` operator (an operator that
>> manages
>>>>>>>>>>>>>> topics itself) might be good. Otherwise, there is one operator
>>>>>>>>>>>>>> `through()` that sometimes manages topics but sometimes not; a
>>>>>>>>>> different
>>>>>>>>>>>>>> name, i.e., a new operator, would make the distinction clearer.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> About adding `numOfPartitions` to `Grouped`. I am wondering
>> if the
>>>>>>>>>> same
>>>>>>>>>>>>>> argument as for `Produced` does apply and adding it is
>> semantically
>>>>>>>>>>>>>> questionable? Might be good to get opinions of others on
>> this, too.
>>>>>>>> I
>>>>>>>>>> am
>>>>>>>>>>>>>> not sure myself what solution I prefer atm.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> So far, KS uses configuration objects that allow to configure
>> a
>>>>>>>>>> certain
>>>>>>>>>>>>>> "entity" like a consumer, producer, store. If we assume that
>> a topic
>>>>>>>>>> is
>>>>>>>>>>>>>> a similar entity, I am wondering if we should have a
>>>>>>>>>>>>>> `Topic#withNumberOfPartitions()` class and method instead of
>> a plain
>>>>>>>>>>>>>> integer? This would allow us to add other configs, like
>> replication
>>>>>>>>>>>>>> factor, retention-time etc, easily, without the need to
>> change the
>>>>>>>>>> "main
>>>>>>>>>>>>>> API".
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Just want to give some ideas. Not sure if I like them myself.
>> :)
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On 7/1/19 1:04 AM, Levani Kokhreidze wrote:
>>>>>>>>>>>>>>> Actually, giving it more thought - maybe enhancing Produced
>> with num
>>>>>>>>>> of partitions configuration is not the best approach. Let me
>> explain
>>>>>>>> why:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 1) If we enhance Produced class with this configuration,
>> this will
>>>>>>>>>> also affect KStream#to operation. Since KStream#to is the final
>> sink of
>>>>>>>> the
>>>>>>>>>> topology, for me, it seems to be reasonable assumption that user
>> needs
>>>>>>>> to
>>>>>>>>>> manually create sink topic in advance. And in that case, having
>> num of
>>>>>>>>>> partitions configuration doesn’t make much sense.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 2) Looking at Produced class, based on API contract, seems
>> like
>>>>>>>>>> Produced is designed to be something that is explicitly for
>> producer
>>>>>>>> (key
>>>>>>>>>> serializer, value serializer, partitioner those all are producer
>>>>>>>> specific
>>>>>>>>>> configurations) and num of partitions is topic level
>> configuration. And
>>>>>>>> I
>>>>>>>>>> don’t think mixing topic and producer level configurations
>> together in
>>>>>>>> one
>>>>>>>>>> class is the good approach.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 3) Looking at KStream interface, seems like Produced
>> parameter is
>>>>>>>>>> for operations that work with non-internal topics (i.e. topics that are not created and
>>>>>>>> managed
>>>>>>>>>> internally by Kafka Streams) and I think we should leave
>> it as
>>>>>>>> it is
>>>>>>>>>> in that case.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Taking all these things into account, I think we should
>> distinguish
>>>>>>>>>> between DSL operations, where Kafka Streams should create and
>> manage
>>>>>>>>>> internal topics (KStream#groupBy) vs topics that should be
>> created in
>>>>>>>>>> advance (e.g KStream#to).
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> To sum it up, I think adding numPartitions configuration in
>>>>>>>> Produced
>>>>>>>>>> will result in mixing topic and producer level configuration in
>> one
>>>>>>>> class
>>>>>>>>>> and it’s gonna break existing API semantics.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Regarding making topic name optional in KStream#through - I
>> think
>>>>>>>>>> underlying idea is very useful and giving users the possibility to
>> specify
>>>>>>>> num
>>>>>>>>>> of partitions there is even more useful :) Considering arguments
>> against
>>>>>>>>>> adding num of partitions in Produced class, I see two options
>> here:
>>>>>>>>>>>>>>> 1) Add following method overloads
>>>>>>>>>>>>>>> * through() - topic will be auto-generated and num of
>> partitions
>>>>>>>>>> will be taken from source topic
>>>>>>>>>>>>>>> * through(final int numOfPartitions) - topic will be auto
>>>>>>>>>> generated with specified num of partitions
>>>>>>>>>>>>>>> * through(final int numOfPartitions, final Produced<K, V>
>>>>>>>>>> produced) - topic will be generated with specified num of
>>>>>>>> partitions
>>>>>>>>>> and configuration taken from produced parameter.
>>>>>>>>>>>>>>> 2) Leave KStream#through as it is and introduce new method -
>>>>>>>>>> KStream#repartition (I think Matthias suggested this in one of the
>>>>>>>> threads)
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Considering all mentioned above I propose the following plan:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Option A:
>>>>>>>>>>>>>>> 1) Leave Produced as it is
>>>>>>>>>>>>>>> 2) Add num of partitions configuration to Grouped class (as
>>>>>>>>>> mentioned in the KIP)
>>>>>>>>>>>>>>> 3) Add following method overloads to KStream#through
>>>>>>>>>>>>>>> * through() - topic will be auto-generated and num of
>> partitions
>>>>>>>>>> will be taken from source topic
>>>>>>>>>>>>>>> * through(final int numOfPartitions) - topic will be auto
>>>>>>>>>> generated with specified num of partitions
>>>>>>>>>>>>>>> * through(final int numOfPartitions, final Produced<K, V>
>>>>>>>>>> produced) - topic will be generated with specified num of
>>>>>>>> partitions
>>>>>>>>>> and configuration taken from produced parameter.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Option B:
>>>>>>>>>>>>>>> 1) Leave Produced as it is
>>>>>>>>>>>>>>> 2) Add num of partitions configuration to Grouped class (as
>>>>>>>>>> mentioned in the KIP)
>>>>>>>>>>>>>>> 3) Add new operator KStream#repartition for creating and
>> managing
>>>>>>>>>> internal repartition topics
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> P.S. I’m sorry if all of this was already discussed in the
>> mailing
>>>>>>>>>> list, but I kinda got lost with all the threads that were about this
>> KIP :(
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Kind regards,
>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> On Jul 1, 2019, at 9:56 AM, Levani Kokhreidze <
>>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>> wrote:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> I would like to resurrect discussion around KIP-221. Going
>> through
>>>>>>>>>> the discussion thread, there seems to be agreement around the
>> usefulness of
>>>>>>>> this
>>>>>>>>>> feature.
>>>>>>>>>>>>>>>> Regarding the implementation, as far as I understood, the
>> most
>>>>>>>>>> optimal solution for me seems the following:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 1) Add two method overloads to KStream#through method
>> (essentially
>>>>>>>>>> making topic name optional)
>>>>>>>>>>>>>>>> 2) Enhance Produced class with numOfPartitions configuration
>>>>>>>> field.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Those two changes will allow DSL users to control
>> parallelism and
>>>>>>>>>> trigger re-partition without doing stateful operations.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> I will update KIP with interface changes around
>> KStream#through if
>>>>>>>>>> these changes sound sensible.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Kind regards,
>>>>>>>>>>>>>>>> Levani
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>> 
>>>> 
>> 
>> 
