Hi Levani,

Thanks for resurrecting this KIP.

I'm also +1 on adding a partitioner option. In addition to the reason
provided by John, my reasoning is:

   1. Users may want to use something other than hash-based partitioning
   2. Users may wish to partition on something different than the key
   without having to change the key.  For example:
      1. A combination of fields in the value in conjunction with the key
      2. Something other than the key
   3. We already allow users to specify a partitioner on Produced, and hence
   in KStream.to and KStream.through, so offering it here makes sense for
   API consistency.
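
To make points 1 and 2 a bit more concrete, here is a rough, self-contained
sketch of partitioning on something other than the key alone. Everything in
it is illustrative: SketchPartitioner is a local stand-in with the same
general shape as a Kafka Streams partitioner callback (topic, key, value,
number of partitions), not the real interface, and Order, PartitionMath,
KeyAndRegionPartitioner and RegionOnlyPartitioner are made-up names. The
hash used is just Arrays.hashCode, not Kafka's default partitioning hash.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical stand-in for the shape of a stream partitioner callback,
// declared locally so the sketch compiles without Kafka on the classpath.
interface SketchPartitioner<K, V> {
    int partition(String topic, K key, V value, int numPartitions);
}

// Hypothetical value type carrying a field we may want to partition on.
final class Order {
    final String region;
    final long amountCents;

    Order(String region, long amountCents) {
        this.region = region;
        this.amountCents = amountCents;
    }
}

// Shared helper: map an arbitrary routing string to a partition index.
final class PartitionMath {
    private PartitionMath() {}

    static int toPartition(String routingKey, int numPartitions) {
        int hash = Arrays.hashCode(routingKey.getBytes(StandardCharsets.UTF_8));
        // floorMod keeps the result in [0, numPartitions) for negative hashes.
        return Math.floorMod(hash, numPartitions);
    }
}

// Case 2.1: a field of the value in conjunction with the key.
final class KeyAndRegionPartitioner implements SketchPartitioner<String, Order> {
    @Override
    public int partition(String topic, String key, Order value, int numPartitions) {
        return PartitionMath.toPartition(key + "|" + value.region, numPartitions);
    }
}

// Case 2.2: something other than the key (here, only a value field).
final class RegionOnlyPartitioner implements SketchPartitioner<String, Order> {
    @Override
    public int partition(String topic, String key, Order value, int numPartitions) {
        return PartitionMath.toPartition(value.region, numPartitions);
    }
}
```

In both cases the record key itself is left unchanged; only the routing
decision differs. For the join scenario John described, both sides would
need to agree on the partitioner and on the number of partitions.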

Just my 2 cents.

Thanks,
Bill



On Fri, Jul 19, 2019 at 5:46 AM Levani Kokhreidze <levani.co...@gmail.com>
wrote:

> Hi John,
>
> In my mind it makes sense.
> If we add a partitioner configuration to the Repartitioned class, then in
> combination with specifying the number of partitions for internal topics, users
> will have the opportunity to ensure co-partitioning before a join operation.
> I think this can be quite a powerful feature.
> I'm wondering what others think about this.
>
> Regards,
> Levani
>
> > On Jul 18, 2019, at 1:20 AM, John Roesler <j...@confluent.io> wrote:
> >
> > Yes, I believe that's what I had in mind. Again, not totally sure it
> > makes sense, but I believe something similar is the rationale for
> > having the partitioner option in Produced.
> >
> > Thanks,
> > -John
> >
> > On Wed, Jul 17, 2019 at 3:20 PM Levani Kokhreidze
> > <levani.co...@gmail.com> wrote:
> >>
> >> Hey John,
> >>
> >> Oh, that’s an interesting use case.
> >> Do I understand this correctly: in your example I would first issue
> repartition(Repartitioned) with a partitioner that is essentially
> the same as the one used by the topic I want to join with, and then do the KStream#join
> with the DSL?
> >>
> >> Regards,
> >> Levani
> >>
> >>> On Jul 17, 2019, at 11:11 PM, John Roesler <j...@confluent.io> wrote:
> >>>
> >>> Hey, all, just to chime in,
> >>>
> >>> I think it might be useful to have an option to specify the
> >>> partitioner. The case I have in mind is that some data may get
> >>> repartitioned and then joined with an input topic. If the right-side
> >>> input topic uses a custom partitioning strategy, then the
> >>> repartitioned stream also needs to be partitioned with the same
> >>> strategy.
> >>>
> >>> Does that make sense, or did I maybe miss something important?
> >>>
> >>> Thanks,
> >>> -John
> >>>
> >>> On Wed, Jul 17, 2019 at 2:48 PM Levani Kokhreidze
> >>> <levani.co...@gmail.com> wrote:
> >>>>
> >>>> Yes, I was thinking about it as well. To be honest I’m not sure about
> it yet.
> >>>> As a Kafka Streams DSL user, I don’t really think I would need control
> over the partitioner for internal topics.
> >>>> As a user, I would assume that Kafka Streams knows best how to
> partition data for internal topics.
> >>>> In this KIP I wrote that Produced should be used only for topics that
> are created by the user in advance.
> >>>> In those cases maybe it makes sense to have the possibility to specify the
> partitioner.
> >>>> I don’t have a clear answer on that yet, but I guess specifying the
> partitioner can be added as well if there’s agreement on this.
> >>>>
> >>>> Regards,
> >>>> Levani
> >>>>
> >>>>> On Jul 17, 2019, at 10:42 PM, Sophie Blee-Goldman <
> sop...@confluent.io> wrote:
> >>>>>
> >>>>> Thanks for clearing that up. I agree that Repartitioned would be a
> useful
> >>>>> addition. I'm wondering if it might also need to have
> >>>>> a withStreamPartitioner method/field, similar to Produced? I'm not
> sure how
> >>>>> widely this feature is really used, but seems it should be available
> for
> >>>>> repartition topics.
> >>>>>
> >>>>> On Wed, Jul 17, 2019 at 11:26 AM Levani Kokhreidze <
> levani.co...@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Hey Sophie,
> >>>>>>
> >>>>>> In both cases, KStream#repartition and
> KStream#repartition(Repartitioned),
> >>>>>> the topic will be created and managed by Kafka Streams.
> >>>>>> The idea of Repartitioned is to give the user more control over the topic,
> such as
> >>>>>> the number of partitions.
> >>>>>> I feel like the Repartitioned parameter is something that is missing in
> >>>>>> the current DSL design.
> >>>>>> Essentially, it gives the user control over parallelism by configuring the number
> of
> >>>>>> partitions for internal topics.
> >>>>>>
> >>>>>> Hope this answers your question.
> >>>>>>
> >>>>>> Regards,
> >>>>>> Levani
> >>>>>>
> >>>>>>> On Jul 17, 2019, at 9:02 PM, Sophie Blee-Goldman <
> sop...@confluent.io>
> >>>>>> wrote:
> >>>>>>>
> >>>>>>> Hey Levani,
> >>>>>>>
> >>>>>>> Thanks for the KIP! Can you clarify one thing for me -- for the
> >>>>>>> KStream#repartition signature taking a Repartitioned, will the
> topic be
> >>>>>>> auto-created by Streams (which seems to be the case for the
> signature
> >>>>>>> without a Repartitioned) or does it have to be pre-created? The
> wording
> >>>>>> in
> >>>>>>> the KIP makes it seem like one version of the method will
> auto-create
> >>>>>>> topics while the other will not.
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>> Sophie
> >>>>>>>
> >>>>>>> On Wed, Jul 17, 2019 at 10:15 AM Levani Kokhreidze <
> >>>>>> levani.co...@gmail.com>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hello,
> >>>>>>>>
> >>>>>>>> One more bump about KIP-221 (
> >>>>>>>>
> >>>>>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
> >>>>>>>> <
> >>>>>>>>
> >>>>>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221:+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
> >>>>>>> )
> >>>>>>>> so it doesn’t get lost in mailing list :)
> >>>>>>>> Would love to hear the community's opinions/concerns about this KIP.
> >>>>>>>>
> >>>>>>>> Regards,
> >>>>>>>> Levani
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>> On Jul 12, 2019, at 5:27 PM, Levani Kokhreidze <
> levani.co...@gmail.com
> >>>>>>>
> >>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>> Hello,
> >>>>>>>>>
> >>>>>>>>> Kind reminder about this KIP:
> >>>>>>>>
> >>>>>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
> >>>>>>>> <
> >>>>>>>>
> >>>>>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221:+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Regards,
> >>>>>>>>> Levani
> >>>>>>>>>
> >>>>>>>>>> On Jul 9, 2019, at 11:38 AM, Levani Kokhreidze <
> >>>>>> levani.co...@gmail.com
> >>>>>>>> <mailto:levani.co...@gmail.com>> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Hello,
> >>>>>>>>>>
> >>>>>>>>>> In order to move this KIP forward, I’ve updated confluence page
> with
> >>>>>>>> the new proposal
> >>>>>>>>
> >>>>>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
> >>>>>>>> <
> >>>>>>>>
> >>>>>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221:+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
> >>>>>>>>>
> >>>>>>>>>> I’ve also filled in the “Rejected Alternatives” section.
> >>>>>>>>>>
> >>>>>>>>>> Looking forward to discussing this KIP :)
> >>>>>>>>>>
> >>>>>>>>>> Kind regards,
> >>>>>>>>>> Levani
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>> On Jul 3, 2019, at 1:08 PM, Levani Kokhreidze <
> >>>>>> levani.co...@gmail.com
> >>>>>>>> <mailto:levani.co...@gmail.com>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Hello Matthias,
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks for the feedback and ideas.
> >>>>>>>>>>> I like the idea of introducing a dedicated `Topic` class for
> topic
> >>>>>>>> configuration for internal operators like `groupBy`.
> >>>>>>>>>>> It would be great to hear others' opinions about this as well.
> >>>>>>>>>>>
> >>>>>>>>>>> Kind regards,
> >>>>>>>>>>> Levani
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>> On Jul 3, 2019, at 7:00 AM, Matthias J. Sax <
> matth...@confluent.io
> >>>>>>>> <mailto:matth...@confluent.io>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>> Levani,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks for picking up this KIP! And thanks for summarizing
> >>>>>> everything.
> >>>>>>>>>>>> Even if some points may have been discussed already (can't
> really
> >>>>>>>>>>>> remember), it's helpful to get a good summary to refresh the
> >>>>>>>> discussion.
> >>>>>>>>>>>>
> >>>>>>>>>>>> I think your reasoning makes sense. With regard to the
> distinction
> >>>>>>>>>>>> between operators that manage topics and operators that use
> >>>>>>>> user-created
> >>>>>>>>>>>> topics: Following this argument, it might indicate that
> leaving
> >>>>>>>>>>>> `through()` as-is (as an operator that uses user-defined
> topics) and
> >>>>>>>>>>>> introducing a new `repartition()` operator (an operator that
> manages
> >>>>>>>>>>>> topics itself) might be good. Otherwise, there is one operator
> >>>>>>>>>>>> `through()` that sometimes manages topics but sometimes not; a
> >>>>>>>> different
> >>>>>>>>>>>> name, ie, new operator would make the distinction clearer.
> >>>>>>>>>>>>
> >>>>>>>>>>>> About adding `numOfPartitions` to `Grouped`. I am wondering
> if the
> >>>>>>>> same
> >>>>>>>>>>>> argument as for `Produced` does apply and adding it is
> semantically
> >>>>>>>>>>>> questionable? Might be good to get opinions of others on
> this, too.
> >>>>>> I
> >>>>>>>> am
> >>>>>>>>>>>> not sure myself what solution I prefer atm.
> >>>>>>>>>>>>
> >>>>>>>>>>>> So far, KS uses configuration objects that allow us to configure
> a
> >>>>>>>> certain
> >>>>>>>>>>>> "entity" like a consumer, producer, store. If we assume that
> a topic
> >>>>>>>> is
> >>>>>>>>>>>> a similar entity, I am wondering if we should have a
> >>>>>>>>>>>> `Topic#withNumberOfPartitions()` class and method instead of
> a plain
> >>>>>>>>>>>> integer? This would allow us to add other configs, like
> replication
> >>>>>>>>>>>> factor, retention-time etc, easily, without the need to
> change the
> >>>>>>>> "main
> >>>>>>>>>>>> API".
> >>>>>>>>>>>>
> >>>>>>>>>>>> Just want to give some ideas. Not sure if I like them myself.
> :)
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 7/1/19 1:04 AM, Levani Kokhreidze wrote:
> >>>>>>>>>>>>> Actually, giving it more thought - maybe enhancing Produced
> with num
> >>>>>>>> of partitions configuration is not the best approach. Let me
> explain
> >>>>>> why:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 1) If we enhance Produced class with this configuration,
> this will
> >>>>>>>> also affect KStream#to operation. Since KStream#to is the final
> sink of
> >>>>>> the
> >>>>>>>> topology, for me, it seems to be reasonable assumption that user
> needs
> >>>>>> to
> >>>>>>>> manually create sink topic in advance. And in that case, having
> num of
> >>>>>>>> partitions configuration doesn’t make much sense.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2) Looking at Produced class, based on API contract, seems
> like
> >>>>>>>> Produced is designed to be something that is explicitly for
> producer
> >>>>>> (key
> >>>>>>>> serializer, value serializer, partitioner those all are producer
> >>>>>> specific
> >>>>>>>> configurations) and num of partitions is topic level
> configuration. And
> >>>>>> I
> >>>>>>>> don’t think mixing topic and producer level configurations
> together in
> >>>>>> one
> >>>>>>>> class is the good approach.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 3) Looking at KStream interface, seems like Produced
> parameter is
> >>>>>>>> for operations that work with non-internal topics (i.e. topics not created and
> >>>>>> managed
> >>>>>>>> internally by Kafka Streams) and I think we should leave
> it as
> >>>>>> it is
> >>>>>>>> in that case.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Taking all these things into account, I think we should
> distinguish
> >>>>>>>> between DSL operations, where Kafka Streams should create and
> manage
> >>>>>>>> internal topics (KStream#groupBy) vs topics that should be
> created in
> >>>>>>>> advance (e.g KStream#to).
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> To sum it up, I think adding numPartitions configuration in
> >>>>>> Produced
> >>>>>>>> will result in mixing topic and producer level configuration in
> one
> >>>>>> class
> >>>>>>>> and it’s gonna break existing API semantics.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Regarding making topic name optional in KStream#through - I
> think
> >>>>>>>> underlying idea is very useful and giving users the possibility to
> specify
> >>>>>> num
> >>>>>>>> of partitions there is even more useful :) Considering arguments
> against
> >>>>>>>> adding num of partitions in Produced class, I see two options
> here:
> >>>>>>>>>>>>> 1) Add following method overloads
> >>>>>>>>>>>>> * through() - topic will be auto-generated and num of
> partitions
> >>>>>>>> will be taken from source topic
> >>>>>>>>>>>>> * through(final int numOfPartitions) - topic will be auto
> >>>>>>>> generated with specified num of partitions
> >>>>>>>>>>>>> * through(final int numOfPartitions, final Produced<K, V>
> >>>>>>>> produced) - topic will be auto-generated with specified num of
> >>>>>> partitions
> >>>>>>>> and configuration taken from produced parameter.
> >>>>>>>>>>>>> 2) Leave KStream#through as it is and introduce new method -
> >>>>>>>> KStream#repartition (I think Matthias suggested this in one of the
> >>>>>> threads)
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Considering all mentioned above I propose the following plan:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Option A:
> >>>>>>>>>>>>> 1) Leave Produced as it is
> >>>>>>>>>>>>> 2) Add num of partitions configuration to Grouped class (as
> >>>>>>>> mentioned in the KIP)
> >>>>>>>>>>>>> 3) Add following method overloads to KStream#through
> >>>>>>>>>>>>> * through() - topic will be auto-generated and num of
> partitions
> >>>>>>>> will be taken from source topic
> >>>>>>>>>>>>> * through(final int numOfPartitions) - topic will be auto
> >>>>>>>> generated with specified num of partitions
> >>>>>>>>>>>>> * through(final int numOfPartitions, final Produced<K, V>
> >>>>>>>> produced) - topic will be auto-generated with specified num of
> >>>>>> partitions
> >>>>>>>> and configuration taken from produced parameter.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Option B:
> >>>>>>>>>>>>> 1) Leave Produced as it is
> >>>>>>>>>>>>> 2) Add num of partitions configuration to Grouped class (as
> >>>>>>>> mentioned in the KIP)
> >>>>>>>>>>>>> 3) Add new operator KStream#repartition for creating and
> managing
> >>>>>>>> internal repartition topics
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> P.S. I’m sorry if all of this was already discussed in the
> mailing
> >>>>>>>> list, but I kinda got lost with all the threads that were about this
> KIP :(
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Kind regards,
> >>>>>>>>>>>>> Levani
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Jul 1, 2019, at 9:56 AM, Levani Kokhreidze <
> >>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hello,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I would like to resurrect discussion around KIP-221. Going
> through
> >>>>>>>> the discussion thread, there seems to be agreement around the
> usefulness of
> >>>>>> this
> >>>>>>>> feature.
> >>>>>>>>>>>>>> Regarding the implementation, as far as I understood, the
> most
> >>>>>>>> optimal solution for me seems the following:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 1) Add two method overloads to KStream#through method
> (essentially
> >>>>>>>> making topic name optional)
> >>>>>>>>>>>>>> 2) Enhance Produced class with numOfPartitions configuration
> >>>>>> field.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Those two changes will allow DSL users to control
> parallelism and
> >>>>>>>> trigger re-partition without doing stateful operations.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I will update KIP with interface changes around
> KStream#through if
> >>>>>>>> these changes sound sensible.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Kind regards,
> >>>>>>>>>>>>>> Levani
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>>>
> >>>>
> >>
>
>
