> I am not sure why we would want to pass `Function<V, Comparable<?>> > func` into `min()`?
I guess I misread/misunderstood your earlier suggestion. My line of thought was that, instead of using a method signature that demands a Comparator<V> in min()/max(), we might use a property extractor (like the FK extractors on some join() overloads) to return a Comparable property that min()/max() could use to compare the values. The benefit of this approach is that It would be simpler than implementing comparators when most use cases would probably compare properties of the values that already implement Comparable (like Numbers, Strings, Dates, etc), but on the other hand it would be more limiting in disallowing the usage of multiple properties of <V> or on defining how null property values should be handled. On Tue, Aug 3, 2021 at 10:55 PM Matthias J. Sax <mj...@apache.org> wrote: > I was playing with the code a little bit, but it seems not to be easy to > use generics to enforce that V is `Comparable`... > > We would need to introduce a new interface > > interface ComparableStream<K, V extends Comparable<V>> > extends KStream<K, V> > { > KTable<K, V> min(); > } > > But it also requires a nasty cast to actually use it: > > KStream<String, String> stream = > new StreamsBuilder().stream(""); > KTable<String, String> table = > ((ComparableStream<String, String>) stream).min(); > > If the value-type does not implement `Comparable` the cast would not > compile... Or would there be a simpler way to ensure that min() can only > be called _if_ V is `Comparable`? > > > So maybe passing in a `Comparator<V>` might be the right way to go; > might also be more flexible anyway. -- My original idea was just to > maybe avoid the `Comparator` argument, as it would make the API nicer > IMHO; fewer parameters is usually better... > > > I am not sure why we would want to pass `Function<V, Comparable<?>> > func` into `min()`? > > > > -Matthias > > > > On 6/21/21 11:23 AM, Mohan Parthasarathy wrote: > > Alex, > > > > > > On Wed, Jun 16, 2021 at 8:07 AM Alexandre Brasil < > alexandre.bra...@gmail.com> > > wrote: > > > >> Mohan / Mathias, > >> > >>>> I think extending min/max to non-numeric types makes sense. Wondering > >>>> why we should require a `Comparator` or if we should require that the > >>>> types implement `Comparable` instead? > >>>> > >>> Good question. This is what it would look like: > >>> > >>> KTable<K, V> min_comparable() > >>> KTable<K, V> min_comparator(Comparator<V> comp) > >> > >> Not sure if I understood Mathias' proposal correctly, but I think that > >> instead of going with > >> your original proposal (<VR extends Number> KTable<K, VR> > min(Function<V, > >> VR> func...) > >> or mine (KTable<K, V> min(Comparator<V> comparator...), we could > simplify > >> it a > >> bit by using a function to extract a Comparable property from the > original > >> value: > >> > >> KTable<K, V> min(Function<V, Comparable<?>> func...) > >> > >> I will let Matthias clarify. I am not sure why it is simpler than the > > comparator one. Comparable is implemented by the type and not sure > exposing > > it this way makes it any better. > > > >> I also think, that min/max should not change the value type. Using > >>> `Long` for sum() make sense though, and also to require a `<V extends > >>> Number>`. > >> > >> Are there any reasons to limit the sum() to integers? Why not use a > Double > >> instead? > >> > >> Yeah, if the precision is important, then we should stick with Double. > > > > -mohan > > > > Best regards, > >> Alexandre > >> > >> On Wed, Jun 16, 2021 at 1:01 AM Mohan Parthasarathy < > mposde...@gmail.com> > >> wrote: > >> > >>> Matthias, > >>> > >>> On Mon, Jun 14, 2021 at 9:18 PM Matthias J. Sax > >> <mj...@mailbox.org.invalid > >>>> > >>> wrote: > >>> > >>>> Hi, > >>>> > >>>> I think extending min/max to non-numeric types makes sense. Wondering > >>>> why we should require a `Comparator` or if we should require that the > >>>> types implement `Comparable` instead? > >>>> > >>>> Good question. This is what it would look like: > >>> > >>> KTable<K, V> min_comparable() > >>> KTable<K, V> min_comparator(Comparator<V> comp) > >>> > >>> For min_comparable to work, you still need the bounds "V extends > >>> Comparable< > >>> V>". AFAICT, to avoid the "type parameter V hiding the type V" warning, > >> it > >>> has to be at the interface level like this: > >>> > >>> KStream<K, V extends Comparable<V>> > >>> > >>> which is a little messy unless there is a different way to do the same. > >> The > >>> comparator gives a simple way to define an anonymous function. > >>> > >>> What do you think ? > >>> > >>> > >>>> I also think, that min/max should not change the value type. Using > >>>> `Long` for sum() make sense though, and also to require a `<V extends > >>>> Number>`. > >>>> > >>>> I guess these are the two possibilities: > >>> > >>> <E extends Number> Long sum(Function<V, E> func) > >>> Long sum(Function<V, Number> func) > >>> > >>> Both should work. "func" can return any subtypes of Number and I don't > >> see > >>> any advantages with the first version. Can you clarify ? > >>> > >>> Thanks > >>> Mohan > >>> > >>> > >>>> > >>>> -Matthias > >>>> > >>>> On 6/8/21 5:00 PM, Mohan Parthasarathy wrote: > >>>>> Hi Alex, > >>>>> > >>>>> On Tue, Jun 8, 2021 at 2:44 PM Alexandre Brasil < > >>>> alexandre.bra...@gmail.com> > >>>>> wrote: > >>>>> > >>>>>> > >>>>>> My point here is that, when we're only interested in a max/min > >> numeric > >>>>>> value, it doesn't > >>>>>> matter when we have repeated values, since we'd be only forwarding > >> the > >>>>>> number downstream, > >>>>>> so I could disregard when the Comparator returns a zero value > >> (meaning > >>>>>> equals) and min/max > >>>>>> would still be semantically correct. But when we're forwarding the > >>>> original > >>>>>> object downstream > >>>>>> instead of its numeric property, it could mean different things > >>>>>> semantically depending on how > >>>>>> we handle the repeated values. > >>>>>> > >>>>>> As an example, if I were using max() on a stream of Biddings for > >>>> products > >>>>>> in an auction, the > >>>>>> order of the biddings would probably influence the winner if two > >>> clients > >>>>>> send Biddings with the > >>>>>> same value. If we're only forwarding the Bidding value downstream (a > >>>> double > >>>>>> value of 100, for > >>>>>> example), it doesn't matter how repeated values are handled, since > >> the > >>>> max > >>>>>> price for this > >>>>>> auction would still be 100.00, no matter what Bidding got selected > >> in > >>>> the > >>>>>> end. But if we're > >>>>>> forwarding the Biddings downstream instead, then it matters whether > >>> the > >>>>>> winning Bidding sent > >>>>>> downstream was originally posted by Client A or Client B. > >>>>>> > >>>>>> I'm not saying that an overloaded method to handle different options > >>> for > >>>>>> how repeated values > >>>>>> should be handled by min/max is mandatory, but it should be clear on > >>> the > >>>>>> methods' docs > >>>>>> what would happen when Comparator.compare() == 0. My preferred > >> option > >>>> for > >>>>>> the default > >>>>>> behaviour is to only forward a new value is smaller/bigger than the > >>>>>> previous min/max value > >>>>>> (ignoring compare() == 0), since it would emit less values > >> downstream > >>>> and > >>>>>> would be easier > >>>>>> to read ("I only send a value downstream if it's bigger/smaller than > >>> the > >>>>>> previously selected > >>>>>> value"). > >>>>>> > >>>>> Thanks for the clarification. I like your suggestion unless someone > >>> feels > >>>>> that they want an option to control this (i.e., when compare() == 0, > >>>> return > >>>>> the old value vs new value). > >>>>> > >>>>> > >>>>>> > >>>>>>> Not knowing the schema of the value (V) has its own set of > >> problems. > >>>> As I > >>>>>> have alluded to > >>>>>>> in the proposal, this is a little bit messy. We already have > >> "reduce" > >>>>>> which can be used to > >>>>>>> implement sum (mapValues().reduce()). > >>>>>>> Thinking about it more, do you think "sum" would be useful ? One > >>> hacky > >>>>>> way to implement > >>>>>>> this is to inspect the type of the return when the "func" is called > >>> the > >>>>>> first time OR infer from > >>>>>>> the materialized or have an explicit initializer. > >>>>>> > >>>>>> I think it might be useful for some use cases, yes, but it would be > >>>> tricky > >>>>>> to implement this in a > >>>>>> way that handles generic Numbers and keeps their original > >>> implementation > >>>>>> class. One > >>>>>> simplification you could take is fixating VR to be a Double, and > >> then > >>>> use > >>>>>> Number.doubleValue() > >>>>>> to compute the sum. > >>>>>> > >>>>> > >>>>> Yeah, that would simplify quite a bit. I think you are suggesting > >> this: > >>>>> > >>>>> KTable<K,Double> sum(Function<V, Number> func) > >>>>> > >>>>> > >>>>>> What you said about using reduce() to compute a sum() is also true > >> for > >>>>>> min() and max(). =) All > >>>>>> three methods in this KIP would be a syntactic sugar for what could > >>>>>> otherwise be implemented > >>>>>> using reduce/aggregate, but I see value in implementing them and > >>>>>> simplifying the adoption of > >>>>>> those use cases. > >>>>>> > >>>>>> Agreed. I seem to have forgotten the reason as to why I started this > >>> KIP > >>>>> :-). There is a long way to go. > >>>>> > >>>>> -thanks > >>>>> Mohan > >>>>> > >>>>> Best regards, > >>>>>> Alexandre > >>>>>> > >>>>>> On Sat, Jun 5, 2021 at 10:17 PM Mohan Parthasarathy < > >>>> mposde...@gmail.com> > >>>>>> wrote: > >>>>>> > >>>>>>> Hi Alex, > >>>>>>> > >>>>>>> Responses below. > >>>>>>> > >>>>>>> On Fri, Jun 4, 2021 at 9:27 AM Alexandre Brasil < > >>>>>>> alexandre.bra...@gmail.com> > >>>>>>> wrote: > >>>>>>> > >>>>>>>> Hi Mohan, > >>>>>>>> > >>>>>>>> I like the idea of adding those methods to the API, but I'd like > >> to > >>>>>> make > >>>>>>> a > >>>>>>>> suggestion: > >>>>>>>> > >>>>>>>> Although the most used scenario for min() / max() might possibly > >> be > >>>> for > >>>>>>>> numeric values, I think they could also be > >>>>>>>> useful on other objects like Dates, LocalDates or Strings. Why > >> limit > >>>>>> the > >>>>>>>> API to Numbers only? > >>>>>>>> > >>>>>>>> > >>>>>>> There was no specific reason. Just addressing the common scenario. > >>> But > >>>> I > >>>>>>> don't see why this can't be supported given your suggestion below. > >>>>>>> > >>>>>>> Extending on the above, couldn't we change the API to provide a > >>>>>>>> Comparator<V> instead of the Function<V, VR> > >>>>>>>> for those methods, and make them return a KTable<K, V> instead? > >> Not > >>>>>> only > >>>>>>>> would this approach not limit the > >>>>>>>> usage of those methods to Numbers, but they'd also preserve the > >>> origin > >>>>>>> from > >>>>>>>> the min/max value [1]. The extraction of > >>>>>>>> a single (numeric?) value could be achieved by a subsequent > >>>>>> .mapValues() > >>>>>>>> operator, and this strategy would also > >>>>>>>> allow us to reuse the stream's current value serde on min / max, > >>>> making > >>>>>>> the > >>>>>>>> Materialized an optional parameter. > >>>>>>>> > >>>>>>>> I like your idea though it is odd that min/max returns KTable<K, > >> V> > >>>>>>> instead of the KTable<K, VR> (like in count), but mapValues should > >> do > >>>> the > >>>>>>> trick. > >>>>>>> > >>>>>>> One extra complication of this approach is that now we'd have to > >>> handle > >>>>>>>> repeated min/max values from different > >>>>>>>> origins (two semantically different objects for which the > >> comparator > >>>>>>>> returns 0), but we could solve that by adding > >>>>>>>> a parameter to specify whether to use the older or newer value (or > >>>>>>> assuming > >>>>>>>> one of these options as default for a > >>>>>>>> simpler API?). > >>>>>>>> > >>>>>>>> I am not sure whether this complexity is warranted. Why can't we > >>> just > >>>>>>> stick to the way a regular Comparator works ? Can you give me a > >> real > >>>>>> world > >>>>>>> example ? > >>>>>>> > >>>>>>>> > >>>>>>>> I know it's an implementation issue, but I'm curious on how you'd > >>>> solve > >>>>>>>> handling the <VR extends Number> on > >>>>>>>> the sum(). Since the multiple implementations of this interface > >>> don't > >>>>>>> have > >>>>>>>> a common constructor nor an interface > >>>>>>>> method to add two Numbers, would it be possible to implement sum() > >>> and > >>>>>>>> retain the original VR type on the > >>>>>>>> returned KTable? > >>>>>>>> > >>>>>>> > >>>>>>> Not knowing the schema of the value (V) has its own set of > >> problems. > >>>> As I > >>>>>>> have alluded to in the proposal, this is a little bit messy. We > >>> already > >>>>>>> have "reduce" which can be used to implement sum > >>>> (mapValues().reduce()). > >>>>>>> Thinking about it more, do you think "sum" would be useful ? One > >>> hacky > >>>>>> way > >>>>>>> to implement this is to inspect the type of the return when the > >>> "func" > >>>> is > >>>>>>> called the first time OR infer from the materialized or have an > >>>> explicit > >>>>>>> initializer. > >>>>>>> > >>>>>>> Thanks > >>>>>>> Mohan > >>>>>>> > >>>>>>> > >>>>>>>> [1]: An example scenario for this would be to find the min / max > >>>>>> Bidding > >>>>>>>> for a product where, at the end of the > >>>>>>>> auction, I need not only the min / max value of said Bidding, but > >>> also > >>>>>>> the > >>>>>>>> bidder's contact information. > >>>>>>>> > >>>>>>>> Best, > >>>>>>>> Alexandre > >>>>>>>> > >>>>>>>> On Wed, Jun 2, 2021 at 8:54 PM Mohan Parthasarathy < > >>>>>> mposde...@gmail.com> > >>>>>>>> wrote: > >>>>>>>> > >>>>>>>>> Hi, > >>>>>>>>> > >>>>>>>>> I have created a proposal for adding some additional aggregation > >>> APIs > >>>>>>>> like > >>>>>>>>> count. > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> > >>>>>>>> > >>>>>>> > >>>>>> > >>>> > >>> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-747+Add+support+for+basic+aggregation+APIs > >>>>>>>>> > >>>>>>>>> I have noted down some of the issues that need discussion. Thanks > >>> to > >>>>>>>>> Matthias for helping me with the scope of the proposal. > >>>>>>>>> > >>>>>>>>> Thanks > >>>>>>>>> Mohan > >>>>>>>>> > >>>>>>>> > >>>>>>> > >>>>>> > >>>>> > >>>> > >>> > >> > > >