Hi,

I have a Giraph application that runs fine; however, when I add a MasterCompute
object (definition below), all of the map tasks time out. I have Hadoop
configured to run with 8 map processes and Giraph to use one worker.

Here's the definition of the MasterCompute object:

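import java.io.{DataInput, DataOutput}

class BPMasterComputer extends MasterCompute {
  override def compute(): Unit = {
    val agg = getAggregator("VOTE_TO_STOP_AGG").asInstanceOf[BooleanAndAggregator]
    // Intended: the AND of the vertices' votes from the previous superstep
    val res = agg.getAggregatedValue.get
    if (res) haltComputation()
    // Reset to true (the identity for AND) before the next superstep
    agg.setAggregatedValue(true)
  }
  override def initialize(): Unit = {
    registerAggregator("VOTE_TO_STOP_AGG", classOf[BooleanAndAggregator])
    val agg = getAggregator("VOTE_TO_STOP_AGG").asInstanceOf[BooleanAndAggregator]
    agg.setAggregatedValue(true)
  }
  // The master keeps no state of its own, so nothing is serialized
  override def write(out: DataOutput): Unit = {}
  override def readFields(in: DataInput): Unit = {}
}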
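The master is meant to reset the flag to true every superstep and halt once it comes back true. A minimal plain-Scala model of the AND semantics this relies on follows; AndFlag and its method names are hypothetical stand-ins, not Giraph's BooleanAndAggregator API. Because true is the identity for AND, the flag stays true only if every contribution was true, so a single vertex voting false keeps the computation going.

```scala
// Hypothetical stand-in for a boolean-AND aggregator. It is NOT Giraph's
// BooleanAndAggregator; it only models the AND semantics.
final class AndFlag(private var value: Boolean = true) {
  // Combine one contribution into the running value.
  def aggregate(v: Boolean): Unit = value = value && v
  def get: Boolean = value
  // Reset, e.g. to true (the AND identity) before a new round of votes.
  def set(v: Boolean): Unit = value = v
}

object AndFlagDemo {
  def main(args: Array[String]): Unit = {
    val flag = new AndFlag()
    Seq(true, true, false).foreach(flag.aggregate)
    println(flag.get) // false: one vote to keep going wins
    flag.set(true)
    Seq(true, true).foreach(flag.aggregate)
    println(flag.get) // true: every vote was to stop
  }
}
```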

As far as I can tell, there is no state that needs to be written or read, so
write and readFields are empty. I then register this class as the
MasterCompute class in the Giraph job:

job.setMasterComputeClass(classOf[BPMasterComputer])
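Leaving write and readFields empty should be fine as long as the master really holds no fields of its own. If it ever did, the usual Writable contract is that readFields reads back exactly what write wrote, in the same order. A minimal sketch using only java.io streams; MasterState and its fields are hypothetical and not part of BPMasterComputer.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInput,
  DataInputStream, DataOutput, DataOutputStream}

// Hypothetical master-side state; BPMasterComputer itself has none.
final class MasterState(var roundsWithoutStop: Int = 0, var lastVote: Boolean = true) {
  // Serialize the fields in a fixed order...
  def write(out: DataOutput): Unit = {
    out.writeInt(roundsWithoutStop)
    out.writeBoolean(lastVote)
  }
  // ...and read them back in exactly the same order.
  def readFields(in: DataInput): Unit = {
    roundsWithoutStop = in.readInt()
    lastVote = in.readBoolean()
  }
}

object RoundTrip {
  // Write a state to bytes and read it into a fresh instance.
  def roundTrip(s: MasterState): MasterState = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    s.write(out)
    out.flush()
    val copy = new MasterState()
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray)))
    copy
  }

  def main(args: Array[String]): Unit = {
    val copy = roundTrip(new MasterState(3, false))
    println(s"${copy.roundsWithoutStop} ${copy.lastVote}") // 3 false
  }
}
```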

and then use the aggregator in the compute method of my vertices:

class BPVertex extends EdgeListVertex[IntWritable, WrappedValue, Text, PackagedMessage]
    with Loggable {
  override def compute(msgs: java.util.Iterator[PackagedMessage]): Unit = {
    ...
    var stop = false
    val agg = getAggregator("VOTE_TO_STOP_AGG").asInstanceOf[BooleanAndAggregator]
    ... code to modify stop and vote to halt ...
    agg.aggregate(stop)
  }
}
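The intended protocol, as I understand it: in each superstep every vertex ANDs its stop flag into the aggregator, and the master halts once the previous superstep's AND is true, otherwise resetting it to true. A plain-Scala simulation of that loop with no Giraph classes follows; the decision function wantsToStop is a hypothetical stand-in for the "code to modify stop" above. The model skips the halt check at superstep 0 because no vertex has voted yet; that skip is a choice of this sketch, not something BPMasterComputer does.

```scala
object VoteToStopSimulation {
  // Runs supersteps until every vertex votes to stop in the same superstep,
  // or maxSupersteps is reached; returns the number of supersteps executed.
  // wantsToStop(vertexId, superstep) is a hypothetical per-vertex decision.
  def run(numVertices: Int, maxSupersteps: Int)(wantsToStop: (Int, Long) => Boolean): Long = {
    var agg = true // value set by the master before any vertex runs
    var superstep = 0L
    var halted = false
    while (!halted && superstep < maxSupersteps) {
      // Master step: check the previous superstep's AND, else reset it.
      if (superstep > 0 && agg) halted = true
      else {
        agg = true
        // Vertex step: every vertex computes its flag and ANDs it in.
        for (v <- 0 until numVertices) {
          val stop = wantsToStop(v, superstep)
          agg = agg && stop
        }
        superstep += 1
      }
    }
    superstep
  }

  def main(args: Array[String]): Unit = {
    // Vertex v is ready to stop from superstep v onward.
    val steps = run(numVertices = 4, maxSupersteps = 100)((v, s) => s >= v)
    println(steps) // 4
  }
}
```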

Is there some other method I should be calling, or some step I'm missing?
Any suggestions as to why or how these additions cause the processes to
block would be appreciated!

Thanks,
Nick West

Benchmark Solutions
101 Park Avenue - 7th Floor
New York, NY 10178
Tel +1.212.220.4739 | Mobile +1.646.267.4324
www.benchmarksolutions.com <http://www.benchmarksolutions.com/>