Sometimes the BUF for the aggregator may depend on the actual input, and while this passes the responsibility for handling null in merge/reduce to the developer, that sounds fine to me if they are the one who put null in zero() anyway. Now it seems that the aggregation is skipped entirely when zero() returns null. I'm not sure whether that was the behaviour in 1.6.
Is this behaviour wanted?

Thanks,
Amit

Aggregator example:

public static class Agg extends Aggregator<Tuple2<String, Integer>, Integer, Integer> {

  @Override
  public Integer zero() {
    return null;
  }

  @Override
  public Integer reduce(Integer b, Tuple2<String, Integer> a) {
    if (b == null) {
      b = 0;
    }
    return b + a._2();
  }

  @Override
  public Integer merge(Integer b1, Integer b2) {
    if (b1 == null) {
      return b2;
    } else if (b2 == null) {
      return b1;
    } else {
      return b1 + b2;
    }
  }