I found a bug in aggregator.
In parseMessages, you calls masterAggregation() method. Do you think
everything is OK?
/**
* Method to let the custom master aggregator read messages from peers and
* aggregate a value.
*/
@SuppressWarnings("unchecked")
public void masterAggregation(Text name, Writable value) {
String nameIdx = name.toString().split(";", 2)[1];
this.Aggregators.get(nameIdx).aggregate(null, value);
// When it's time to send the values, we can see which aggregators are used.
this.aggregatorsUsed.add(nameIdx);
}
The aggregated value will be always the last value.
Like our old code,
getAggregationRunner().aggregateVertex(lastValue, vertex);
You should aggregates all values.
--
Best Regards, Edward J. Yoon
@eddieyoon