Sometimes the BUF type of an aggregator depends on the actual input, so
zero() may have no sensible value to return other than null. This passes the
responsibility for handling null in merge/reduce to the developer, but that
sounds fine to me, since the developer is the one who put null in zero().
Now, however, it seems that the aggregation is skipped entirely when zero()
returns null. I am not sure whether that was the behaviour in 1.6.

Is this behaviour intended?


Aggregator example:

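public static class Agg extends Aggregator<Tuple2<String, Integer>,
    Integer, Integer> {

  // The buffer deliberately starts out as null.
  public Integer zero() {
    return null;
  }

  // Treat a null buffer as 0 before adding the input value.
  public Integer reduce(Integer b, Tuple2<String, Integer> a) {
    if (b == null) {
      b = 0;
    }
    return b + a._2();
  }

  // A null buffer on either side means "nothing aggregated yet".
  public Integer merge(Integer b1, Integer b2) {
    if (b1 == null) {
      return b2;
    } else if (b2 == null) {
      return b1;
    } else {
      return b1 + b2;
    }
  }

  // Remaining Aggregator methods (e.g. finish()) are not shown.
}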