---------- Forwarded message ---------- From: Jyoti Yadav <rao.jyoti26ya...@gmail.com> Date: Fri, Jan 10, 2014 at 11:57 AM Subject: Writing my own aggregator.. To: ameya.vilan...@gmail.com
Hi Ameya.. I am badly stuck while implementing my custom aggregator.. In my program i want to send each vertex id to master. For that i took an arraylist, in which each vertex is adding its own id.while running the program,each vertex calls aggregate() function..As per my observation it is working fine in vertex compute method.But while retrieving back in master compute function.arraylist is not reflected back to master compute function. I am attaching two files below..You are requested to please check it once.. *1.MyArrayWritable.java* package org.apache.giraph.examples.utils; import java.io.*; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.WritableComparator; import java.util.Arrays; import java.util.*; public class MyArrayWritable implements Writable { private long item; private ArrayList<Long> arraylist=new ArrayList<Long>(5); public MyArrayWritable() { item=0; //arraylist=new ArrayList<Long>(5); arraylist.add(item); } public MyArrayWritable(long item1) { item=item1; //arraylist=new ArrayList<Long>(5); arraylist.add(item); } public ArrayList<Long> get_arraylist() { return arraylist; } public void set_arraylist(ArrayList al) { //this.arraylist=new ArrayList<Long>(al); this.arraylist=al; } public long get_item(){return item;} @Override public void readFields(DataInput in) throws IOException { item=in.readLong(); int size=arraylist.size(); size=in.readInt(); arraylist=new ArrayList<Long>(5); for(int i=0;i<size;i++) { arraylist.add(in.readLong()); } } @Override public void write(DataOutput out) throws IOException { out.writeLong(item); out.writeInt(arraylist.size()); for(int i=0;i<arraylist.size();i++) { out.writeLong(arraylist.get(i)); } } @Override public String toString() { return "output is "+ Long.toString(item) + "\n"; } } 2.MyArrayAggregator.java package org.apache.giraph.examples.utils; import org.apache.giraph.aggregators.BasicAggregator; import java.util.*; public class MyArrayAggregator extends BasicAggregator<MyArrayWritable> { @Override public void aggregate(MyArrayWritable value) { ArrayList<Long> al=new ArrayList<Long>(); (getAggregatedValue().get_arraylist()).add(value.get_item()); al=getAggregatedValue().get_arraylist(); getAggregatedValue().set_arraylist(al); } @Override public MyArrayWritable createInitialValue() { return new MyArrayWritable(); } } Thanks in advance ... Jyoti