I'm a little confused by the examples in SimpleMasterComputeVertex.java.

To me it looks like this is a simple example with one vertex and one aggregator 
with the following behavior:
- The vertex gets the value stored in the aggregator and then adds its previous 
value to it and stores the result as the new vertex value; the result is also 
stored in the worker context
- The aggregator sets its value to superstep/2 + 1 every iteration and stops on 
the 10th superstep

The worker context seems to serve no other purpose but to hold the value of 
FINAL_SUM (not related to the aggregator) at each iteration.  It also seems 
like the aggregator is registered in the initialize method of the MasterCompute 
object, much like I have in my code.

I see one difference between the example and my code:
   1) I use the aggregate function in each vertex's compute method.  If this is 
not the way to have the vertices combine values, what is?

If you can provide insight to either how I'm not following the example, or what 
else might wrong, that'd be great.

Thanks,
Nick


On Aug 20, 2012, at 4:52 PM, KAUSHIK SARKAR wrote:

Hi Nick,

Are you using WorkerContext to register the aggregator? You need to override 
the preApplication() method in WorkerContext to register the aggregator and 
then override the preSuperstep() method to to tell the workers to use the 
aggregator (the useAggregator() method). Check the MasterCompute and 
WorkerContext examples in Giraph.

Regards,
Kaushik

On Mon, Aug 20, 2012 at 1:26 PM, Nick West 
<nick.w...@benchmarksolutions.com<mailto:nick.w...@benchmarksolutions.com>> 
wrote:
Hi,

I have a giraph application that runs fine; however, when I add a MasterCompute 
object (definition following) all of the map tasks time out. I have hadoop 
configured to run with 8 map processes and giraph to use one worker.

Here's the definition of the MasterCompute object:

class BPMasterComputer extends MasterCompute{
  override def compute() {
    val agg = 
getAggregator("VOTE_TO_STOP_AGG").asInstanceOf[BooleanAndAggregator]
    val res = agg.getAggregatedValue.get
    if (res) haltComputation
    agg.setAggregatedValue(true)
  }
  override def initialize() {
    registerAggregator("VOTE_TO_STOP_AGG", classOf[BooleanAndAggregator])
    val agg = 
getAggregator("VOTE_TO_STOP_AGG").asInstanceOf[BooleanAndAggregator]
    agg.setAggregatedValue(true)
  }
  override def write(out: DataOutput) {}
  override def readFields(in: DataInput) {}
}

(as far as I can tell, there is no state that needs to be read/written.)  I 
then register this class as the MasterCompute class in the giraph job:

job.setMasterComputeClass(classOf[BPMasterComputer])

and then use the aggregator in the compute method of my vertices:

class BPVertex extends EdgeListVertex[IntWritable, WrappedValue, Text, 
PackagedMessage] with Loggable {
   override def compute(msgs: java.util.Iterator[PackagedMessage]) {
    ...
    var stop = false
    val agg = 
getAggregator("VOTE_TO_STOP_AGG").asInstanceOf[BooleanAndAggregator]
    ... code to modify stop and vote to halt ...
    agg.aggregate(stop)
  }
}

Is there some other method that I am not calling that I should?  Or some step 
that I'm missing?  Any suggestions as to why/how these additions are causing 
the processes to block would be appreciated!

Thanks,
Nick West

Benchmark Solutions
101 Park Avenue - 7th Floor
New York, NY 10178
Tel +1.212.220.4739<tel:%2B1.212.220.4739> | Mobile 
+1.646.267.4324<tel:%2B1.646.267.4324>
www.benchmarksolutions.com <http://www.benchmarksolutions.com/>
<image001.png>








Nick West

Benchmark Solutions
101 Park Avenue - 7th Floor
New York, NY 10178
Tel +1.212.220.4739 | Mobile +1.646.267.4324
www.benchmarksolutions.com <http://www.benchmarksolutions.com/>
[cid:image001.png@01CCA50E.43B4A860]





<<inline: image001.png>>

Reply via email to