Re: Aggregator (Spark 2.0) skips aggregation if zero() returns null

2016-07-01 Thread Koert Kuipers
valid functions can be written for reduce and merge when the zero is null,
so not being able to provide null as the initial value is troublesome.

i guess the proper way to do this is to use Option, and have None be the
zero, which is what i assumed you did?
unfortunately, last time i tried using scala Options with spark Aggregators
it didn't work very well. see:
https://issues.apache.org/jira/browse/SPARK-15810

lifting a semigroup into a monoid like this using Option is fairly typical,
so either null or None has to work or else this api will be somewhat
unpleasant to use for anything practical.

for an example of this lifting on a related Aggregator class:
https://github.com/twitter/algebird/blob/develop/algebird-core/src/main/scala/com/twitter/algebird/Aggregator.scala#L420

it would be nice to provide a similar convenience method for spark's
Aggregator: basically, if the user provides no zero, the output is
Option[OUT] instead of OUT, which spark translates into OUT being nullable.
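
a rough sketch of what that could look like through the Java API used later in
this thread (the class name, the java.util.Optional buffer, and the kryo-based
buffer encoder are just illustrative assumptions, not an existing spark
convenience; the SPARK-15810 caveat may well apply to Optional buffers too):

import java.util.Optional;

import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator;

import scala.Tuple2;

// Hypothetical sketch: lift the sum semigroup into a monoid by making the
// buffer Optional<Integer>, so Optional.empty() is a genuine identity and
// zero() never needs to be null. An empty reduction surfaces as a null
// output, i.e. OUT effectively becomes nullable, as suggested above.
public class OptionalSumAgg
    extends Aggregator<Tuple2<String, Integer>, Optional<Integer>, Integer> {

  @Override
  public Optional<Integer> zero() {
    return Optional.empty();                      // a real identity element
  }

  @Override
  public Optional<Integer> reduce(Optional<Integer> b, Tuple2<String, Integer> a) {
    return Optional.of(b.orElse(0) + a._2());
  }

  @Override
  public Optional<Integer> merge(Optional<Integer> b1, Optional<Integer> b2) {
    if (!b1.isPresent()) return b2;
    if (!b2.isPresent()) return b1;
    return Optional.of(b1.get() + b2.get());
  }

  @Override
  public Integer finish(Optional<Integer> reduction) {
    return reduction.orElse(null);                // "no input" surfaces as null
  }

  @Override
  @SuppressWarnings("unchecked")
  public Encoder<Optional<Integer>> bufferEncoder() {
    // assumption: there is no built-in encoder for Optional, so fall back to kryo
    return (Encoder<Optional<Integer>>) (Encoder<?>) Encoders.kryo(Optional.class);
  }

  @Override
  public Encoder<Integer> outputEncoder() {
    return Encoders.INT();
  }
}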


On Fri, Jul 1, 2016 at 5:04 PM, Amit Sela  wrote:

> Thanks for pointing that Koert!
>
> I understand now why zero() and not init(a: IN), though I still don't see
> a good reason to skip the aggregation if zero returns null.
> If the user did it, it's on him to take care of null cases in
> reduce/merge, but it opens up the possibility to use the input to create
> the buffer for the aggregator.
> Wouldn't that at least enable the functionality discussed in SPARK-15598,
> without changing how the Aggregator works?
>
> I bypassed it by using Optional (Guava) because I'm using the Java API,
> but it's a bit cumbersome...
>
> Thanks,
> Amit
>
> On Thu, Jun 30, 2016 at 1:54 AM Koert Kuipers  wrote:
>
>> its the difference between a semigroup and a monoid, and yes max does not
>> easily fit into a monoid.
>>
>> see also discussion here:
>> https://issues.apache.org/jira/browse/SPARK-15598
>>
>> On Mon, Jun 27, 2016 at 3:19 AM, Amit Sela  wrote:
>>
>>> OK. I see that, but the current (provided) implementations are very
>>> naive - Sum, Count, Average -let's take Max for example: I guess zero()
>>> would be set to some value like Long.MIN_VALUE, but what if you trigger (I
>>> assume in the future Spark streaming will support time-based triggers) for
>>> a result and there are no events ?
>>>
>>> And like I said, for a more general use case: What if my zero() function
>>> depends on my input ?
>>>
>>> I just don't see the benefit of this behaviour, though I realise this is
>>> the implementation.
>>>
>>> Thanks,
>>> Amit
>>>
>>> On Sun, Jun 26, 2016 at 2:09 PM Takeshi Yamamuro 
>>> wrote:
>>>
 No, TypedAggregateExpression that uses Aggregator#zero is different
 between v2.0 and v1.6.
 v2.0:
 https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala#L91
 v1.6:
 https://github.com/apache/spark/blob/branch-1.6/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala#L115

 // maropu


 On Sun, Jun 26, 2016 at 8:03 PM, Amit Sela 
 wrote:

> This "if (value == null)" condition you point to exists in 1.6 branch
> as well, so that's probably not the reason.
>
> On Sun, Jun 26, 2016 at 1:53 PM Takeshi Yamamuro <
> linguin@gmail.com> wrote:
>
>> Whatever it is, this is expected; if an initial value is null, spark
>> codegen removes all the aggregates.
>> See:
>> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala#L199
>>
>> // maropu
>>
>> On Sun, Jun 26, 2016 at 7:46 PM, Amit Sela 
>> wrote:
>>
>>> Not sure about what's the rule in case of `b + null = null` but the
>>> same code works perfectly in 1.6.1, just tried it..
>>>
>>> On Sun, Jun 26, 2016 at 1:24 PM Takeshi Yamamuro <
>>> linguin@gmail.com> wrote:
>>>
 Hi,

 This behaviour seems to be expected because you must ensure `b +
 zero() = b`
 In your case, `b + null = null` breaks this rule.
 This is the same with v1.6.1.
 See:
 https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala#L57

 // maropu


 On Sun, Jun 26, 2016 at 6:06 PM, Amit Sela 
 wrote:

> Sometimes, the BUF for the aggregator may depend on the actual
> input.. and while this passes the responsibility to handle null in
> merge/reduce to the developer, it sounds fine to me if he is the one 
> who
> put null in zero() anyway.
> Now, it seems that the aggregation is skipped entirely when
> 

Re: Aggregator (Spark 2.0) skips aggregation if zero() returns null

2016-07-01 Thread Amit Sela
Thanks for pointing that out, Koert!

I understand now why zero() and not init(a: IN), though I still don't see a
good reason to skip the aggregation if zero returns null.
If the user did it, it's on him to take care of null cases in reduce/merge,
but it opens up the possibility to use the input to create the buffer for
the aggregator.
Wouldn't that at least enable the functionality discussed in SPARK-15598,
without changing how the Aggregator works?

I bypassed it by using Optional (Guava) because I'm using the Java API, but
it's a bit cumbersome...

Thanks,
Amit

On Thu, Jun 30, 2016 at 1:54 AM Koert Kuipers  wrote:

> its the difference between a semigroup and a monoid, and yes max does not
> easily fit into a monoid.
>
> see also discussion here:
> https://issues.apache.org/jira/browse/SPARK-15598
>
> On Mon, Jun 27, 2016 at 3:19 AM, Amit Sela  wrote:
>
>> OK. I see that, but the current (provided) implementations are very naive
>> - Sum, Count, Average -let's take Max for example: I guess zero() would be
>> set to some value like Long.MIN_VALUE, but what if you trigger (I assume in
>> the future Spark streaming will support time-based triggers) for a result
>> and there are no events ?
>>
>> And like I said, for a more general use case: What if my zero() function
>> depends on my input ?
>>
>> I just don't see the benefit of this behaviour, though I realise this is
>> the implementation.
>>
>> Thanks,
>> Amit
>>
>> On Sun, Jun 26, 2016 at 2:09 PM Takeshi Yamamuro 
>> wrote:
>>
>>> No, TypedAggregateExpression that uses Aggregator#zero is different
>>> between v2.0 and v1.6.
>>> v2.0:
>>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala#L91
>>> v1.6:
>>> https://github.com/apache/spark/blob/branch-1.6/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala#L115
>>>
>>> // maropu
>>>
>>>
>>> On Sun, Jun 26, 2016 at 8:03 PM, Amit Sela  wrote:
>>>
 This "if (value == null)" condition you point to exists in 1.6 branch
 as well, so that's probably not the reason.

 On Sun, Jun 26, 2016 at 1:53 PM Takeshi Yamamuro 
 wrote:

> Whatever it is, this is expected; if an initial value is null, spark
> codegen removes all the aggregates.
> See:
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala#L199
>
> // maropu
>
> On Sun, Jun 26, 2016 at 7:46 PM, Amit Sela 
> wrote:
>
>> Not sure about what's the rule in case of `b + null = null` but the
>> same code works perfectly in 1.6.1, just tried it..
>>
>> On Sun, Jun 26, 2016 at 1:24 PM Takeshi Yamamuro <
>> linguin@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> This behaviour seems to be expected because you must ensure `b +
>>> zero() = b`
>>> In your case, `b + null = null` breaks this rule.
>>> This is the same with v1.6.1.
>>> See:
>>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala#L57
>>>
>>> // maropu
>>>
>>>
>>> On Sun, Jun 26, 2016 at 6:06 PM, Amit Sela 
>>> wrote:
>>>
 Sometimes, the BUF for the aggregator may depend on the actual
 input.. and while this passes the responsibility to handle null in
 merge/reduce to the developer, it sounds fine to me if he is the one 
 who
 put null in zero() anyway.
 Now, it seems that the aggregation is skipped entirely when zero()
 = null. Not sure if that was the behaviour in 1.6

 Is this behaviour wanted ?

 Thanks,
 Amit

 Aggregator example:

 public static class Agg extends Aggregator<Tuple2<String, Integer>, Integer, Integer> {

   @Override
   public Integer zero() {
 return null;
   }

   @Override
   public Integer reduce(Integer b, Tuple2<String, Integer> a) {
 if (b == null) {
   b = 0;
 }
 return b + a._2();
   }

   @Override
   public Integer merge(Integer b1, Integer b2) {
 if (b1 == null) {
   return b2;
 } else if (b2 == null) {
   return b1;
 } else {
   return b1 + b2;
 }
   }


>>>
>>>
>>> --
>>> ---
>>> Takeshi Yamamuro
>>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>

>>>
>>>
>>> --
>>> ---
>>> Takeshi Yamamuro
>>>
>>
>


Re: Aggregator (Spark 2.0) skips aggregation if zero() returns null

2016-06-29 Thread Koert Kuipers
it's the difference between a semigroup and a monoid, and yes, max does not
easily fit into a monoid.

see also discussion here:
https://issues.apache.org/jira/browse/SPARK-15598
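
a tiny stand-alone illustration of the distinction (hypothetical Java sketch,
not taken from the JIRA):

import java.util.Optional;
import java.util.function.BinaryOperator;

public class SemigroupVsMonoid {
  public static void main(String[] args) {
    // semigroup: just an associative combine. Long.MIN_VALUE can fake an
    // identity for Long, but it cannot express "no input" and has no analogue
    // for unbounded types, which is why max does not fit a monoid comfortably.
    BinaryOperator<Long> maxSemigroup = Math::max;

    // monoid: lift the combine into Optional<Long>; Optional.empty() is the
    // added identity and is clearly distinguishable from real data.
    BinaryOperator<Optional<Long>> maxMonoid = (a, b) ->
        !a.isPresent() ? b
            : !b.isPresent() ? a
            : Optional.of(Math.max(a.get(), b.get()));

    System.out.println(maxSemigroup.apply(3L, 7L));                          // 7
    System.out.println(maxMonoid.apply(Optional.empty(), Optional.of(7L)));  // Optional[7]
    System.out.println(maxMonoid.apply(Optional.empty(), Optional.empty())); // Optional.empty
  }
}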

On Mon, Jun 27, 2016 at 3:19 AM, Amit Sela  wrote:

> OK. I see that, but the current (provided) implementations are very naive
> - Sum, Count, Average -let's take Max for example: I guess zero() would be
> set to some value like Long.MIN_VALUE, but what if you trigger (I assume in
> the future Spark streaming will support time-based triggers) for a result
> and there are no events ?
>
> And like I said, for a more general use case: What if my zero() function
> depends on my input ?
>
> I just don't see the benefit of this behaviour, though I realise this is
> the implementation.
>
> Thanks,
> Amit
>
> On Sun, Jun 26, 2016 at 2:09 PM Takeshi Yamamuro 
> wrote:
>
>> No, TypedAggregateExpression that uses Aggregator#zero is different
>> between v2.0 and v1.6.
>> v2.0:
>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala#L91
>> v1.6:
>> https://github.com/apache/spark/blob/branch-1.6/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala#L115
>>
>> // maropu
>>
>>
>> On Sun, Jun 26, 2016 at 8:03 PM, Amit Sela  wrote:
>>
>>> This "if (value == null)" condition you point to exists in 1.6 branch as
>>> well, so that's probably not the reason.
>>>
>>> On Sun, Jun 26, 2016 at 1:53 PM Takeshi Yamamuro 
>>> wrote:
>>>
 Whatever it is, this is expected; if an initial value is null, spark
 codegen removes all the aggregates.
 See:
 https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala#L199

 // maropu

 On Sun, Jun 26, 2016 at 7:46 PM, Amit Sela 
 wrote:

> Not sure about what's the rule in case of `b + null = null` but the
> same code works perfectly in 1.6.1, just tried it..
>
> On Sun, Jun 26, 2016 at 1:24 PM Takeshi Yamamuro <
> linguin@gmail.com> wrote:
>
>> Hi,
>>
>> This behaviour seems to be expected because you must ensure `b +
>> zero() = b`
>> In your case, `b + null = null` breaks this rule.
>> This is the same with v1.6.1.
>> See:
>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala#L57
>>
>> // maropu
>>
>>
>> On Sun, Jun 26, 2016 at 6:06 PM, Amit Sela 
>> wrote:
>>
>>> Sometimes, the BUF for the aggregator may depend on the actual
>>> input.. and while this passes the responsibility to handle null in
>>> merge/reduce to the developer, it sounds fine to me if he is the one who
>>> put null in zero() anyway.
>>> Now, it seems that the aggregation is skipped entirely when zero()
>>> = null. Not sure if that was the behaviour in 1.6
>>>
>>> Is this behaviour wanted ?
>>>
>>> Thanks,
>>> Amit
>>>
>>> Aggregator example:
>>>
>>> public static class Agg extends Aggregator<Tuple2<String, Integer>, Integer, Integer> {
>>>
>>>   @Override
>>>   public Integer zero() {
>>> return null;
>>>   }
>>>
>>>   @Override
>>>   public Integer reduce(Integer b, Tuple2<String, Integer> a) {
>>> if (b == null) {
>>>   b = 0;
>>> }
>>> return b + a._2();
>>>   }
>>>
>>>   @Override
>>>   public Integer merge(Integer b1, Integer b2) {
>>> if (b1 == null) {
>>>   return b2;
>>> } else if (b2 == null) {
>>>   return b1;
>>> } else {
>>>   return b1 + b2;
>>> }
>>>   }
>>>
>>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>


 --
 ---
 Takeshi Yamamuro

>>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>


Re: Aggregator (Spark 2.0) skips aggregation if zero() returns null

2016-06-27 Thread Amit Sela
OK. I see that, but the current (provided) implementations are very naive
- Sum, Count, Average. Let's take Max, for example: I guess zero() would be
set to some value like Long.MIN_VALUE, but what if you trigger for a result
(I assume Spark Streaming will support time-based triggers in the future)
and there are no events?
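
To make that concrete, a small hypothetical sketch (plain Java, not Spark code)
of what Max is pushed into when zero() has to be a real value:

public class MaxWithForcedZero {
  public static void main(String[] args) {
    // With zero() forced to be a concrete value, Max typically uses Long.MIN_VALUE;
    // a trigger that fires before any events arrive then reports the sentinel
    // as if it were real data.
    long zero = Long.MIN_VALUE;

    long maxAfterNoEvents = zero;                 // nothing was ever reduced in
    long maxAfterOneEvent = Math.max(zero, 42L);  // == 42

    System.out.println(maxAfterNoEvents);         // -9223372036854775808: no data, or a value?
    System.out.println(maxAfterOneEvent);         // 42
  }
}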

And like I said, for a more general use case: What if my zero() function
depends on my input?

I just don't see the benefit of this behaviour, though I realise this is
the implementation.

Thanks,
Amit

On Sun, Jun 26, 2016 at 2:09 PM Takeshi Yamamuro 
wrote:

> No, TypedAggregateExpression that uses Aggregator#zero is different
> between v2.0 and v1.6.
> v2.0:
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala#L91
> v1.6:
> https://github.com/apache/spark/blob/branch-1.6/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala#L115
>
> // maropu
>
>
> On Sun, Jun 26, 2016 at 8:03 PM, Amit Sela  wrote:
>
>> This "if (value == null)" condition you point to exists in 1.6 branch as
>> well, so that's probably not the reason.
>>
>> On Sun, Jun 26, 2016 at 1:53 PM Takeshi Yamamuro 
>> wrote:
>>
>>> Whatever it is, this is expected; if an initial value is null, spark
>>> codegen removes all the aggregates.
>>> See:
>>> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala#L199
>>>
>>> // maropu
>>>
>>> On Sun, Jun 26, 2016 at 7:46 PM, Amit Sela  wrote:
>>>
 Not sure about what's the rule in case of `b + null = null` but the
 same code works perfectly in 1.6.1, just tried it..

 On Sun, Jun 26, 2016 at 1:24 PM Takeshi Yamamuro 
 wrote:

> Hi,
>
> This behaviour seems to be expected because you must ensure `b +
> zero() = b`
> In your case, `b + null = null` breaks this rule.
> This is the same with v1.6.1.
> See:
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala#L57
>
> // maropu
>
>
> On Sun, Jun 26, 2016 at 6:06 PM, Amit Sela 
> wrote:
>
>> Sometimes, the BUF for the aggregator may depend on the actual
>> input.. and while this passes the responsibility to handle null in
>> merge/reduce to the developer, it sounds fine to me if he is the one who
>> put null in zero() anyway.
>> Now, it seems that the aggregation is skipped entirely when zero() =
>> null. Not sure if that was the behaviour in 1.6
>>
>> Is this behaviour wanted ?
>>
>> Thanks,
>> Amit
>>
>> Aggregator example:
>>
>> public static class Agg extends Aggregator<Tuple2<String, Integer>, Integer, Integer> {
>>
>>   @Override
>>   public Integer zero() {
>> return null;
>>   }
>>
>>   @Override
>>   public Integer reduce(Integer b, Tuple2<String, Integer> a) {
>> if (b == null) {
>>   b = 0;
>> }
>> return b + a._2();
>>   }
>>
>>   @Override
>>   public Integer merge(Integer b1, Integer b2) {
>> if (b1 == null) {
>>   return b2;
>> } else if (b2 == null) {
>>   return b1;
>> } else {
>>   return b1 + b2;
>> }
>>   }
>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>

>>>
>>>
>>> --
>>> ---
>>> Takeshi Yamamuro
>>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: Aggregator (Spark 2.0) skips aggregation if zero() returns null

2016-06-26 Thread Takeshi Yamamuro
No, the TypedAggregateExpression that uses Aggregator#zero is different between
v2.0 and v1.6.
v2.0:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala#L91
v1.6:
https://github.com/apache/spark/blob/branch-1.6/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala#L115

// maropu


On Sun, Jun 26, 2016 at 8:03 PM, Amit Sela  wrote:

> This "if (value == null)" condition you point to exists in 1.6 branch as
> well, so that's probably not the reason.
>
> On Sun, Jun 26, 2016 at 1:53 PM Takeshi Yamamuro 
> wrote:
>
>> Whatever it is, this is expected; if an initial value is null, spark
>> codegen removes all the aggregates.
>> See:
>> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala#L199
>>
>> // maropu
>>
>> On Sun, Jun 26, 2016 at 7:46 PM, Amit Sela  wrote:
>>
>>> Not sure about what's the rule in case of `b + null = null` but the same
>>> code works perfectly in 1.6.1, just tried it..
>>>
>>> On Sun, Jun 26, 2016 at 1:24 PM Takeshi Yamamuro 
>>> wrote:
>>>
 Hi,

 This behaviour seems to be expected because you must ensure `b + zero()
 = b`
 In your case, `b + null = null` breaks this rule.
 This is the same with v1.6.1.
 See:
 https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala#L57

 // maropu


 On Sun, Jun 26, 2016 at 6:06 PM, Amit Sela 
 wrote:

> Sometimes, the BUF for the aggregator may depend on the actual input..
> and while this passes the responsibility to handle null in merge/reduce to
> the developer, it sounds fine to me if he is the one who put null in 
> zero()
> anyway.
> Now, it seems that the aggregation is skipped entirely when zero() =
> null. Not sure if that was the behaviour in 1.6
>
> Is this behaviour wanted ?
>
> Thanks,
> Amit
>
> Aggregator example:
>
> public static class Agg extends Aggregator<Tuple2<String, Integer>, Integer, Integer> {
>
>   @Override
>   public Integer zero() {
> return null;
>   }
>
>   @Override
>   public Integer reduce(Integer b, Tuple2<String, Integer> a) {
> if (b == null) {
>   b = 0;
> }
> return b + a._2();
>   }
>
>   @Override
>   public Integer merge(Integer b1, Integer b2) {
> if (b1 == null) {
>   return b2;
> } else if (b2 == null) {
>   return b1;
> } else {
>   return b1 + b2;
> }
>   }
>
>


 --
 ---
 Takeshi Yamamuro

>>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>


-- 
---
Takeshi Yamamuro


Re: Aggregator (Spark 2.0) skips aggregation if zero() returns null

2016-06-26 Thread Amit Sela
This "if (value == null)" condition you point to exists in 1.6 branch as
well, so that's probably not the reason.

On Sun, Jun 26, 2016 at 1:53 PM Takeshi Yamamuro 
wrote:

> Whatever it is, this is expected; if an initial value is null, spark
> codegen removes all the aggregates.
> See:
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala#L199
>
> // maropu
>
> On Sun, Jun 26, 2016 at 7:46 PM, Amit Sela  wrote:
>
>> Not sure about what's the rule in case of `b + null = null` but the same
>> code works perfectly in 1.6.1, just tried it..
>>
>> On Sun, Jun 26, 2016 at 1:24 PM Takeshi Yamamuro 
>> wrote:
>>
>>> Hi,
>>>
>>> This behaviour seems to be expected because you must ensure `b + zero()
>>> = b`
>>> In your case, `b + null = null` breaks this rule.
>>> This is the same with v1.6.1.
>>> See:
>>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala#L57
>>>
>>> // maropu
>>>
>>>
>>> On Sun, Jun 26, 2016 at 6:06 PM, Amit Sela  wrote:
>>>
 Sometimes, the BUF for the aggregator may depend on the actual input..
 and while this passes the responsibility to handle null in merge/reduce to
 the developer, it sounds fine to me if he is the one who put null in zero()
 anyway.
 Now, it seems that the aggregation is skipped entirely when zero() =
 null. Not sure if that was the behaviour in 1.6

 Is this behaviour wanted ?

 Thanks,
 Amit

 Aggregator example:

 public static class Agg extends Aggregator<Tuple2<String, Integer>, Integer, Integer> {

   @Override
   public Integer zero() {
 return null;
   }

   @Override
   public Integer reduce(Integer b, Tuple2<String, Integer> a) {
 if (b == null) {
   b = 0;
 }
 return b + a._2();
   }

   @Override
   public Integer merge(Integer b1, Integer b2) {
 if (b1 == null) {
   return b2;
 } else if (b2 == null) {
   return b1;
 } else {
   return b1 + b2;
 }
   }


>>>
>>>
>>> --
>>> ---
>>> Takeshi Yamamuro
>>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: Aggregator (Spark 2.0) skips aggregation if zero() returns null

2016-06-26 Thread Takeshi Yamamuro
Whatever it is, this is expected; if an initial value is null, Spark
codegen removes all the aggregates.
See:
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala#L199

// maropu

On Sun, Jun 26, 2016 at 7:46 PM, Amit Sela  wrote:

> Not sure about what's the rule in case of `b + null = null` but the same
> code works perfectly in 1.6.1, just tried it..
>
> On Sun, Jun 26, 2016 at 1:24 PM Takeshi Yamamuro 
> wrote:
>
>> Hi,
>>
>> This behaviour seems to be expected because you must ensure `b + zero() =
>> b`
>> In your case, `b + null = null` breaks this rule.
>> This is the same with v1.6.1.
>> See:
>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala#L57
>>
>> // maropu
>>
>>
>> On Sun, Jun 26, 2016 at 6:06 PM, Amit Sela  wrote:
>>
>>> Sometimes, the BUF for the aggregator may depend on the actual input..
>>> and while this passes the responsibility to handle null in merge/reduce to
>>> the developer, it sounds fine to me if he is the one who put null in zero()
>>> anyway.
>>> Now, it seems that the aggregation is skipped entirely when zero() =
>>> null. Not sure if that was the behaviour in 1.6
>>>
>>> Is this behaviour wanted ?
>>>
>>> Thanks,
>>> Amit
>>>
>>> Aggregator example:
>>>
>>> public static class Agg extends Aggregator<Tuple2<String, Integer>, Integer, Integer> {
>>>
>>>   @Override
>>>   public Integer zero() {
>>> return null;
>>>   }
>>>
>>>   @Override
>>>   public Integer reduce(Integer b, Tuple2<String, Integer> a) {
>>> if (b == null) {
>>>   b = 0;
>>> }
>>> return b + a._2();
>>>   }
>>>
>>>   @Override
>>>   public Integer merge(Integer b1, Integer b2) {
>>> if (b1 == null) {
>>>   return b2;
>>> } else if (b2 == null) {
>>>   return b1;
>>> } else {
>>>   return b1 + b2;
>>> }
>>>   }
>>>
>>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>


-- 
---
Takeshi Yamamuro


Re: Aggregator (Spark 2.0) skips aggregation if zero() returns null

2016-06-26 Thread Amit Sela
Not sure what the rule is in the case of `b + null = null`, but the same
code works perfectly in 1.6.1; I just tried it.

On Sun, Jun 26, 2016 at 1:24 PM Takeshi Yamamuro 
wrote:

> Hi,
>
> This behaviour seems to be expected because you must ensure `b + zero() =
> b`
> In your case, `b + null = null` breaks this rule.
> This is the same with v1.6.1.
> See:
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala#L57
>
> // maropu
>
>
> On Sun, Jun 26, 2016 at 6:06 PM, Amit Sela  wrote:
>
>> Sometimes, the BUF for the aggregator may depend on the actual input..
>> and while this passes the responsibility to handle null in merge/reduce to
>> the developer, it sounds fine to me if he is the one who put null in zero()
>> anyway.
>> Now, it seems that the aggregation is skipped entirely when zero() =
>> null. Not sure if that was the behaviour in 1.6
>>
>> Is this behaviour wanted ?
>>
>> Thanks,
>> Amit
>>
>> Aggregator example:
>>
>> public static class Agg extends Aggregator<Tuple2<String, Integer>, Integer, Integer> {
>>
>>   @Override
>>   public Integer zero() {
>> return null;
>>   }
>>
>>   @Override
>>   public Integer reduce(Integer b, Tuple2<String, Integer> a) {
>> if (b == null) {
>>   b = 0;
>> }
>> return b + a._2();
>>   }
>>
>>   @Override
>>   public Integer merge(Integer b1, Integer b2) {
>> if (b1 == null) {
>>   return b2;
>> } else if (b2 == null) {
>>   return b1;
>> } else {
>>   return b1 + b2;
>> }
>>   }
>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: Aggregator (Spark 2.0) skips aggregation if zero() returns null

2016-06-26 Thread Takeshi Yamamuro
Hi,

This behaviour seems to be expected because you must ensure `b + zero() = b`.
In your case, `b + null = null` breaks this rule.
This is the same in v1.6.1.
See:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala#L57
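
For the example in this thread, a minimal sketch of a zero() that satisfies the
contract, assuming a plain integer sum and the Spark 2.0 Java API (it of course
gives up the "no input yet" distinction being discussed):

import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator;

import scala.Tuple2;

// Minimal sketch: the same sum, but with 0 as a genuine identity, so that
// merge(b, zero()) == b and reduce(zero(), a) folds in just a.
public class SumAgg extends Aggregator<Tuple2<String, Integer>, Integer, Integer> {

  @Override
  public Integer zero() {
    return 0;                                     // identity: b + 0 == b
  }

  @Override
  public Integer reduce(Integer b, Tuple2<String, Integer> a) {
    return b + a._2();
  }

  @Override
  public Integer merge(Integer b1, Integer b2) {
    return b1 + b2;
  }

  @Override
  public Integer finish(Integer reduction) {
    return reduction;
  }

  @Override
  public Encoder<Integer> bufferEncoder() {
    return Encoders.INT();
  }

  @Override
  public Encoder<Integer> outputEncoder() {
    return Encoders.INT();
  }
}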

// maropu


On Sun, Jun 26, 2016 at 6:06 PM, Amit Sela  wrote:

> Sometimes, the BUF for the aggregator may depend on the actual input.. and
> while this passes the responsibility to handle null in merge/reduce to the
> developer, it sounds fine to me if he is the one who put null in zero()
> anyway.
> Now, it seems that the aggregation is skipped entirely when zero() =
> null. Not sure if that was the behaviour in 1.6
>
> Is this behaviour wanted ?
>
> Thanks,
> Amit
>
> Aggregator example:
>
> public static class Agg extends Aggregator<Tuple2<String, Integer>, Integer, Integer> {
>
>   @Override
>   public Integer zero() {
> return null;
>   }
>
>   @Override
>   public Integer reduce(Integer b, Tuple2<String, Integer> a) {
> if (b == null) {
>   b = 0;
> }
> return b + a._2();
>   }
>
>   @Override
>   public Integer merge(Integer b1, Integer b2) {
> if (b1 == null) {
>   return b2;
> } else if (b2 == null) {
>   return b1;
> } else {
>   return b1 + b2;
> }
>   }
>
>


-- 
---
Takeshi Yamamuro


Aggregator (Spark 2.0) skips aggregation if zero() returns null

2016-06-26 Thread Amit Sela
Sometimes, the BUF for the aggregator may depend on the actual input, and
while this passes the responsibility for handling null in merge/reduce to the
developer, it sounds fine to me if he is the one who put null in zero()
anyway.
Now, it seems that the aggregation is skipped entirely when zero() = null.
Not sure if that was the behaviour in 1.6.

Is this behaviour wanted?

Thanks,
Amit

Aggregator example:

public static class Agg extends Aggregator<Tuple2<String, Integer>, Integer, Integer> {

  @Override
  public Integer zero() {
return null;
  }

  @Override
  public Integer reduce(Integer b, Tuple2<String, Integer> a) {
if (b == null) {
  b = 0;
}
return b + a._2();
  }

  @Override
  public Integer merge(Integer b1, Integer b2) {
if (b1 == null) {
  return b2;
} else if (b2 == null) {
  return b1;
} else {
  return b1 + b2;
}
  }
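
For completeness: with Spark 2.0's Java API the class would also need finish()
and the two encoders; a hypothetical completion (these method bodies are
illustrative assumptions, not part of the snippet above):

  @Override
  public Integer finish(Integer reduction) {
    return reduction;                             // may be null if nothing was reduced
  }

  @Override
  public Encoder<Integer> bufferEncoder() {
    return Encoders.INT();                        // needs org.apache.spark.sql.Encoder / Encoders imports
  }

  @Override
  public Encoder<Integer> outputEncoder() {
    return Encoders.INT();
  }
}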