Sometimes the BUF for the aggregator may depend on the actual input. Returning
null from zero() does pass the responsibility for handling null in merge/reduce
to the developer, but that sounds fine to me, since the developer is the one
who put null in zero() in the first place.
Now, however, it seems that the aggregation is skipped entirely when zero()
returns null. I'm not sure whether that was the behaviour in 1.6.

Is this behaviour intended?

Thanks,
Amit

Aggregator example:

public static class Agg extends Aggregator<Tuple2<String, Integer>, Integer, Integer> {

  @Override
  public Integer zero() {
    return null;
  }

  @Override
  public Integer reduce(Integer b, Tuple2<String, Integer> a) {
    if (b == null) {
      b = 0;
    }
    return b + a._2();
  }

  @Override
  public Integer merge(Integer b1, Integer b2) {
    if (b1 == null) {
      return b2;
    } else if (b2 == null) {
      return b1;
    } else {
      return b1 + b2;
    }
  }

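For what it's worth, the null handling in reduce and merge above can be checked outside Spark. The following is only a minimal sketch in plain Java: the class name NullZeroSketch and the helper foldPartition are hypothetical names made up for illustration, and the loop is an assumption about how a per-partition fold would use the buffer, not Spark's actual execution path. It shows that the logic is self-consistent when the buffer starts as null, which is why skipping the aggregation entirely looks surprising.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-alone sketch; none of these names are part of Spark.
public class NullZeroSketch {

    // Same null-tolerant logic as reduce() in Agg, applied to a plain int value.
    static Integer reduce(Integer b, int value) {
        if (b == null) {
            b = 0;
        }
        return b + value;
    }

    // Same null-tolerant logic as merge() in Agg.
    static Integer merge(Integer b1, Integer b2) {
        if (b1 == null) {
            return b2;
        } else if (b2 == null) {
            return b1;
        } else {
            return b1 + b2;
        }
    }

    // Assumed shape of a per-partition fold: start from zero() (null here)
    // and apply reduce() to every value.
    static Integer foldPartition(List<Integer> values) {
        Integer buffer = null; // what zero() returns in Agg
        for (int v : values) {
            buffer = reduce(buffer, v);
        }
        return buffer;
    }

    public static void main(String[] args) {
        Integer p1 = foldPartition(Arrays.asList(1, 2, 3));
        Integer p2 = foldPartition(Collections.<Integer>emptyList()); // stays null
        System.out.println(merge(p1, p2)); // prints 6
    }
}
```

An empty partition leaves its buffer as null, and merge() then simply returns the other side, so a null zero() never leaks into the arithmetic.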