OK. I see that, but the current (provided) implementations are very naive - Sum, Count, Average. Let's take Max for example: I guess zero() would be set to some value like Long.MIN_VALUE, but what if you trigger for a result (I assume Spark Streaming will support time-based triggers in the future) and there are no events?
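A sketch of what I mean for Max: carry an explicit "seen anything yet?" count in the buffer instead of overloading Long.MIN_VALUE, so an empty trigger can produce null (or no row) rather than a fake minimum. This is a hypothetical illustration with plain static methods standing in for the Aggregator's zero/reduce/merge/finish, not Spark's actual API:

```java
// Hedged sketch of a Max aggregator whose buffer is a long[] {max, count}.
// count == 0 means "no events seen", so Long.MIN_VALUE in the max slot is
// only a placeholder and never leaks out as a result.
public class MaxBuffer {
    // zero(): no events yet; the max slot is a placeholder.
    static long[] zero() {
        return new long[] { Long.MIN_VALUE, 0 };
    }

    static long[] reduce(long[] b, long value) {
        return new long[] { b[1] == 0 ? value : Math.max(b[0], value), b[1] + 1 };
    }

    static long[] merge(long[] b1, long[] b2) {
        if (b1[1] == 0) return b2;
        if (b2[1] == 0) return b1;
        return new long[] { Math.max(b1[0], b2[0]), b1[1] + b2[1] };
    }

    // finish(): nullable Long, so an empty trigger yields null instead of
    // Long.MIN_VALUE.
    static Long finish(long[] b) {
        return b[1] == 0 ? null : b[0];
    }

    public static void main(String[] args) {
        long[] b = zero();
        System.out.println(finish(b));   // empty trigger: prints null
        b = reduce(b, 7);
        b = merge(b, reduce(zero(), 3));
        System.out.println(finish(b));   // prints 7
    }
}
```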
And like I said, for a more general use case: what if my zero() function depends on my input? I just don't see the benefit of this behaviour, though I realise this is the implementation.

Thanks,
Amit

On Sun, Jun 26, 2016 at 2:09 PM Takeshi Yamamuro <linguin....@gmail.com> wrote:

> No, TypedAggregateExpression that uses Aggregator#zero is different
> between v2.0 and v1.6.
> v2.0:
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala#L91
> v1.6:
> https://github.com/apache/spark/blob/branch-1.6/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala#L115
>
> // maropu
>
> On Sun, Jun 26, 2016 at 8:03 PM, Amit Sela <amitsel...@gmail.com> wrote:
>
>> This "if (value == null)" condition you point to exists in the 1.6 branch
>> as well, so that's probably not the reason.
>>
>> On Sun, Jun 26, 2016 at 1:53 PM Takeshi Yamamuro <linguin....@gmail.com>
>> wrote:
>>
>>> Whatever it is, this is expected; if an initial value is null, Spark
>>> codegen removes all the aggregates.
>>> See:
>>> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala#L199
>>>
>>> // maropu
>>>
>>> On Sun, Jun 26, 2016 at 7:46 PM, Amit Sela <amitsel...@gmail.com> wrote:
>>>
>>>> Not sure what the rule is in the case of `b + null = null`, but the
>>>> same code works perfectly in 1.6.1, just tried it.
>>>>
>>>> On Sun, Jun 26, 2016 at 1:24 PM Takeshi Yamamuro <linguin....@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> This behaviour seems to be expected because you must ensure
>>>>> `b + zero() = b`.
>>>>> Your case `b + null = null` breaks this rule.
>>>>> This is the same in v1.6.1.
>>>>> See:
>>>>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala#L57
>>>>>
>>>>> // maropu
>>>>>
>>>>> On Sun, Jun 26, 2016 at 6:06 PM, Amit Sela <amitsel...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Sometimes the BUF for the aggregator may depend on the actual
>>>>>> input, and while this passes the responsibility for handling null in
>>>>>> merge/reduce to the developer, it sounds fine to me if he is the one
>>>>>> who put null in zero() anyway.
>>>>>> Now, it seems that the aggregation is skipped entirely when
>>>>>> zero() = null. Not sure if that was the behaviour in 1.6.
>>>>>>
>>>>>> Is this behaviour wanted?
>>>>>>
>>>>>> Thanks,
>>>>>> Amit
>>>>>>
>>>>>> Aggregator example:
>>>>>>
>>>>>> public static class Agg extends Aggregator<Tuple2<String, Integer>,
>>>>>>     Integer, Integer> {
>>>>>>
>>>>>>   @Override
>>>>>>   public Integer zero() {
>>>>>>     return null;
>>>>>>   }
>>>>>>
>>>>>>   @Override
>>>>>>   public Integer reduce(Integer b, Tuple2<String, Integer> a) {
>>>>>>     if (b == null) {
>>>>>>       b = 0;
>>>>>>     }
>>>>>>     return b + a._2();
>>>>>>   }
>>>>>>
>>>>>>   @Override
>>>>>>   public Integer merge(Integer b1, Integer b2) {
>>>>>>     if (b1 == null) {
>>>>>>       return b2;
>>>>>>     } else if (b2 == null) {
>>>>>>       return b1;
>>>>>>     } else {
>>>>>>       return b1 + b2;
>>>>>>     }
>>>>>>   }
>>>>>> }
>>>>>
>>>>> --
>>>>> ---
>>>>> Takeshi Yamamuro
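For reference, the Agg example above can be made to satisfy the `b + zero() = b` contract simply by using the identity element of the reduce operation (0 for sum) as zero(), which also removes every null check. A minimal sketch with plain static methods standing in for the Aggregator's overrides, not the actual org.apache.spark.sql.expressions.Aggregator subclass:

```java
// Hedged sketch: the sum aggregator from the thread with a non-null zero()
// that satisfies b + zero() = b, so codegen never sees a null initial literal.
public class SumAgg {
    static Integer zero() {
        return 0;                  // identity of +, never null
    }

    static Integer reduce(Integer b, int value) {
        return b + value;          // b can no longer be null
    }

    static Integer merge(Integer b1, Integer b2) {
        return b1 + b2;
    }

    public static void main(String[] args) {
        Integer b = merge(reduce(zero(), 4), reduce(zero(), 5));
        System.out.println(b);                           // prints 9
        System.out.println(merge(b, zero()).equals(b));  // identity holds: true
    }
}
```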