Hi Nick,

Please refer to the SimpleMasterComputeWorkerContext class in the attached
SimpleMasterComputeVertex.java file. (This is from the 0.2 snapshot that I
am using, which is roughly a month old. The WorkerContext class seems to
differ from the current svn version. I don't know whether that change was
made to reflect a change in the API behaviour, but I followed the
WorkerContext definition from the attached file and my code worked.)

You will see that you need to register the aggregator twice: in the
initialize() method of MasterCompute (which you have done) and in the
preApplication() method of the WorkerContext. In addition, in the
preSuperstep() method of the WorkerContext you need to call the
useAggregator() method.
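
To make this concrete, here is a rough sketch of what such a WorkerContext
could look like for your VOTE_TO_STOP_AGG aggregator. BPWorkerContext is
just a placeholder name, and the method signatures follow the WorkerContext
in my (month-old) attached file, so they may need small adjustments against
your snapshot:

import org.apache.giraph.graph.WorkerContext
// (reuse whatever import you already have for BooleanAndAggregator
// in BPMasterComputer)

class BPWorkerContext extends WorkerContext {
  override def preApplication() {
    // Register the aggregator on the worker side as well, in addition
    // to the registration in MasterCompute.initialize()
    registerAggregator("VOTE_TO_STOP_AGG", classOf[BooleanAndAggregator])
  }

  override def preSuperstep() {
    // Tell the worker to use the aggregator in this superstep
    useAggregator("VOTE_TO_STOP_AGG")
  }

  override def postSuperstep() {}

  override def postApplication() {}
}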

I am not sure if this is the problem with your code, but you can give it a
try and see if it solves your issue.
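
For completeness, this is roughly how the pieces get wired into the job on
my side. You already call setMasterComputeClass; setWorkerContextClass is
the GiraphJob setter I use in my snapshot to point Giraph at the
WorkerContext above, so please double-check that it exists under the same
name in your version:

// BPVertex and BPMasterComputer are your classes; BPWorkerContext is the
// placeholder sketched above
job.setVertexClass(classOf[BPVertex])
job.setMasterComputeClass(classOf[BPMasterComputer])
job.setWorkerContextClass(classOf[BPWorkerContext])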

Regards,
Kaushik

On Mon, Aug 20, 2012 at 3:04 PM, Nick West <nick.w...@benchmarksolutions.com
> wrote:

>  I'm a little confused by the examples in SimpleMasterComputeVertex.java.
>
>
>  To me it looks like this is a simple example with one vertex and one
> aggregator with the following behavior:
> - The vertex gets the value stored in the aggregator and then adds its
> previous value to it and stores the result as the new vertex value; the
> result is also stored in the worker context
> - The aggregator sets its value to superstep/2 + 1 every iteration and
> stops on the 10th superstep
>
>  The worker context seems to serve no other purpose but to hold the value
> of FINAL_SUM (not related to the aggregator) at each iteration.  It also
> seems like the aggregator is registered in the initialize method of the
> MasterCompute object, much like I have in my code.
>
>  I see one difference between the example and my code:
>    1) I use the aggregate function in each vertex's compute method.  If
> this is not the way to have the vertices combine values, what is?
>
>  If you can provide insight into either how I'm not following the example,
> or what else might be wrong, that'd be great.
>
>  Thanks,
> Nick
>
>
>  On Aug 20, 2012, at 4:52 PM, KAUSHIK SARKAR wrote:
>
> Hi Nick,
>
>  Are you using WorkerContext to register the aggregator? You need to
> override the preApplication() method in WorkerContext to register the
> aggregator and then override the preSuperstep() method to tell the
> workers to use the aggregator (the useAggregator() method). Check the
> MasterCompute and WorkerContext examples in Giraph.
>
>  Regards,
> Kaushik
>
> On Mon, Aug 20, 2012 at 1:26 PM, Nick West <
> nick.w...@benchmarksolutions.com> wrote:
>
>>  Hi,
>>
>>  I have a giraph application that runs fine; however, when I add a
>> MasterCompute object (definition following) all of the map tasks time out.
>> I have hadoop configured to run with 8 map processes and giraph to use one
>> worker.
>>
>>  Here's the definition of the MasterCompute object:
>>
>>  class BPMasterComputer extends MasterCompute{
>>   override def compute() {
>>     val agg =
>> getAggregator("VOTE_TO_STOP_AGG").asInstanceOf[BooleanAndAggregator]
>>     val res = agg.getAggregatedValue.get
>>     if (res) haltComputation
>>     agg.setAggregatedValue(true)
>>   }
>>   override def initialize() {
>>     registerAggregator("VOTE_TO_STOP_AGG", classOf[BooleanAndAggregator])
>>     val agg =
>> getAggregator("VOTE_TO_STOP_AGG").asInstanceOf[BooleanAndAggregator]
>>     agg.setAggregatedValue(true)
>>   }
>>   override def write(out: DataOutput) {}
>>   override def readFields(in: DataInput) {}
>> }
>>
>>  (as far as I can tell, there is no state that needs to be
>> read/written.)  I then register this class as the MasterCompute class in
>> the giraph job:
>>
>>  job.setMasterComputeClass(classOf[BPMasterComputer])
>>
>>  and then use the aggregator in the compute method of my vertices:
>>
>>  class BPVertex extends EdgeListVertex[IntWritable, WrappedValue, Text,
>> PackagedMessage] with Loggable {
>>    override def compute(msgs: java.util.Iterator[PackagedMessage]) {
>>     ...
>>     var stop = false
>>     val agg =
>> getAggregator("VOTE_TO_STOP_AGG").asInstanceOf[BooleanAndAggregator]
>>      ... code to modify stop and vote to halt ...
>>      agg.aggregate(stop)
>>   }
>> }
>>
>>  Is there some other method that I am not calling that I should?  Or
>> some step that I'm missing?  Any suggestions as to why/how these additions
>> are causing the processes to block would be appreciated!
>>
>>  Thanks,
>> Nick West
>> Benchmark Solutions
>> 101 Park Avenue - 7th Floor
>> New York, NY 10178
>> Tel +1.212.220.4739 | Mobile +1.646.267.4324
>> www.benchmarksolutions.com
>>
>
>
> Nick West
> Benchmark Solutions
> 101 Park Avenue - 7th Floor
> New York, NY 10178
> Tel +1.212.220.4739 | Mobile +1.646.267.4324
> www.benchmarksolutions.com
>


/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.examples;

import org.apache.giraph.aggregators.DoubleOverwriteAggregator;
import org.apache.giraph.graph.LongDoubleFloatDoubleVertex;
import org.apache.giraph.graph.MasterCompute;
import org.apache.giraph.graph.WorkerContext;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;

/**
 * Demonstrates a computation with a centralized part implemented via a
 * MasterCompute.
 */
public class SimpleMasterComputeVertex extends LongDoubleFloatDoubleVertex {
  /** Aggregator to get values from the master to the workers */
  public static final String SMC_AGG = "simplemastercompute.aggregator";
  /** Logger */
  private static final Logger LOG =
      Logger.getLogger(SimpleMasterComputeVertex.class);

  @Override
  public void compute(Iterator<DoubleWritable> msgIterator) {
    DoubleOverwriteAggregator agg =
        (DoubleOverwriteAggregator) getAggregator(SMC_AGG);
    double oldSum = getSuperstep() == 0 ? 0 : getVertexValue().get();
    double newValue = agg.getAggregatedValue().get();
    double newSum = oldSum + newValue;
    setVertexValue(new DoubleWritable(newSum));
    SimpleMasterComputeWorkerContext workerContext =
        (SimpleMasterComputeWorkerContext) getWorkerContext();
    workerContext.setFinalSum(newSum);
    LOG.info("Current sum: " + newSum);
  }

  /**
   * Worker context used with {@link SimpleMasterComputeVertex}.
   */
  public static class SimpleMasterComputeWorkerContext
      extends WorkerContext {
    /** Final sum value for verification for local jobs */
    private static double FINAL_SUM;

    @Override
    public void preApplication()
      throws InstantiationException, IllegalAccessException {
      registerAggregator(SMC_AGG, DoubleOverwriteAggregator.class);
    }

    @Override
    public void preSuperstep() {
      useAggregator(SMC_AGG);
    }

    @Override
    public void postSuperstep() {
    }

    @Override
    public void postApplication() {
    }

    public void setFinalSum(double sum) {
      FINAL_SUM = sum;
    }

    public static double getFinalSum() {
      return FINAL_SUM;
    }
  }

  /**
   * MasterCompute used with {@link SimpleMasterComputeVertex}.
   */
  public static class SimpleMasterCompute
      extends MasterCompute {
    @Override
    public void write(DataOutput out) throws IOException {
    }

    @Override
    public void readFields(DataInput in) throws IOException {
    }

    @Override
    public void compute() {
      DoubleOverwriteAggregator agg =
          (DoubleOverwriteAggregator) getAggregator(SMC_AGG);
      agg.aggregate(((double) getSuperstep()) / 2 + 1);
      if (getSuperstep() == 10) {
        haltComputation();
      }
    }

    @Override
    public void initialize() throws InstantiationException,
        IllegalAccessException {
      registerAggregator(SMC_AGG, DoubleOverwriteAggregator.class);
    }
  }
}
