Thanks for pointing that out, Koert!

I understand now why it is zero() and not init(a: IN), though I still don't
see a good reason to skip the aggregation if zero() returns null.
If the user returns null, it's on them to handle the null cases in
reduce/merge, but it opens up the possibility of using the input to create
the buffer for the aggregator.
Wouldn't that at least enable the functionality discussed in SPARK-15598,
without changing how the Aggregator works?

I bypassed it by using Optional (Guava) because I'm using the Java API, but
it's a bit cumbersome...
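
A minimal sketch of that workaround, for illustration only. It assumes
java.util.Optional in place of Guava's Optional, and it uses a standalone
class (the name OptionalMax and the aggregate() helper are hypothetical)
rather than a real subclass of Spark's Aggregator, so the encoders are left
out. The method names zero/reduce/merge mirror the Aggregator ones:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for an Aggregator with an Optional<Long> buffer.
// It does not extend Spark's Aggregator, so it compiles on its own.
public class OptionalMax {

  // An empty Optional means "no input seen yet", so zero() never returns null.
  static Optional<Long> zero() {
    return Optional.empty();
  }

  // The first input creates the buffer; later inputs are folded into it.
  static Optional<Long> reduce(Optional<Long> b, long a) {
    return Optional.of(b.isPresent() ? Math.max(b.get(), a) : a);
  }

  // Merging with an empty buffer returns the other side,
  // so merge(b, zero()) equals b.
  static Optional<Long> merge(Optional<Long> b1, Optional<Long> b2) {
    if (!b1.isPresent()) {
      return b2;
    }
    if (!b2.isPresent()) {
      return b1;
    }
    return Optional.of(Math.max(b1.get(), b2.get()));
  }

  // Helper for the sketch only: folds a list the way one partition would.
  static Optional<Long> aggregate(List<Long> values) {
    Optional<Long> buffer = zero();
    for (long v : values) {
      buffer = reduce(buffer, v);
    }
    return buffer;
  }

  public static void main(String[] args) {
    System.out.println(aggregate(Arrays.asList(3L, -7L, 5L)));
    System.out.println(aggregate(Arrays.<Long>asList()));
  }
}
```

With this shape, Max over no events comes back as an empty Optional instead
of a sentinel such as Long.MIN_VALUE, at the cost of wrapping and unwrapping
the buffer on every call.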

Thanks,
Amit

On Thu, Jun 30, 2016 at 1:54 AM Koert Kuipers <ko...@tresata.com> wrote:

> It's the difference between a semigroup and a monoid, and yes, max does not
> easily fit into a monoid.
>
> see also discussion here:
> https://issues.apache.org/jira/browse/SPARK-15598
>
> On Mon, Jun 27, 2016 at 3:19 AM, Amit Sela <amitsel...@gmail.com> wrote:
>
>> OK. I see that, but the current (provided) implementations are very naive
>> (Sum, Count, Average). Let's take Max for example: I guess zero() would be
>> set to some value like Long.MIN_VALUE, but what if you trigger (I assume in
>> the future Spark streaming will support time-based triggers) for a result
>> and there are no events?
>>
>> And like I said, for a more general use case: what if my zero() function
>> depends on my input?
>>
>> I just don't see the benefit of this behaviour, though I realise this is
>> the implementation.
>>
>> Thanks,
>> Amit
>>
>> On Sun, Jun 26, 2016 at 2:09 PM Takeshi Yamamuro <linguin....@gmail.com>
>> wrote:
>>
>>> No, TypedAggregateExpression that uses Aggregator#zero is different
>>> between v2.0 and v1.6.
>>> v2.0:
>>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala#L91
>>> v1.6:
>>> https://github.com/apache/spark/blob/branch-1.6/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala#L115
>>>
>>> // maropu
>>>
>>>
>>> On Sun, Jun 26, 2016 at 8:03 PM, Amit Sela <amitsel...@gmail.com> wrote:
>>>
>>>> This "if (value == null)" condition you point to exists in 1.6 branch
>>>> as well, so that's probably not the reason.
>>>>
>>>> On Sun, Jun 26, 2016 at 1:53 PM Takeshi Yamamuro <linguin....@gmail.com>
>>>> wrote:
>>>>
>>>>> Whatever it is, this is expected; if an initial value is null, Spark
>>>>> codegen removes all the aggregates.
>>>>> See:
>>>>> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala#L199
>>>>>
>>>>> // maropu
>>>>>
>>>>> On Sun, Jun 26, 2016 at 7:46 PM, Amit Sela <amitsel...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Not sure what the rule is in the case of `b + null = null`, but the
>>>>>> same code works perfectly in 1.6.1; I just tried it.
>>>>>>
>>>>>> On Sun, Jun 26, 2016 at 1:24 PM Takeshi Yamamuro <
>>>>>> linguin....@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> This behaviour seems to be expected because you must ensure `b +
>>>>>>> zero() = b`.
>>>>>>> Your case, `b + null = null`, breaks this rule.
>>>>>>> This is the same as in v1.6.1.
>>>>>>> See:
>>>>>>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala#L57
>>>>>>>
>>>>>>> // maropu
>>>>>>>
>>>>>>>
>>>>>>> On Sun, Jun 26, 2016 at 6:06 PM, Amit Sela <amitsel...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Sometimes, the BUF for the aggregator may depend on the actual
>>>>>>>> input, and while this passes the responsibility for handling null in
>>>>>>>> merge/reduce to the developer, that sounds fine to me if they are the
>>>>>>>> one who put null in zero() anyway.
>>>>>>>> Now, it seems that the aggregation is skipped entirely when zero()
>>>>>>>> = null. Not sure if that was the behaviour in 1.6.
>>>>>>>>
>>>>>>>> Is this behaviour wanted?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Amit
>>>>>>>>
>>>>>>>> Aggregator example:
>>>>>>>>
>>>>>>>> public static class Agg extends Aggregator<Tuple2<String, Integer>, Integer, Integer> {
>>>>>>>>
>>>>>>>>   @Override
>>>>>>>>   public Integer zero() {
>>>>>>>>     return null;
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   @Override
>>>>>>>>   public Integer reduce(Integer b, Tuple2<String, Integer> a) {
>>>>>>>>     if (b == null) {
>>>>>>>>       b = 0;
>>>>>>>>     }
>>>>>>>>     return b + a._2();
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   @Override
>>>>>>>>   public Integer merge(Integer b1, Integer b2) {
>>>>>>>>     if (b1 == null) {
>>>>>>>>       return b2;
>>>>>>>>     } else if (b2 == null) {
>>>>>>>>       return b1;
>>>>>>>>     } else {
>>>>>>>>       return b1 + b2;
>>>>>>>>     }
>>>>>>>>   }
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> ---
>>>>>>> Takeshi Yamamuro
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> ---
>>>>> Takeshi Yamamuro
>>>>>
>>>>
>>>
>>>
>>> --
>>> ---
>>> Takeshi Yamamuro
>>>
>>
>
