Hi Paolo,

You get null for the aggregated value because the aggregators haven't been
registered yet at the moment WorkerContext.preApplication() is called. But
I think that shouldn't be a problem, since you can set initial values for
aggregators in MasterCompute.initialize().

Please also note that you are not using the new aggregator API in the
proper way. The function getAggregatedValue returns the value of the
aggregator, not the aggregator object itself. It's not possible to set the
value of aggregators on workers (in methods from WorkerContext and
Vertex), because that would produce nondeterministic results. You
aggregate on workers and set values on the master.
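
To make that split concrete, here is a minimal plain-Java sketch. It is a
toy model, not Giraph code: ToyAggregators, WorkerView and MasterView are
hypothetical names made up for illustration, and the model only assumes a
sum aggregator over doubles. It shows that a read before registration
yields null, that workers can only contribute and read values, and that
only the master can register an aggregator or overwrite its value.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: hypothetical types, not Giraph classes.
final class ToyAggregators {
    // Current value of each registered sum aggregator, keyed by name.
    private final Map<String, Double> values = new HashMap<>();

    /** What a worker may do: contribute a value and read the current value. */
    interface WorkerView {
        void aggregate(String name, double value);
        Double getAggregatedValue(String name);
    }

    /** What the master may do: register, overwrite, and read. */
    interface MasterView {
        void register(String name);
        void setAggregatedValue(String name, double value);
        Double getAggregatedValue(String name);
    }

    // Fails loudly when an aggregator is used before it is registered.
    private double requireRegistered(String name) {
        Double current = values.get(name);
        if (current == null) {
            throw new IllegalStateException("not registered: " + name);
        }
        return current;
    }

    WorkerView workerView() {
        return new WorkerView() {
            @Override public void aggregate(String name, double value) {
                // Workers only add to the running sum; they cannot overwrite it.
                values.put(name, requireRegistered(name) + value);
            }
            @Override public Double getAggregatedValue(String name) {
                // Returns the value (or null if unregistered), never the aggregator.
                return values.get(name);
            }
        };
    }

    MasterView masterView() {
        return new MasterView() {
            @Override public void register(String name) {
                values.put(name, 0.0);
            }
            @Override public void setAggregatedValue(String name, double value) {
                requireRegistered(name);
                values.put(name, value);
            }
            @Override public Double getAggregatedValue(String name) {
                return values.get(name);
            }
        };
    }

    public static void main(String[] args) {
        ToyAggregators store = new ToyAggregators();
        WorkerView worker = store.workerView();
        MasterView master = store.masterView();

        // Nothing is registered yet, so the read yields null.
        System.out.println(worker.getAggregatedValue("error-current"));

        master.register("error-current");
        master.setAggregatedValue("error-current", 10.0); // initial value
        worker.aggregate("error-current", 2.5);
        worker.aggregate("error-current", 1.5);
        System.out.println(worker.getAggregatedValue("error-current"));
    }
}
```

Running main prints null and then 14.0. The worker-side view has no
setter at all, which is the point of the model.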

As for persistent vs. regular aggregators: the value of a regular
aggregator is reset before each superstep, while the value of a
persistent one isn't. For example, if you have a persistent sum
aggregator, its value is going to be the sum of all values given to it
since the beginning of the application. If you have a regular sum
aggregator, its value is going to be just the sum of the values from the
previous superstep.
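
The difference can be sketched in plain Java. This is a toy model under
stated assumptions, not Giraph's implementation: SumAggregatorModel and
its methods are hypothetical names, and the only behaviour modelled is
that a regular aggregator is reset before each superstep while a
persistent one is not.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: a hypothetical class, not Giraph's implementation.
final class SumAggregatorModel {
    private final boolean persistent;
    private double value = 0.0;

    SumAggregatorModel(boolean persistent) {
        this.persistent = persistent;
    }

    // Called once before each superstep: only regular aggregators are reset.
    void startSuperstep() {
        if (!persistent) {
            value = 0.0;
        }
    }

    // Adds one contribution to the running sum.
    void aggregate(double contribution) {
        value += contribution;
    }

    double getAggregatedValue() {
        return value;
    }

    // Feeds the contributions of each superstep into one aggregator and
    // records the value that is available after each superstep finishes.
    static List<Double> run(boolean persistent, double[][] perSuperstep) {
        SumAggregatorModel aggregator = new SumAggregatorModel(persistent);
        List<Double> seen = new ArrayList<>();
        for (double[] contributions : perSuperstep) {
            aggregator.startSuperstep();
            for (double contribution : contributions) {
                aggregator.aggregate(contribution);
            }
            seen.add(aggregator.getAggregatedValue());
        }
        return seen;
    }

    public static void main(String[] args) {
        double[][] contributions = { {1.0, 2.0}, {3.0}, {4.0, 5.0} };
        System.out.println("regular:    " + run(false, contributions));
        System.out.println("persistent: " + run(true, contributions));
    }
}
```

With the contributions above, the regular model yields [3.0, 3.0, 9.0]
(one sum per superstep) and the persistent model yields [3.0, 6.0, 15.0]
(a running total since the start).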

I can write a small tutorial about aggregators if someone can tell me
where and how to do that. :-) I see that these changes will be confusing
for people who were using aggregators before, but I think that for those
who are starting with the current state it will be much easier.

Maja

On 9/11/12 9:49 AM, "Paolo Castagna" <castagna.li...@gmail.com> wrote:

>Hi,
>this is how I run the PageRank implementation (mine takes into account
>dangling nodes and checks for convergence):
>
>Map<String,String> params = new HashMap<String,String>();
>params.put(GiraphJob.WORKER_CONTEXT_CLASS,
>"org.apache.jena.grande.giraph.pagerank.PageRankVertexWorkerContext");
>params.put(GiraphJob.MASTER_COMPUTE_CLASS,
>"org.apache.jena.grande.giraph.pagerank.PageRankVertexMasterCompute");
>
>String[] data = getData ( filename );
>Iterable<String> results = InternalVertexRunner.run(
>  PageRankVertex.class,
>  PageRankVertexInputFormat.class,
>  PageRankVertexOutputFormat.class,
>  params,
>  data
>);
>
>This used to work, however I was registering aggregators in
>PageRankVertexWorkerContext (see below).
>
>Now, I am trying to do the same in PageRankVertexMasterCompute which
>extends DefaultMasterCompute and has only one method:
>
>@Override
>public void initialize() throws InstantiationException,
>IllegalAccessException {
>  log.debug("initialize");
>  registerPersistentAggregator("dangling-current",
>DoubleSumAggregator.class);
>  registerPersistentAggregator("error-current",
>DoubleSumAggregator.class);
>  registerPersistentAggregator("pagerank-sum", DoubleSumAggregator.class);
>  registerPersistentAggregator("vertices-count", LongSumAggregator.class);
>}
>
>I am not 100% sure about registerAggregator vs.
>registerPersistentAggregator.
>
>The initialize() method is now being called, I see this on the console:
>09:34:46 DEBUG PageRankVertexMasterCompute :: initialize
>
>In PageRankVertexWorkerContext, which extends WorkerContext, I override
>the preApplication() method:
>
>@SuppressWarnings("unchecked")
>@Override
>public void preApplication() throws InstantiationException,
>IllegalAccessException {
>  log.debug("preApplication()");
>  
>  System.out.println(((Aggregator<DoubleWritable>)getAggregatedValue("error-current")));
>  ((Aggregator<DoubleWritable>)getAggregatedValue("error-current")).setAggregatedValue(
>      new DoubleWritable( Double.MAX_VALUE ) );
>}
>
>The getAggregatedValue("error-current") above is null and I do not
>understand why.
>
>Just to make things even more clear, this is how I used to run the
>PageRank implementation locally:
>https://github.com/castagna/jena-grande/blob/2fa8a1b879a464d8e3db84e78edd539c70274e7c/src/main/java/org/apache/jena/grande/giraph/pagerank/RunPageRankVertexLocally.java
>And this is the WorkerContext I used to have:
>https://github.com/castagna/jena-grande/blob/2fa8a1b879a464d8e3db84e78edd539c70274e7c/src/main/java/org/apache/jena/grande/giraph/pagerank/PageRankVertexWorkerContext.java
>
>As you can see, it used to call registerAggregator(...) in the
>preApplication() method:
>
>@SuppressWarnings("unchecked")
>@Override
>public void preApplication() throws InstantiationException,
>IllegalAccessException {
>  log.debug("preApplication()");
>  registerAggregator("dangling-current", SumAggregator.class);
>  registerAggregator("error-current", SumAggregator.class);
>  registerAggregator("pagerank-sum", SumAggregator.class);
>  registerAggregator("vertices-count", LongSumAggregator.class);
>
>  
>  ((Aggregator<DoubleWritable>)getAggregator("error-current")).setAggregatedValue(
>      new DoubleWritable( Double.MAX_VALUE ) );
>}
>
>The registerAggregator() method in WorkerContext is gone and I am
>trying to achieve the same via MasterCompute now.
>
>Regards,
>Paolo
>
>
>
>
>On 11 September 2012 00:20, Paolo Castagna <castagna.li...@gmail.com>
>wrote:
>> Hi Gianmarco,
>> good, that was one problem... but I am not yet back to the green bar.
>>
>> Here is how I am running it locally now:
>>
>>         Map<String,String> params = new HashMap<String,String>();
>>         params.put(GiraphJob.WORKER_CONTEXT_CLASS,
>> "org.apache.jena.grande.giraph.pagerank.PageRankVertexWorkerContext");
>>         params.put(GiraphJob.MASTER_COMPUTE_CLASS,
>> "org.apache.jena.grande.giraph.pagerank.SimplePageRankVertexMasterCompute");
>>
>>         String[] data = getData ( filename );
>>         Iterable<String> results = InternalVertexRunner.run(
>>                 PageRankVertex.class,
>>                 PageRankVertexInputFormat.class,
>>                 PageRankVertexOutputFormat.class,
>>                 params,
>>                 data
>>             );
>>
>> However, I need to learn more about the MasterCompute (and its relation
>> to WorkerContext).
>>
>> Paolo
>>
>> On 10 September 2012 22:08, Gianmarco De Francisci Morales
>> <g...@apache.org> wrote:
>>> Hi Paolo,
>>>
>>> Are you setting the MasterCompute class?
>>> You can do it with this option of bin/giraph
>>> -mc,--masterCompute <arg>      MasterCompute class
>>>
>>> Cheers,
>>> --
>>> Gianmarco
>>>
>>>
>>>
>>> On Mon, Sep 10, 2012 at 9:36 PM, Paolo Castagna
>>><castagna.li...@gmail.com>
>>> wrote:
>>>>
>>>> Hi,
>>>> first and foremost, thanks for all the work and improvements on
>>>>Giraph.
>>>> I went away from computers for a while (personal reasons) and changed
>>>> job, now I am back and playing with Giraph when I can.
>>>>
>>>> I updated my little examples (overall, it was easy and quick; here are
>>>> the changes [1], just in case others are in a similar situation and
>>>> want to have a look).
>>>>
>>>> I am not sure I get the 'new' aggregators and in particular how I can
>>>> 'register' them. My tests failing confirm my non understanding! And
>>>> forgive me if I come here and ask such a simple question.
>>>>
>>>> Here is what I used to do [2]:
>>>>
>>>> public class PageRankVertexWorkerContext extends WorkerContext {
>>>>
>>>>   private static final Logger log =
>>>> LoggerFactory.getLogger(PageRankVertexWorkerContext.class);
>>>>
>>>>   public static double errorPrevious = Double.MAX_VALUE;
>>>>   public static double danglingPrevious = 0d;
>>>>
>>>>   @SuppressWarnings("unchecked")
>>>>   @Override
>>>>   public void preApplication() throws InstantiationException,
>>>> IllegalAccessException {
>>>>     log.debug("preApplication()");
>>>>     registerAggregator("dangling-current", SumAggregator.class);
>>>>     registerAggregator("error-current", SumAggregator.class);
>>>>     registerAggregator("pagerank-sum", SumAggregator.class);
>>>>     registerAggregator("vertices-count", LongSumAggregator.class);
>>>>
>>>>
>>>> 
>>>>     ((Aggregator<DoubleWritable>)getAggregator("error-current")).setAggregatedValue(
>>>>         new DoubleWritable( Double.MAX_VALUE ) );
>>>>   }
>>>>
>>>>   [...]
>>>>
>>>>
>>>> Here is what I am trying to do now [3]:
>>>>
>>>> public class PageRankVertexWorkerContext extends WorkerContext {
>>>>
>>>>   private static final Logger log =
>>>> LoggerFactory.getLogger(PageRankVertexWorkerContext.class);
>>>>
>>>>   public static double errorPrevious = Double.MAX_VALUE;
>>>>   public static double danglingPrevious = 0d;
>>>>
>>>>   // TODO: double check this... who is calling initialize()?
>>>>   public static class SimplePageRankVertexMasterCompute extends
>>>> DefaultMasterCompute {
>>>>     @Override
>>>>     public void initialize() throws InstantiationException,
>>>> IllegalAccessException {
>>>>       registerAggregator("dangling-current",
>>>>DoubleSumAggregator.class);
>>>>       registerAggregator("error-current", DoubleSumAggregator.class);
>>>>       registerAggregator("pagerank-sum", DoubleSumAggregator.class);
>>>>       registerAggregator("vertices-count", LongSumAggregator.class);
>>>>     }
>>>>   }
>>>>
>>>>   [...]
>>>>
>>>>
>>>> I am not convinced someone is actually calling the initialize() method
>>>> and there must be something I am missing (yesterday was late, after a
>>>> long day at work).
>>>>
>>>> Anyway, is there a place/example where I can learn how to use
>>>> Aggregators with the new Giraph?
>>>>
>>>> Thanks again and it's good to see Giraph mailing list and JIRA
>>>>'brewing'
>>>> ;-)
>>>>
>>>> Paolo
>>>>
>>>>
>>>>  [1] https://github.com/castagna/jena-grande/commit/3edc0a7780f5e7c25d37956c158d878b590858b5#src/main/java/org/apache/jena/grande/giraph/pagerank/PageRankVertexWorkerContext.java
>>>>  [2] https://github.com/castagna/jena-grande/blob/2fa8a1b879a464d8e3db84e78edd539c70274e7c/src/main/java/org/apache/jena/grande/giraph/pagerank/PageRankVertexWorkerContext.java
>>>>  [3] https://github.com/castagna/jena-grande/blob/3edc0a7780f5e7c25d37956c158d878b590858b5/src/main/java/org/apache/jena/grande/giraph/pagerank/PageRankVertexWorkerContext.java
>>>
>>>
