Sometimes the BUF for the aggregator may depend on the actual input, and
while this passes the responsibility for handling null in merge/reduce to the
developer, that sounds fine to me if they are the one who put null in zero()
anyway.
Now it seems that the aggregation is skipped entirely when zero() returns null.
I'm not sure if that was the behaviour in 1.6.
Is this behaviour intended?
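To make the null-handling contract I have in mind concrete, here is a minimal sketch in plain Java with no Spark dependency. The class name NullZeroSketch and the fold() helper are hypothetical and only simulate folding one partition from zero(); it says nothing about what Spark itself does with a null zero(), which is exactly the open question.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class NullZeroSketch {
    // Hypothetical stand-in for Aggregator.zero(): null means "no buffer yet".
    static Integer zero() {
        return null;
    }

    // Null-aware reduce: treat a missing buffer as 0 before adding the value.
    static Integer reduce(Integer b, int value) {
        return (b == null ? 0 : b) + value;
    }

    // Null-aware merge: a null side means that partition saw no input.
    static Integer merge(Integer b1, Integer b2) {
        if (b1 == null) {
            return b2;
        }
        if (b2 == null) {
            return b1;
        }
        return b1 + b2;
    }

    // Assumption: simulates folding a single partition, starting from zero().
    static Integer fold(List<Integer> values) {
        Integer buffer = zero();
        for (int v : values) {
            buffer = reduce(buffer, v);
        }
        return buffer;
    }

    public static void main(String[] args) {
        Integer left = fold(Arrays.asList(1, 2, 3));             // non-empty partition
        Integer right = fold(Collections.<Integer>emptyList());  // empty partition stays null
        System.out.println(merge(left, right));
    }
}
```

With reduce and merge written this way, an empty partition simply leaves the buffer null, and merge passes the other side through unchanged.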
Thanks,
Amit
Aggregator example:
public static class Agg extends Aggregator<Tuple2<String, Integer>, Integer, Integer> {

    @Override
    public Integer zero() {
        return null;
    }

    @Override
    public Integer reduce(Integer b, Tuple2<String, Integer> a) {
        if (b == null) {
            b = 0;
        }
        return b + a._2();
    }

    @Override
    public Integer merge(Integer b1, Integer b2) {
        if (b1 == null) {
            return b2;
        } else if (b2 == null) {
            return b1;
        } else {
            return b1 + b2;
        }
    }