Thanks for pointing that out, Koert! I understand now why zero() and not init(a: IN), though I still don't see a good reason to skip the aggregation if zero() returns null. If the user did it, it's on him to take care of null cases in reduce/merge, but it opens up the possibility of using the input to create the buffer for the aggregator. Wouldn't that at least enable the functionality discussed in SPARK-15598, without changing how the Aggregator works?
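[Editor's sketch, not from the thread: a minimal plain-Java illustration of the Optional-buffer idea discussed here. Class and method names are mine, and it only mirrors the zero/reduce/merge shape of the Aggregator rather than the real Spark API. Wrapping the buffer in Optional gives max, a semigroup with no natural identity, a true zero in Optional.empty(), so the buffer is never null.]

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical standalone sketch: an Optional-wrapped "max" aggregation.
// Optional.empty() acts as the identity (zero), so the buffer is never
// null and "no events" stays distinguishable from any real value like
// Long.MIN_VALUE.
public class OptionalMax {

    static Optional<Integer> zero() {
        return Optional.empty();  // a real identity element, not null
    }

    static Optional<Integer> reduce(Optional<Integer> b, int a) {
        // the first element seeds the buffer; afterwards take the max
        return Optional.of(b.map(v -> Math.max(v, a)).orElse(a));
    }

    static Optional<Integer> merge(Optional<Integer> b1, Optional<Integer> b2) {
        if (!b1.isPresent()) return b2;  // empty merges as the identity
        if (!b2.isPresent()) return b1;
        return Optional.of(Math.max(b1.get(), b2.get()));
    }

    static Optional<Integer> aggregate(List<Integer> input) {
        Optional<Integer> buf = zero();
        for (int a : input) {
            buf = reduce(buf, a);
        }
        return buf;
    }

    public static void main(String[] args) {
        System.out.println(aggregate(Arrays.asList(3, 7, 2)));   // Optional[7]
        System.out.println(aggregate(Arrays.<Integer>asList())); // Optional.empty
    }
}
```

[In a real Aggregator this would mean BUF = Optional<T> (Guava's Optional on pre-8 Java), with finish() unwrapping the result, which is the cumbersome part mentioned below.]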
I bypassed it by using Optional (Guava) because I'm using the Java API, but it's a bit cumbersome...

Thanks,
Amit

On Thu, Jun 30, 2016 at 1:54 AM Koert Kuipers <ko...@tresata.com> wrote:

> its the difference between a semigroup and a monoid, and yes max does not
> easily fit into a monoid.
>
> see also discussion here:
> https://issues.apache.org/jira/browse/SPARK-15598
>
> On Mon, Jun 27, 2016 at 3:19 AM, Amit Sela <amitsel...@gmail.com> wrote:
>
>> OK. I see that, but the current (provided) implementations are very naive
>> - Sum, Count, Average - let's take Max for example: I guess zero() would be
>> set to some value like Long.MIN_VALUE, but what if you trigger (I assume in
>> the future Spark streaming will support time-based triggers) for a result
>> and there are no events?
>>
>> And like I said, for a more general use case: what if my zero() function
>> depends on my input?
>>
>> I just don't see the benefit of this behaviour, though I realise this is
>> the implementation.
>>
>> Thanks,
>> Amit
>>
>> On Sun, Jun 26, 2016 at 2:09 PM Takeshi Yamamuro <linguin....@gmail.com> wrote:
>>
>>> No, TypedAggregateExpression, which uses Aggregator#zero, is different
>>> between v2.0 and v1.6.
>>> v2.0:
>>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala#L91
>>> v1.6:
>>> https://github.com/apache/spark/blob/branch-1.6/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala#L115
>>>
>>> // maropu
>>>
>>> On Sun, Jun 26, 2016 at 8:03 PM, Amit Sela <amitsel...@gmail.com> wrote:
>>>
>>>> This "if (value == null)" condition you point to exists in the 1.6 branch
>>>> as well, so that's probably not the reason.
>>>>
>>>> On Sun, Jun 26, 2016 at 1:53 PM Takeshi Yamamuro <linguin....@gmail.com> wrote:
>>>>
>>>>> Whatever it is, this is expected; if an initial value is null, Spark
>>>>> codegen removes all the aggregates.
>>>>> See:
>>>>> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala#L199
>>>>>
>>>>> // maropu
>>>>>
>>>>> On Sun, Jun 26, 2016 at 7:46 PM, Amit Sela <amitsel...@gmail.com> wrote:
>>>>>
>>>>>> Not sure about what's the rule in case of `b + null = null`, but the
>>>>>> same code works perfectly in 1.6.1, just tried it..
>>>>>>
>>>>>> On Sun, Jun 26, 2016 at 1:24 PM Takeshi Yamamuro <linguin....@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> This behaviour seems to be expected because you must ensure `b +
>>>>>>> zero() = b`.
>>>>>>> In your case, `b + null = null` breaks this rule.
>>>>>>> This is the same in v1.6.1.
>>>>>>> See:
>>>>>>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala#L57
>>>>>>>
>>>>>>> // maropu
>>>>>>>
>>>>>>> On Sun, Jun 26, 2016 at 6:06 PM, Amit Sela <amitsel...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Sometimes, the BUF for the aggregator may depend on the actual
>>>>>>>> input.. and while this passes the responsibility to handle null in
>>>>>>>> merge/reduce to the developer, it sounds fine to me if he is the one who
>>>>>>>> put null in zero() anyway.
>>>>>>>> Now, it seems that the aggregation is skipped entirely when zero()
>>>>>>>> = null. Not sure if that was the behaviour in 1.6.
>>>>>>>>
>>>>>>>> Is this behaviour wanted?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Amit
>>>>>>>>
>>>>>>>> Aggregator example:
>>>>>>>>
>>>>>>>> public static class Agg extends Aggregator<Tuple2<String, Integer>, Integer, Integer> {
>>>>>>>>
>>>>>>>>   @Override
>>>>>>>>   public Integer zero() {
>>>>>>>>     return null;
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   @Override
>>>>>>>>   public Integer reduce(Integer b, Tuple2<String, Integer> a) {
>>>>>>>>     if (b == null) {
>>>>>>>>       b = 0;
>>>>>>>>     }
>>>>>>>>     return b + a._2();
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   @Override
>>>>>>>>   public Integer merge(Integer b1, Integer b2) {
>>>>>>>>     if (b1 == null) {
>>>>>>>>       return b2;
>>>>>>>>     } else if (b2 == null) {
>>>>>>>>       return b1;
>>>>>>>>     } else {
>>>>>>>>       return b1 + b2;
>>>>>>>>     }
>>>>>>>>   }
>>>>>>>> }
>>>>>>>
>>>>>>> --
>>>>>>> ---
>>>>>>> Takeshi Yamamuro
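[Editor's footnote, not from the thread: the identity law Takeshi cites, `b + zero() = b`, can be checked outside Spark using the same null-handling logic as the quoted Agg (with the Tuple2 input simplified to an int). This is a hypothetical standalone harness, not the Spark Aggregator itself; it shows the law does hold for this merge, which is consistent with Takeshi's point that the skip comes from codegen's treatment of a null literal rather than from a broken law.]

```java
import java.util.Arrays;

// Standalone check (no Spark dependency): the same zero/reduce/merge
// null handling as the quoted Agg, used to verify the identity law
// merge(zero(), b) == b for every buffer value, including null.
public class AggLawCheck {

    static Integer zero() {
        return null;  // the contested null zero
    }

    static Integer reduce(Integer b, int a) {
        if (b == null) {
            b = 0;  // null buffer treated as the sum identity
        }
        return b + a;
    }

    static Integer merge(Integer b1, Integer b2) {
        if (b1 == null) {
            return b2;  // null merges as an identity on the left...
        } else if (b2 == null) {
            return b1;  // ...and on the right
        } else {
            return b1 + b2;
        }
    }

    public static void main(String[] args) {
        for (Integer b : Arrays.asList(null, 0, 5, -3)) {
            System.out.println("merge(zero(), " + b + ") = " + merge(zero(), b));
        }
    }
}
```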