On Tue, Aug 3, 2021 at 6:56 PM Matthias J. Sax <[email protected]> wrote:
> I was playing with the code a little bit, but it seems not to be easy to
> use generics to enforce that V is `Comparable`...
>
> We would need to introduce a new interface
>
>   interface ComparableStream<K, V extends Comparable<V>>
>       extends KStream<K, V>
>   {
>       KTable<K, V> min();
>   }
>
> But it also requires a nasty cast to actually use it:
>
>   KStream<String, String> stream =
>       new StreamsBuilder().stream("");
>   KTable<String, String> table =
>       ((ComparableStream<String, String>) stream).min();
>
> If the value-type does not implement `Comparable` the cast would not
> compile... Or would there be a simpler way to ensure that min() can only
> be called _if_ V is `Comparable`?
>
>
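On the "simpler way" question: one option that might avoid the cast is to put the bound on a static helper method's own type parameter instead of on the interface, which is the same pattern java.util.Collections.min uses. A rough plain-Java sketch follows; ValueStream is a hypothetical stand-in for KStream and none of these names exist in Kafka Streams.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

final class ComparableBoundSketch {

    // Hypothetical stand-in for KStream<K, V>; it only holds values.
    static final class ValueStream<V> {
        private final List<V> values;

        ValueStream(List<V> values) {
            this.values = values;
        }

        // Comparator-based variant: works for any V.
        Optional<V> min(Comparator<? super V> comparator) {
            return values.stream().min(comparator);
        }
    }

    // The bound lives on the method's own type parameter, so the stream type
    // stays unbounded and callers need no cast. A stream whose value type is
    // not Comparable is rejected at compile time.
    static <V extends Comparable<? super V>> Optional<V> min(ValueStream<V> stream) {
        return stream.min(Comparator.<V>naturalOrder());
    }

    public static void main(String[] args) {
        ValueStream<String> words = new ValueStream<>(List.of("pear", "fig", "kiwi"));
        System.out.println(min(words).get()); // prints fig
    }
}
```

The cost is that the call reads min(stream) rather than stream.min(), so it would not sit on the KStream interface itself.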
> So maybe passing in a `Comparator<V>` might be the right way to go;
> might also be more flexible anyway. -- My original idea was just to
> maybe avoid the `Comparator` argument, as it would make the API nicer
> IMHO; fewer parameters is usually better...
>
Yeah, I tried both, and using a Comparator seems more natural to me in this
case. I will update the document with the discussion here.
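To make the Comparator variant concrete, here is a minimal plain-Java sketch of a per-key min built from a Comparator. It folds (key, value) pairs into a map roughly the way a reduce() over a grouped stream would. ComparatorMinSketch and minPerKey are made-up names for illustration, not proposed API.

```java
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

final class ComparatorMinSketch {

    // Folds (key, value) records into a per-key minimum.
    static <K, V> Map<K, V> minPerKey(List<Map.Entry<K, V>> records,
                                      Comparator<? super V> comparator) {
        // BinaryOperator.minBy returns its first argument when compare() == 0,
        // so on a tie the value already in the table is kept.
        BinaryOperator<V> keepSmaller = BinaryOperator.minBy(comparator);
        Map<K, V> table = new LinkedHashMap<>();
        for (Map.Entry<K, V> record : records) {
            // merge() calls keepSmaller(oldValue, newValue) when the key exists.
            table.merge(record.getKey(), record.getValue(), keepSmaller);
        }
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> records = List.of(
                Map.entry("a", "pear"),
                Map.entry("a", "fig"),
                Map.entry("b", "kiwi"),
                Map.entry("a", "apple"));
        // Shortest value per key.
        System.out.println(minPerKey(records, Comparator.comparing(String::length)));
    }
}
```

Keeping the old value on a tie matches the default preferred further down the thread (only forward a value that is strictly smaller or bigger).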
>
> I am not sure why we would want to pass `Function<V, Comparable<?>>
> func` into `min()`?
>
Not sure. Also, do you have an opinion on Long vs Double?
-thanks
Mohan
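
On Long vs Double, a small plain-Java sketch of the trade-off may help the discussion. SumSketch and both method names are invented for illustration; nothing here is Kafka Streams API.

```java
import java.util.List;
import java.util.function.Function;

final class SumSketch {

    // Long accumulator: exact for integral inputs, but longValue() drops the
    // fractional part of each Double or Float before adding.
    static <V> long sumAsLong(List<V> values,
                              Function<? super V, ? extends Number> func) {
        long total = 0L;
        for (V value : values) {
            total += func.apply(value).longValue();
        }
        return total;
    }

    // Double accumulator: keeps fractions, but a long above 2^53 may not be
    // exactly representable as a double.
    static <V> double sumAsDouble(List<V> values,
                                  Function<? super V, ? extends Number> func) {
        double total = 0.0;
        for (V value : values) {
            total += func.apply(value).doubleValue();
        }
        return total;
    }

    public static void main(String[] args) {
        List<Double> prices = List.of(1.5, 1.5);
        System.out.println(sumAsLong(prices, p -> p));   // prints 2
        System.out.println(sumAsDouble(prices, p -> p)); // prints 3.0

        List<Long> big = List.of(9007199254740993L);     // 2^53 + 1
        System.out.println(sumAsLong(big, n -> n));           // prints 9007199254740993
        System.out.println((long) sumAsDouble(big, n -> n));  // prints 9007199254740992
    }
}
```

The wildcard signature Function<? super V, ? extends Number> is a choice made for this sketch: with a plain Function<V, Number> parameter, a lambda still works, but an existing variable of type Function<V, Integer> would not be accepted because generics are invariant.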
>
>
> -Matthias
>
>
>
> On 6/21/21 11:23 AM, Mohan Parthasarathy wrote:
> > Alex,
> >
> >
> > On Wed, Jun 16, 2021 at 8:07 AM Alexandre Brasil <[email protected]>
> > wrote:
> >
> >> Mohan / Matthias,
> >>
> >>>> I think extending min/max to non-numeric types makes sense. Wondering
> >>>> why we should require a `Comparator` or if we should require that the
> >>>> types implement `Comparable` instead?
> >>>>
> >>> Good question. This is what it would look like:
> >>>
> >>> KTable<K, V> min_comparable()
> >>> KTable<K, V> min_comparator(Comparator<V> comp)
> >>
> >> Not sure if I understood Matthias' proposal correctly, but I think that
> >> instead of going with your original proposal
> >> (<VR extends Number> KTable<K, VR> min(Function<V, VR> func...))
> >> or mine (KTable<K, V> min(Comparator<V> comparator...)), we could
> >> simplify it a bit by using a function to extract a Comparable property
> >> from the original value:
> >>
> >> KTable<K, V> min(Function<V, Comparable<?>> func...)
> >>
> > I will let Matthias clarify. I am not sure why it is simpler than the
> > comparator one. Comparable is implemented by the type, and I am not sure
> > exposing it this way makes it any better.
> >
> >>>> I also think that min/max should not change the value type. Using
> >>>> `Long` for sum() makes sense though, and also to require a `<V extends
> >>>> Number>`.
> >>
> >> Are there any reasons to limit the sum() to integers? Why not use a
> >> Double instead?
> >>
> > Yeah, if the precision is important, then we should stick with Double.
> >
> > -mohan
> >
> >> Best regards,
> >> Alexandre
> >>
> >> On Wed, Jun 16, 2021 at 1:01 AM Mohan Parthasarathy <[email protected]>
> >> wrote:
> >>
> >>> Matthias,
> >>>
> >>> On Mon, Jun 14, 2021 at 9:18 PM Matthias J. Sax <[email protected]>
> >>> wrote:
> >>>
> >>>> Hi,
> >>>>
> >>>> I think extending min/max to non-numeric types makes sense. Wondering
> >>>> why we should require a `Comparator` or if we should require that the
> >>>> types implement `Comparable` instead?
> >>>>
> >>> Good question. This is what it would look like:
> >>>
> >>> KTable<K, V> min_comparable()
> >>> KTable<K, V> min_comparator(Comparator<V> comp)
> >>>
> >>> For min_comparable to work, you still need the bound "V extends
> >>> Comparable<V>". AFAICT, to avoid the "type parameter V hiding the type V"
> >>> warning, it has to be at the interface level like this:
> >>>
> >>> KStream<K, V extends Comparable<V>>
> >>>
> >>> which is a little messy unless there is a different way to do the same.
> >>> The comparator gives a simple way to define an anonymous function.
> >>>
> >>> What do you think ?
> >>>
> >>>
> >>>> I also think that min/max should not change the value type. Using
> >>>> `Long` for sum() makes sense though, and also to require a `<V extends
> >>>> Number>`.
> >>>>
> >>> I guess these are the two possibilities:
> >>>
> >>> <E extends Number> Long sum(Function<V, E> func)
> >>> Long sum(Function<V, Number> func)
> >>>
> >>> Both should work. "func" can return any subtype of Number and I don't
> >>> see any advantage with the first version. Can you clarify?
> >>>
> >>> Thanks
> >>> Mohan
> >>>
> >>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 6/8/21 5:00 PM, Mohan Parthasarathy wrote:
> >>>>> Hi Alex,
> >>>>>
> >>>>> On Tue, Jun 8, 2021 at 2:44 PM Alexandre Brasil <[email protected]>
> >>>>> wrote:
> >>>>>
> >>>>>>
> >>>>>> My point here is that, when we're only interested in a max/min numeric
> >>>>>> value, it doesn't matter when we have repeated values, since we'd only
> >>>>>> be forwarding the number downstream, so I could disregard the case where
> >>>>>> the Comparator returns zero (meaning equal) and min/max would still be
> >>>>>> semantically correct. But when we're forwarding the original object
> >>>>>> downstream instead of its numeric property, it could mean different
> >>>>>> things semantically depending on how we handle the repeated values.
> >>>>>>
> >>>>>> As an example, if I were using max() on a stream of Biddings for
> >>>>>> products in an auction, the order of the biddings would probably
> >>>>>> influence the winner if two clients send Biddings with the same value.
> >>>>>> If we're only forwarding the Bidding value downstream (a double value
> >>>>>> of 100, for example), it doesn't matter how repeated values are handled,
> >>>>>> since the max price for this auction would still be 100.00, no matter
> >>>>>> which Bidding got selected in the end. But if we're forwarding the
> >>>>>> Biddings downstream instead, then it matters whether the winning Bidding
> >>>>>> sent downstream was originally posted by Client A or Client B.
> >>>>>>
> >>>>>> I'm not saying that an overloaded method to handle different options for
> >>>>>> how repeated values should be handled by min/max is mandatory, but it
> >>>>>> should be clear in the methods' docs what would happen when
> >>>>>> Comparator.compare() == 0. My preferred option for the default behaviour
> >>>>>> is to only forward a new value if it is smaller/bigger than the previous
> >>>>>> min/max value (ignoring compare() == 0), since it would emit fewer values
> >>>>>> downstream and would be easier to read ("I only send a value downstream
> >>>>>> if it's bigger/smaller than the previously selected value").
> >>>>>>
> >>>>> Thanks for the clarification. I like your suggestion unless someone
> >>>>> feels that they want an option to control this (i.e., when
> >>>>> compare() == 0, return the old value vs new value).
> >>>>>
> >>>>>
> >>>>>>
> >>>>>>> Not knowing the schema of the value (V) has its own set of problems.
> >>>>>>> As I have alluded to in the proposal, this is a little bit messy. We
> >>>>>>> already have "reduce" which can be used to implement sum
> >>>>>>> (mapValues().reduce()).
> >>>>>>> Thinking about it more, do you think "sum" would be useful? One hacky
> >>>>>>> way to implement this is to inspect the type of the return when the
> >>>>>>> "func" is called the first time OR infer from the materialized or have
> >>>>>>> an explicit initializer.
> >>>>>>
> >>>>>> I think it might be useful for some use cases, yes, but it would be
> >>>>>> tricky to implement this in a way that handles generic Numbers and
> >>>>>> keeps their original implementation class. One simplification you could
> >>>>>> take is fixing VR to be a Double, and then use Number.doubleValue()
> >>>>>> to compute the sum.
> >>>>>>
> >>>>>
> >>>>> Yeah, that would simplify quite a bit. I think you are suggesting this:
> >>>>>
> >>>>> KTable<K,Double> sum(Function<V, Number> func)
> >>>>>
> >>>>>
> >>>>>> What you said about using reduce() to compute a sum() is also true for
> >>>>>> min() and max(). =) All three methods in this KIP would be syntactic
> >>>>>> sugar for what could otherwise be implemented using reduce/aggregate,
> >>>>>> but I see value in implementing them and simplifying the adoption of
> >>>>>> those use cases.
> >>>>>>
> >>>>> Agreed. I seem to have forgotten the reason as to why I started this KIP
> >>>>> :-). There is a long way to go.
> >>>>>
> >>>>> -thanks
> >>>>> Mohan
> >>>>>
> >>>>>> Best regards,
> >>>>>> Alexandre
> >>>>>>
> >>>>>> On Sat, Jun 5, 2021 at 10:17 PM Mohan Parthasarathy <[email protected]>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Hi Alex,
> >>>>>>>
> >>>>>>> Responses below.
> >>>>>>>
> >>>>>>> On Fri, Jun 4, 2021 at 9:27 AM Alexandre Brasil <[email protected]>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hi Mohan,
> >>>>>>>>
> >>>>>>>> I like the idea of adding those methods to the API, but I'd like to
> >>>>>>>> make a suggestion:
> >>>>>>>>
> >>>>>>>> Although the most used scenario for min() / max() might possibly be
> >>>>>>>> for numeric values, I think they could also be useful on other objects
> >>>>>>>> like Dates, LocalDates or Strings. Why limit the API to Numbers only?
> >>>>>>>>
> >>>>>>>>
> >>>>>>> There was no specific reason. Just addressing the common scenario. But
> >>>>>>> I don't see why this can't be supported given your suggestion below.
> >>>>>>>
> >>>>>>>> Extending on the above, couldn't we change the API to provide a
> >>>>>>>> Comparator<V> instead of the Function<V, VR> for those methods, and
> >>>>>>>> make them return a KTable<K, V> instead? Not only would this approach
> >>>>>>>> not limit the usage of those methods to Numbers, but they'd also
> >>>>>>>> preserve the origin of the min/max value [1]. The extraction of a
> >>>>>>>> single (numeric?) value could be achieved by a subsequent .mapValues()
> >>>>>>>> operator, and this strategy would also allow us to reuse the stream's
> >>>>>>>> current value serde on min / max, making the Materialized an optional
> >>>>>>>> parameter.
> >>>>>>>>
> >>>>>>> I like your idea, though it is odd that min/max returns KTable<K, V>
> >>>>>>> instead of the KTable<K, VR> (like in count), but mapValues should do
> >>>>>>> the trick.
> >>>>>>>
> >>>>>>>> One extra complication of this approach is that now we'd have to
> >>>>>>>> handle repeated min/max values from different origins (two
> >>>>>>>> semantically different objects for which the comparator returns 0),
> >>>>>>>> but we could solve that by adding a parameter to specify whether to
> >>>>>>>> use the older or newer value (or assuming one of these options as
> >>>>>>>> default for a simpler API?).
> >>>>>>>>
> >>>>>>> I am not sure whether this complexity is warranted. Why can't we just
> >>>>>>> stick to the way a regular Comparator works? Can you give me a
> >>>>>>> real-world example?
> >>>>>>>
> >>>>>>>>
> >>>>>>>> I know it's an implementation issue, but I'm curious about how you'd
> >>>>>>>> handle the <VR extends Number> on the sum(). Since the multiple
> >>>>>>>> implementations of this interface don't have a common constructor nor
> >>>>>>>> an interface method to add two Numbers, would it be possible to
> >>>>>>>> implement sum() and retain the original VR type on the returned
> >>>>>>>> KTable?
> >>>>>>>>
> >>>>>>>
> >>>>>>> Not knowing the schema of the value (V) has its own set of problems.
> >>>>>>> As I have alluded to in the proposal, this is a little bit messy. We
> >>>>>>> already have "reduce" which can be used to implement sum
> >>>>>>> (mapValues().reduce()).
> >>>>>>> Thinking about it more, do you think "sum" would be useful? One hacky
> >>>>>>> way to implement this is to inspect the type of the return when the
> >>>>>>> "func" is called the first time OR infer from the materialized or have
> >>>>>>> an explicit initializer.
> >>>>>>>
> >>>>>>> Thanks
> >>>>>>> Mohan
> >>>>>>>
> >>>>>>>
> >>>>>>>> [1]: An example scenario for this would be to find the min / max
> >>>>>>>> Bidding for a product where, at the end of the auction, I need not
> >>>>>>>> only the min / max value of said Bidding, but also the bidder's
> >>>>>>>> contact information.
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Alexandre
> >>>>>>>>
> >>>>>>>> On Wed, Jun 2, 2021 at 8:54 PM Mohan Parthasarathy <[email protected]>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi,
> >>>>>>>>>
> >>>>>>>>> I have created a proposal for adding some additional aggregation
> >>>>>>>>> APIs like count.
> >>>>>>>>>
> >>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-747+Add+support+for+basic+aggregation+APIs
> >>>>>>>>>
> >>>>>>>>> I have noted down some of the issues that need discussion. Thanks to
> >>>>>>>>> Matthias for helping me with the scope of the proposal.
> >>>>>>>>>
> >>>>>>>>> Thanks
> >>>>>>>>> Mohan
> >>>>>>>>>