OK, I see that, but the provided implementations are very naive - Sum,
Count, Average. Let's take Max for example: I guess zero() would be set to
some value like Long.MIN_VALUE, but what if a trigger fires (I assume
future Spark Streaming will support time-based triggers) and there are no
events? The sentinel would then surface as the result.
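
A minimal sketch of what I mean - this Max class is hypothetical, written
against the Aggregator API (2.0 would also require encoder definitions,
elided here):

public static class Max extends Aggregator<Tuple2<String, Long>, Long, Long> {

  @Override
  public Long zero() {
    // Sentinel identity: fine for merge/reduce, but if a trigger fires
    // before any event arrives, Long.MIN_VALUE is emitted as the "max".
    return Long.MIN_VALUE;
  }

  @Override
  public Long reduce(Long b, Tuple2<String, Long> a) {
    return Math.max(b, a._2());
  }

  @Override
  public Long merge(Long b1, Long b2) {
    return Math.max(b1, b2);
  }

  @Override
  public Long finish(Long reduction) {
    return reduction;
  }
}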

And like I said, for a more general use case: what if my zero() function
depends on my input? Then there is no fixed identity value to return up
front.
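
For example (a sketch; the First class is made up), an aggregator that
keeps the first event seen per key has no input-independent identity
value, so null is the only honest zero():

public static class First extends Aggregator<Tuple2<String, Integer>,
    Tuple2<String, Integer>, Tuple2<String, Integer>> {

  @Override
  public Tuple2<String, Integer> zero() {
    // There is no "empty" Tuple2 that behaves as an identity, so null it is.
    return null;
  }

  @Override
  public Tuple2<String, Integer> reduce(Tuple2<String, Integer> b,
      Tuple2<String, Integer> a) {
    return b == null ? a : b;
  }

  @Override
  public Tuple2<String, Integer> merge(Tuple2<String, Integer> b1,
      Tuple2<String, Integer> b2) {
    // "First" is best-effort across partitions; null means "nothing yet".
    return b1 == null ? b2 : b1;
  }

  @Override
  public Tuple2<String, Integer> finish(Tuple2<String, Integer> reduction) {
    return reduction;
  }
}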

I just don't see the benefit of this behaviour, though I realise this is
how it is currently implemented.

Thanks,
Amit

On Sun, Jun 26, 2016 at 2:09 PM Takeshi Yamamuro <linguin....@gmail.com>
wrote:

> No, the TypedAggregateExpression that uses Aggregator#zero is different
> between v2.0 and v1.6.
> v2.0:
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala#L91
> v1.6:
> https://github.com/apache/spark/blob/branch-1.6/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala#L115
>
> // maropu
>
>
> On Sun, Jun 26, 2016 at 8:03 PM, Amit Sela <amitsel...@gmail.com> wrote:
>
>> This "if (value == null)" condition you point to exists in the 1.6
>> branch as well, so that's probably not the reason.
>>
>> On Sun, Jun 26, 2016 at 1:53 PM Takeshi Yamamuro <linguin....@gmail.com>
>> wrote:
>>
>>> Whatever the case, this is expected: if an initial value is null, Spark
>>> codegen removes all the aggregates.
>>> See:
>>> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala#L199
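>>>
>>> A quick way to see the folding rule behind this (a sketch; assumes a
>>> local Spark 2.0 session):
>>>
>>> SparkSession spark = SparkSession.builder()
>>>     .master("local").appName("nulls").getOrCreate();
>>> // Arithmetic over a NULL literal constant-folds to NULL, which is why
>>> // an update expression seeded with a null zero() collapses:
>>> spark.sql("SELECT 1 + NULL").show();  // the single value shown is null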
>>>
>>> // maropu
>>>
>>> On Sun, Jun 26, 2016 at 7:46 PM, Amit Sela <amitsel...@gmail.com> wrote:
>>>
>>>> Not sure what the rule is in the case of `b + null = null`, but the
>>>> same code works perfectly in 1.6.1 - I just tried it.
>>>>
>>>> On Sun, Jun 26, 2016 at 1:24 PM Takeshi Yamamuro <linguin....@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> This behaviour seems to be expected, because you must ensure
>>>>> `b + zero() = b`. Your case, `b + null = null`, breaks this rule.
>>>>> This is the same in v1.6.1.
>>>>> See:
>>>>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala#L57
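>>>>>
>>>>> A tiny illustration of the law (plain Java; the values are made up):
>>>>>
>>>>> int b = 42;
>>>>> int zero = 0;            // a proper identity for +
>>>>> assert b + zero == b;    // `b + zero() = b` holds
>>>>> // With zero() == null, SQL semantics give b + null = null, so the
>>>>> // rule is broken for every b.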
>>>>>
>>>>> // maropu
>>>>>
>>>>>
>>>>> On Sun, Jun 26, 2016 at 6:06 PM, Amit Sela <amitsel...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Sometimes the BUF for the aggregator may depend on the actual input,
>>>>>> and while this passes the responsibility for handling null in
>>>>>> merge/reduce on to the developer, that sounds fine to me if the
>>>>>> developer is the one who put null in zero() in the first place.
>>>>>> Now it seems that the aggregation is skipped entirely when
>>>>>> zero() = null. I'm not sure if that was the behaviour in 1.6.
>>>>>>
>>>>>> Is this behaviour wanted?
>>>>>>
>>>>>> Thanks,
>>>>>> Amit
>>>>>>
>>>>>> Aggregator example:
>>>>>>
>>>>>> public static class Agg extends Aggregator<Tuple2<String, Integer>, Integer, Integer> {
>>>>>>
>>>>>>   @Override
>>>>>>   public Integer zero() {
>>>>>>     // Deliberately null: "no value yet"; reduce/merge below handle it.
>>>>>>     return null;
>>>>>>   }
>>>>>>
>>>>>>   @Override
>>>>>>   public Integer reduce(Integer b, Tuple2<String, Integer> a) {
>>>>>>     if (b == null) {
>>>>>>       b = 0;
>>>>>>     }
>>>>>>     return b + a._2();
>>>>>>   }
>>>>>>
>>>>>>   @Override
>>>>>>   public Integer merge(Integer b1, Integer b2) {
>>>>>>     if (b1 == null) {
>>>>>>       return b2;
>>>>>>     } else if (b2 == null) {
>>>>>>       return b1;
>>>>>>     } else {
>>>>>>       return b1 + b2;
>>>>>>     }
>>>>>>   }
>>>>>>
>>>>>>   @Override
>>>>>>   public Integer finish(Integer reduction) {
>>>>>>     return reduction;
>>>>>>   }
>>>>>> }
>>>>>>
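>>>>>> And this is roughly how I apply it (a sketch; assumes the 2.0 API,
>>>>>> where Agg would additionally need bufferEncoder()/outputEncoder()
>>>>>> overrides returning Encoders.INT()):
>>>>>>
>>>>>> import java.util.Arrays;
>>>>>> import org.apache.spark.api.java.function.MapFunction;
>>>>>> import org.apache.spark.sql.*;
>>>>>> import scala.Tuple2;
>>>>>>
>>>>>> SparkSession spark = SparkSession.builder()
>>>>>>     .master("local").appName("agg-zero").getOrCreate();
>>>>>> Dataset<Tuple2<String, Integer>> ds = spark.createDataset(
>>>>>>     Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("a", 2)),
>>>>>>     Encoders.tuple(Encoders.STRING(), Encoders.INT()));
>>>>>> ds.groupByKey((MapFunction<Tuple2<String, Integer>, String>) t -> t._1(),
>>>>>>       Encoders.STRING())
>>>>>>   .agg(new Agg().toColumn())
>>>>>>   .show();  // this is where I see the aggregation skipped in 2.0
>>>>>>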
>>>>>
>>>>>
>>>>> --
>>>>> ---
>>>>> Takeshi Yamamuro
>>>>>
>>>>
>>>
>>>
>>> --
>>> ---
>>> Takeshi Yamamuro
>>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>
