Hi,

I think extending min/max to non-numeric types makes sense. Wondering
why we should require a `Comparator` or if we should require that the
types implement `Comparable` instead?

I also think, that min/max should not change the value type. Using
`Long` for sum() make sense though, and also to require a `<V extends
Number>`.


-Matthias

On 6/8/21 5:00 PM, Mohan Parthasarathy wrote:
> Hi Alex,
> 
> On Tue, Jun 8, 2021 at 2:44 PM Alexandre Brasil <alexandre.bra...@gmail.com>
> wrote:
> 
>>
>> My point here is that, when we're only interested in a max/min numeric
>> value, it doesn't
>> matter when we have repeated values, since we'd be only forwarding the
>> number downstream,
>> so I could disregard when the Comparator returns a zero value (meaning
>> equals) and min/max
>> would still be semantically correct. But when we're forwarding the original
>> object downstream
>> instead of its numeric property, it could mean different things
>> semantically depending on how
>> we handle the repeated values.
>>
>> As an example, if I were using max() on a stream of Biddings for products
>> in an auction, the
>> order of the biddings would probably influence the winner if two clients
>> send Biddings with the
>> same value. If we're only forwarding the Bidding value downstream (a double
>> value of 100, for
>> example), it doesn't matter how repeated values are handled, since the max
>> price for this
>> auction would still be 100.00, no matter what Bidding got selected in the
>> end. But if we're
>> forwarding the Biddings downstream instead, then it matters whether the
>> winning Bidding sent
>> downstream was originally posted by Client A or Client B.
>>
>> I'm not saying that an overloaded method to handle different options for
>> how repeated values
>> should be handled by min/max is mandatory, but it should be clear on the
>> methods' docs
>> what would happen when Comparator.compare() == 0. My preferred option for
>> the default
>> behaviour is to only forward a new value is smaller/bigger than the
>> previous min/max value
>> (ignoring compare() == 0), since it would emit less values downstream and
>> would be easier
>> to read ("I only send a value downstream if it's bigger/smaller than the
>> previously selected
>> value").
>>
> Thanks for the clarification. I like your suggestion unless someone feels
> that they want an option to control this (i.e., when compare() == 0, return
> the old value vs new value).
> 
> 
>>
>>> Not knowing the schema of the value (V) has its own set of problems. As I
>> have alluded to
>>> in the proposal, this is a little bit messy. We already have "reduce"
>> which can be used to
>>> implement sum (mapValues().reduce()).
>>> Thinking about it more, do you think "sum" would be useful ? One hacky
>> way to implement
>>> this is to inspect the type of the return when the "func" is called the
>> first time OR infer from
>>> the materialized or have an explicit initializer.
>>
>> I think it might be useful for some use cases, yes, but it would be tricky
>> to implement this in a
>> way that handles generic Numbers and keeps their original implementation
>> class. One
>> simplification you could take is fixating VR to be a Double, and then use
>> Number.doubleValue()
>> to compute the sum.
>>
> 
> Yeah, that would simplify quite a bit. I think you are suggesting this:
> 
> KTable<K,Double> sum(Function<V, Number> func)
> 
> 
>> What you said about using reduce() to compute a sum() is also true for
>> min() and max(). =) All
>> three methods in this KIP would be a syntactic sugar for what could
>> otherwise be implemented
>> using reduce/aggregate, but I see value in implementing them and
>> simplifying the adoption of
>> those use cases.
>>
>> Agreed. I seem to have forgotten the reason as to why I started this KIP
> :-). There is a long way to go.
> 
> -thanks
> Mohan
> 
> Best regards,
>> Alexandre
>>
>> On Sat, Jun 5, 2021 at 10:17 PM Mohan Parthasarathy <mposde...@gmail.com>
>> wrote:
>>
>>> Hi Alex,
>>>
>>> Responses below.
>>>
>>> On Fri, Jun 4, 2021 at 9:27 AM Alexandre Brasil <
>>> alexandre.bra...@gmail.com>
>>> wrote:
>>>
>>>> Hi Mohan,
>>>>
>>>> I like the idea of adding those methods to the API, but I'd like to
>> make
>>> a
>>>> suggestion:
>>>>
>>>> Although the most used scenario for min() / max() might possibly be for
>>>> numeric values, I think they could also be
>>>> useful on other objects like Dates, LocalDates or Strings. Why limit
>> the
>>>> API to Numbers only?
>>>>
>>>>
>>> There was no specific reason. Just addressing the common scenario. But I
>>> don't see why this can't be supported given your suggestion below.
>>>
>>> Extending on the above, couldn't we change the API to provide a
>>>> Comparator<V> instead of the Function<V, VR>
>>>> for those methods, and make them return a KTable<K, V> instead? Not
>> only
>>>> would this approach not limit the
>>>> usage of those methods to Numbers, but they'd also preserve the origin
>>> from
>>>> the min/max value [1]. The extraction of
>>>> a single (numeric?) value could be achieved by a subsequent
>> .mapValues()
>>>> operator, and this strategy would also
>>>> allow us to reuse the stream's current value serde on min / max, making
>>> the
>>>> Materialized an optional parameter.
>>>>
>>>> I like your idea though it is odd that min/max returns  KTable<K, V>
>>> instead of the KTable<K, VR> (like in count), but mapValues should do the
>>> trick.
>>>
>>> One extra complication of this approach is that now we'd have to handle
>>>> repeated min/max values from different
>>>> origins (two semantically different objects for which the comparator
>>>> returns 0), but we could solve that by adding
>>>> a parameter to specify whether to use the older or newer value (or
>>> assuming
>>>> one of these options as default for a
>>>> simpler API?).
>>>>
>>>> I am not sure whether this complexity is warranted. Why can't we just
>>> stick to the way a regular Comparator works ? Can you give me a real
>> world
>>> example ?
>>>
>>>>
>>>> I know it's an implementation issue, but I'm curious on how you'd solve
>>>> handling the <VR extends Number> on
>>>> the sum(). Since the multiple implementations of this interface don't
>>> have
>>>> a common constructor nor an interface
>>>> method to add two Numbers, would it be possible to implement sum() and
>>>> retain the original VR type on the
>>>> returned KTable?
>>>>
>>>
>>> Not knowing the schema of the value (V) has its own set of problems. As I
>>> have alluded to in the proposal, this is a little bit messy. We already
>>> have "reduce" which can be used to implement sum (mapValues().reduce()).
>>> Thinking about it more, do you think "sum" would be useful ? One hacky
>> way
>>> to implement this is to inspect the type of the return when the "func" is
>>> called the first time OR infer from the materialized or have an explicit
>>> initializer.
>>>
>>> Thanks
>>> Mohan
>>>
>>>
>>>> [1]: An example scenario for this would be to find the min / max
>> Bidding
>>>> for a product where, at the end of the
>>>> auction, I need not only the min / max value of said Bidding, but also
>>> the
>>>> bidder's contact information.
>>>>
>>>> Best,
>>>> Alexandre
>>>>
>>>> On Wed, Jun 2, 2021 at 8:54 PM Mohan Parthasarathy <
>> mposde...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I have created a proposal for adding some additional aggregation APIs
>>>> like
>>>>> count.
>>>>>
>>>>>
>>>>>
>>>>
>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-747+Add+support+for+basic+aggregation+APIs
>>>>>
>>>>> I have noted down some of the issues that need discussion. Thanks to
>>>>> Matthias for helping me with the scope of the proposal.
>>>>>
>>>>> Thanks
>>>>> Mohan
>>>>>
>>>>
>>>
>>
> 

Reply via email to