Re: CPU utilization in each superstep

2015-04-05 Thread Matthew Saltz
I don't remember all the info it gives you, but check out the Giraph metrics
command-line option (giraph.metrics.enable=true, passed with -ca).


On Sunday, April 5, 2015, Ravikant Dindokar wrote:


 I am a newbie learning Giraph on Hadoop 2.2.0. I want to find out the CPU
 utilization as well as the time spent sending messages in each superstep.

 I am not familiar with Giraph code. Can someone point out functions I
 should look into to do logging to get this information?


Re: Undirected Vertex Definition and Reflexivity

2015-03-11 Thread Matthew Saltz

I believe the answer to your question is yes, though I've never done it. If
you use only the edge reader, only the vertices in your graph that have at
least one edge attached to them will be present in your graph. So, if you
have vertices that are entirely disconnected that you want included, you'd
need to do both a VertexReader and an Edge Reader (though I've never done
this). If you don't have disconnected vertices, you don't need the
VertexReader because Giraph will automatically add all of the vertices in
your edge file to the graph (I think this can be disabled). Use the -eip
flag to specify the edge file.
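
As a rough illustration of the point above, here is a plain-Python toy (not Giraph code) of which vertices end up in the graph. The function name, the comma-separated line format and the sample data are assumptions made for the example; the only behaviour taken from the message is that vertices named in the edge file are created automatically, while disconnected ones need a vertex file.

```python
def load_graph(edge_lines, vertex_lines=()):
    """Toy loader: returns {vertex_id: set(out_neighbours)}."""
    graph = {}
    # Vertex file: one vertex per line, id first (hypothetical format).
    for line in vertex_lines:
        vertex_id = int(line.split(",")[0])
        graph.setdefault(vertex_id, set())
    # Edge file: "source_id, target_id" per line.
    for line in edge_lines:
        source, target = (int(token) for token in line.split(","))
        graph.setdefault(source, set()).add(target)
        # The target is created too, even if it has no out-edges of its own.
        graph.setdefault(target, set())
    return graph


edges = ["1, 2", "2, 3"]
vertices = ["1, a", "2, b", "3, c", "4, d"]  # vertex 4 has no edges

edges_only = load_graph(edges)
both = load_graph(edges, vertices)
```

With only the edge file, vertex 4 never appears; with both inputs it is present with an empty adjacency set.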

Matthew Saltz

On Wed, Mar 11, 2015 at 1:54 AM, G.W. wrote:

 Thanks for that!

 This is the right idea, however I was only using a VertexReader until now
 – IntNullReverseTextEdgeInputFormat calls for an EdgeReader.

 I am not sure this is the way it works but I like the idea of segregating
 edge and vertex definitions.

 *That leads to the following questions: can Giraph support the use of a
 VertexReader and EdgeReader at the same time, that is through the -vif and
 -eif arguments? *

 If that works, the idea would be to refactor my input files with:

 vertex_id, vertex_type, ...

 source_id, target_id

 with the edge reader working in reverse mode.


 On 10 March 2015 at 20:02, Matthew Saltz wrote:

 Have a look at IntNullReverseTextEdgeInputFormat
 It automatically creates reverse edges, but it expects the file format

 source_id, target_id

 on each line. If you need to convert it to use longs you can change the
 code pretty easily.
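
A minimal sketch of what "automatically creates reverse edges" amounts to, written as a plain-Python toy rather than the actual input format class. The function name is hypothetical, and the comma separator is simply taken from the format shown above; check the class itself for the delimiter it really expects.

```python
def read_edges_with_reverse(lines):
    """Parse "source_id, target_id" lines and store every edge in both directions."""
    adjacency = {}
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        source, target = (int(token) for token in line.split(","))
        adjacency.setdefault(source, set()).add(target)
        # The reverse edge is what makes the graph effectively undirected.
        adjacency.setdefault(target, set()).add(source)
    return adjacency


adjacency = read_edges_with_reverse(["1, 2", "2, 3"])
```

Switching to long ids would only change the parsing step in this toy; in Java it means swapping the id type used by the reader.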


 On Tue, Mar 10, 2015 at 5:37 AM, Young Han wrote:

 The input is assumed to be the vertex followed by a set of *directed*
 edges. So, in your example, leaving out E2 means that the final graph will
 not have the directed edge from V2 to V1. To get an undirected edge, you
 need a pair of directed edges.

 Internally, Giraph stores the out-edges of each vertex as an adjacency
 list at that vertex. So, for example, your undirected graph becomes a
 vertex object V1 with an adjacency list {V2} and a vertex object V2 with an
 adjacency list {V1}. The directed graph would be a vertex V1 with adjacency
 list {V2} and a vertex V2 with an empty adjacency list {}.

 There's no easy way for Giraph to infer that V2's adjacency list should
 contain V1, because V2 does not track its in-edges. To get around this, you
 can either (1) use an undirected input file with both pairs of edges
 present; (2) have, in your algorithms, all vertices broadcast their ids to
 their out-edge neighbours and then perform mutations to add the missing
 edges in the first two supersteps; or (3) modify the code (in giraph-core)
 to cache and add missing edges (i.e., add a new type of input format). I'm
 fairly certain that there doesn't already exist an "assume undirected graph"
 input reader, but I'm
 not too familiar with the code paths and options there so I could be wrong.
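
Option (2) above can be sketched as a two-step simulation in plain Python. This is a toy model of the idea, not Giraph code: the dictionary of out-edges and the function name are assumptions, and a real computation would use the framework's messaging and mutation calls instead.

```python
def symmetrize(out_edges):
    """out_edges: {vertex: set(out-neighbours)}. Returns a symmetrized copy."""
    # Superstep 0: every vertex sends its own id along each of its out-edges.
    inbox = {vertex: [] for vertex in out_edges}
    for vertex, neighbours in out_edges.items():
        for neighbour in neighbours:
            inbox.setdefault(neighbour, []).append(vertex)

    # Superstep 1: every vertex adds an edge back to each sender.
    result = {vertex: set(neighbours) for vertex, neighbours in out_edges.items()}
    for vertex, senders in inbox.items():
        result.setdefault(vertex, set()).update(senders)
    return result


# The directed graph from the example: V1 -> V2, and V2 has no out-edges.
directed = {"V1": {"V2"}, "V2": set()}
undirected = symmetrize(directed)
```

After the second step V2's adjacency list contains V1, which is the state the undirected input file (option 1) would have produced directly.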


 On Mon, Mar 9, 2015 at 11:54 PM, G.W. wrote:

 Hi Giraph Mailing List,

 I am writing about an undirected graph I am trying to move to Giraph. I
 have a question about the assumption Giraph makes when processing an input.

 Let V1 and V2 be two vertices connected by a common edge. E1 defines an
 edge from V1 to V2. E2 defines an edge from V2 to V1.

 Simply put, these are defined in an input file as:
 V1, E1
 V2, E2

 This is working fine, I can process the graph accordingly.

 I was trying to see what would happen if I were to simplify the input to:
 V1, E1

 When the time comes for V2 to be processed in a superstep, Giraph
 does not offer E1 as an outgoing edge. My question is why, given
 that E1 defines an edge from V1 to V2. The graph being undirected (although
 there is no provision for that in my Giraph computation), shouldn't Giraph
 assume that V2 is connected to V1?

 Down the road, the idea would be to streamline the input format, hence
 my question.


Re: Undirected Vertex Definition and Reflexivity

2015-03-10 Thread Matthew Saltz
Have a look at IntNullReverseTextEdgeInputFormat
It automatically creates reverse edges, but it expects the file format

source_id, target_id

on each line. If you need to convert it to use longs you can change the
code pretty easily.



Re: Giraph Partitioning

2015-02-25 Thread Matthew Saltz

1) The partitions are processed in parallel based on the number of threads
you specify. The vertices within a partition are processed sequentially.
You may want to use more partitions than threads; that way, if one partition
takes a particularly long time to be processed, the other threads can
continue processing the remaining partitions. If you have four machines
with 12 threads each for example, with one worker per machine, the default
number of partitions will be 4^2 = 16 partitions, whereas you actually have
48 threads available, so you'd probably want to specify the number of
partitions manually to a larger number to take advantage of parallelism.
2) Yes
3) If you are only doing single threading, there's no reason to do multiple
partitions per worker
3 (the second one)) I'm not familiar with the out-of-core functionality
4) I'm not sure

I'm basing this on the version of Giraph from this summer, not the most
recent release, but I don't think this part has changed. May want to verify
by looking at the code.
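
The arithmetic in point 1 can be checked with a small plain-Python sketch. The helper names are hypothetical, the scheduler is a simple greedy model rather than Giraph's actual thread pool, and the second example assumes that a slow partition's work gets split up when partitions are made finer.

```python
import heapq


def default_partition_count(workers):
    # Default described in this thread: number of workers squared.
    return workers ** 2


def makespan(partition_costs, threads):
    """Greedy model: whichever thread frees up first takes the next partition."""
    finish_times = [0.0] * threads
    heapq.heapify(finish_times)
    for cost in partition_costs:
        earliest = heapq.heappop(finish_times)
        heapq.heappush(finish_times, earliest + cost)
    return max(finish_times)


workers, threads_per_worker = 4, 12
total_partitions = default_partition_count(workers)            # 16
partitions_per_worker = total_partitions // workers            # 4
busy_threads = min(threads_per_worker, partitions_per_worker)  # 4 of 12

# Same total work (40 units) on one worker with 4 threads:
coarse = makespan([16, 8, 8, 8], threads=4)            # one slow partition
fine = makespan([8, 8, 4, 4, 4, 4, 4, 4], threads=4)   # work split finer
```

In this model the default leaves 8 of the 12 threads on each machine idle, and the finer split finishes sooner because free threads can pick up the remaining partitions.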


On Wed, Feb 25, 2015 at 3:25 AM, Arjun Sharma wrote:


 I understand that by default, the number of partitions = number of workers
 ^ 2. So, if we have N workers, each worker will process N partitions. I
 have a number of questions:

 1- By default, does Giraph process the N partitions within a single worker
 sequentially? If yes, when setting the parameter giraph.numComputeThreads,
 will partitions within each thread be computed sequentially?

 2- By default, does Giraph keep all partitions in memory?

 3- If the answers to 1 and 2 are yes and yes, is there any advantage from
 using multiple partitions versus a single partition in the case of single
 threading per worker?

 3- How do out-of-core partitions affect out-of-core messages? Are
 they completely independent? For example, if the number of partitions to be
 kept in memory is set to a number less than N, and at the same time all
 messages are set to be kept in memory, will ALL messages be kept in memory,
 even those from out-of-core partitions? If the situation is reversed, where
 all partitions are kept in memory, and out-of-core messaging is set, will
 messages from memory-based partitions be saved on disk?

 4- Is there a class like a PartitionContext, where you can access
 preSuperstep and postSuperstep *per partition*, along the lines of

Re: Best way to know the assignment of vertices to workers

2014-11-28 Thread Matthew Saltz

To answer your question directly, in an AbstractComputation class (or
whatever descendant you're using), you may call
getWorkerContext().getMyWorkerIndex().
However, if each vertex has metadata associated with it, I think the best way
to go would be to define a custom VertexReader
and custom Vertex type to take that into account when reading the vertex.
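
To see why relying on vertex_id % n is fragile, here is a toy model in plain Python. Both assignment schemes below are assumptions invented for the illustration and neither is claimed to be what Giraph does; the point is only that the observed pattern depends on how partitions are mapped to workers.

```python
def partition_of(vertex_id, partition_count):
    # Stand-in for hashing the vertex id into a partition.
    return vertex_id % partition_count


def round_robin_owner(partition, workers):
    # Hypothetical scheme 1: partitions are dealt out to workers in turn.
    return partition % workers


def block_owner(partition, partition_count, workers):
    # Hypothetical scheme 2: each worker gets a contiguous block of partitions.
    return partition // (partition_count // workers)


workers = 3
partitions = workers ** 2  # default partition count mentioned on this list
ids = range(20)

modulo_guess = [v % workers for v in ids]
rr = [round_robin_owner(partition_of(v, partitions), workers) for v in ids]
blk = [block_owner(partition_of(v, partitions), partitions, workers) for v in ids]
```

Under the first scheme the guess happens to match, under the second it does not, so asking the framework for the worker index is the safer route.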


On Fri, Nov 28, 2014 at 1:02 PM, Garimella Kiran wrote:

  Hi all,

  Is there a clean way to find out which worker a particular vertex is
 assigned to?

  From what I tried out, I found that given n workers, each node is
 assigned to the worker with id (vertex_id % n  ). Is that a safe way to do

  I’ve had a look at previous discussions, but most of them have no answer.


  Why I need it:

  In my application, each vertex needs to know some additional meta data,
 which is loaded from file. This metadata file is huge (50 G) and so, on
 each worker, I only want to load the metadata corresponding to the vertices
 present on that worker.


  Previous discussions:


Re: When do Giraph vertices receive their messages?

2014-11-10 Thread Matthew Saltz
Hi Vincentius,

I'd recommend checking out the code in the call() method of this class to
try to follow the logic that occurs during computation in a superstep, as
well as the code
for handling message sending. The execute method in GraphTaskManager
basically handles the overall control flow of everything. I've found that
for Giraph at some point you're going to more or less need to dig through
the code to figure out what's going on behind the scenes. Looking at the
call() and computePartition() methods in ComputeCallable is pretty
enlightening. As far as messaging goes, it appears that everything is
flushed from the sender before the end of the superstep.

Someone else please correct me if I'm wrong about any of these things; I
don't want to mislead anyone.

Matthew Saltz

On Mon, Nov 10, 2014 at 12:18 PM, Vincentius Martin wrote:

 Hi XingFeng, thanks for your answer!

 Yes, I have already read Pregel paper, unfortunately there are some
 specific steps that I still couldn't grasp.

 Therefore, when does the checkpoint happen? Is it before/after the step 1
 (the receiving messages phase) in your explanation?

 Also, according to your explanation, I can deduce that at the beginning of
 each superstep, the messages are still in the sender workers' buffer and
 each of the sender workers will send them at this phase. Am I right?

 Vincentius Martin

 On Mon, Nov 10, 2014 at 5:49 PM, XingFENG wrote:

 Hi Vincentius Martin,

 Since Giraph is based on Pregel, I would refer you to the paper *Pregel:
 A System for Large-Scale Graph Processing *for more details.

 Briefly speaking, in each superstep,
 1. a worker (which is responsible for a partition of vertices) receives
 messages from others. The worker then groups these messages by
 destination ID and activates the vertices that have incoming messages.
 2. a worker runs the *compute* function of each active vertex. Meanwhile,
 the *compute* function may generate messages to other vertices. These
 messages are buffered, combined and sent in batches asynchronously.
 3. after a worker finishes the *compute* function of all its active vertices,
 it waits for all other workers to finish their *compute* functions. It also
 waits for all sending tasks to finish, to ensure that all messages can
 be received in the next superstep. Then every worker moves on to the next
 superstep.

 For your second problem, messages are stored in a buffer.
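
The three steps above can be sketched as a single-process simulation in plain Python. This is a toy of the superstep model, not Giraph's implementation: the function name, the max-value computation (in the spirit of the Pregel paper's example) and the dictionaries standing in for worker buffers are all assumptions made for illustration.

```python
def run_supersteps(out_edges, values, max_supersteps=30):
    """Propagate the maximum value through the graph, one superstep at a time."""
    inbox = {vertex: [] for vertex in out_edges}
    active = set(out_edges)  # every vertex is active in superstep 0
    superstep = 0
    while active and superstep < max_supersteps:
        outbox = {vertex: [] for vertex in out_edges}  # sender-side buffer
        for vertex in sorted(active):
            # Step 2: compute() sees the messages delivered for this superstep.
            new_value = max([values[vertex]] + inbox[vertex])
            changed = new_value != values[vertex]
            values[vertex] = new_value
            if superstep == 0 or changed:
                for neighbour in out_edges[vertex]:
                    outbox[neighbour].append(new_value)
        # Step 3: barrier. All compute calls are done and every buffered
        # message is delivered before the next superstep starts.
        inbox = outbox
        # Step 1 of the next superstep: vertices with messages become active.
        active = {vertex for vertex, messages in inbox.items() if messages}
        superstep += 1
    return values, superstep


chain = {1: [2], 2: [1, 3], 3: [2]}
final_values, supersteps_run = run_supersteps(chain, {1: 3, 2: 6, 3: 1})
```

Messages produced in superstep S are only visible to compute() in superstep S+1, which is the property the questions in this thread are about.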

 On Mon, Nov 10, 2014 at 6:14 PM, Puneet Agarwal wrote:

 These are some very interesting questions. I also would like to know the
 answers to these.

 - Puneet
 IIT Delhi, India

   On Monday, November 10, 2014 9:30 AM, Vincentius Martin wrote:

 I am curious about how Giraph vertices receive messages before processing them.
 I know that they use the messages they received in the compute() method in
 the next superstep, but when do they receive them? If it is before the
 checkpoint process, is there any part of the documentation/code that I can
 look at to understand it?
 Also, what mechanism does Giraph use to store messages before superstep
 S+1? Are they stored in a buffer or on disk first?
 I still cannot find anything about this.

 Vincentius Martin

 Best Regards.
 Xing FENG
 PhD Candidate
 Database Research Group

 School of Computer Science and Engineering
 University of New South Wales
 NSW 2052, Sydney

 Phone: (+61) 413 857 288

How to ensure that only one worker runs per node

2014-10-30 Thread Matthew Saltz
Hi everyone,

Is there a good way (a configuration I'm guessing) to prevent more than one
worker from running per node? I saw in this thread to use, but that doesn't seem to be
working. Thanks for the help.


Re: How to ensure that only one worker runs per node

2014-10-30 Thread Matthew Saltz
Thanks Matthew. I just realized in searching a bit more that the variable
has to be set in the mapred-site.xml configuration file with hadoop
0.20.203, otherwise it doesn't affect anything. I was setting it in the
command line.  I've already been setting the number of workers the way you
suggested. I'll give it a try.


On Thu, Oct 30, 2014 at 12:36 PM, Matthew Cornell wrote:

 As I understand it, 1) set the variable to 1 as you say, and 2)
 set the number of workers to the number of nodes - 1 (for the
 master). When you run a job you can look at the 'map' link on the
 tasktracker UI to see all the workers plus the master.


 Matthew Cornell | | 413-626-3621 | 34
 Dickinson Street, Amherst MA 01002 |

Re: Resource Allocation Model Of Apache Giraph

2014-10-24 Thread Matthew Saltz
You may set giraph.userPartitionCount=number of workers and
Like Avery said though, since parallelism occurs on a partition level (each
thread processes a different partition) if you only have one partition per
worker you cannot take advantage of multithreading.


On Fri, Oct 24, 2014 at 3:53 AM, Zhang, David (Paypal Risk) wrote:

  I don't think there is a good solution. You can try running a Java application
 that uses FileInputFormat.getSplits and take the size of the returned array as
 the number of Giraph workers.

 Or run a simple map-reduce job using IdentityMapper to see how many
 mappers there are.


 Zhang, David (Paypal Risk)

 *From:* Charith Wickramarachchi []
 *Sent:* October 24, 2014 5:37
 *To:* user
 *Subject:* Re: Resource Allocation Model Of Apache Giraph

 Thanks  Claudio and Avery,

 I found a way to configure Hadoop to have the desired number of mappers
 per machine as Claudio mentioned.


 Could you please tell me how I can configure giraph to make each worker
 handle only a single partition?


 On Thu, Oct 23, 2014 at 2:26 PM, Avery Ching wrote:

 Regarding your second point, partitions are decoupled from workers.  A
 worker can handle zero or more partitions.  You can make each worker handle
 one partition, but we typically like multiple partitions since we can use
 multi-threading per machine.

 On 10/23/14, 9:04 AM, Claudio Martella wrote:

  the way mappers (or containers) and hence workers are assigned to
 machines is not under the control of giraph, but of the underlying hadoop
 environment (with different responsibilities that depend on the hadoop
 version, e.g. YARN). You'll have to tweak your hadoop configuration to
 control the maximum number of workers assigned to one machine (optimally
 one with multiple threads).

 On Thu, Oct 23, 2014 at 5:53 PM, Charith Wickramarachchi wrote:

 Hi Folks,

 I'm wondering what is the resource allocation model for Apache Giraph.

 As I understand each worker is one to one Mapped with a Mapper and a
 worker can process multiple partitions with a user defined number of

 Is it possible to make sure that one worker processes only a single
 partition? Also, is it possible to control the worker assignment across the
 cluster nodes? (E.g., make sure only N workers run on a single machine,
 assuming we have enough resources.)




 Charith Dhanushka Wickramaarachchi

 Tel  +1 213 447 4253



 Twitter  @charithwiki

 This communication may contain privileged or other
 confidential information and is intended exclusively for the addressee/s.
 If you are not the intended recipient/s, or believe that you may have
 received this communication in error, please reply to the
 sender indicating that fact and delete the copy you received and in
 addition, you should not print, copy, retransmit, disseminate, or otherwise
 use the information contained in this communication.
 Internet communications cannot be guaranteed to be timely, secure, error
 or virus-free. The sender does not accept liability for any errors
 or omissions


Claudio Martella



Excessive Memory Usage Compared to Graph Size

2014-10-23 Thread Matthew Saltz
Hi everyone,

I'm working on a community detection algorithm for giraph and I'm trying to
execute the algorithm on the Friendster graph, which has about 65M nodes
and about 1.8 billion edges. Running on 16 machines, before doing ANY
processing, it's taking about 50G of RAM. That's 800G total for this graph,
which seems excessive. I'm using the 1.1.0 stable release and Hadoop
0.20.203 (should I use a newer version of hadoop?).  This is the command
I'm running:

$HADOOP_HOME/bin/hadoop --config $CONF jar $GIR_JAR
org.apache.giraph.GiraphRunner -D ' -Xmx80G'
-D '' -D-Xmx6m -libjars $LIBJARS
computation.StartComputation $GIRAPH_OPTIONS -eif -eip $INPUT
-vof -op $OUTPUT
-w $N_WORKERS -mc computation.WccMasterCompute

where N_THREADS=12 (number of available threads on the machine),

GIRAPH_OPTIONS=-ca giraph.useSuperstepCounters=false
-ca giraph.numComputeThreads=$N_THREADS
-ca giraph.numInputThreads=$N_THREADS
-ca giraph.numOutputThreads=$N_THREADS
-ca giraph.oneToAllMsgSending=true
-ca giraph.metrics.enable=true
-ca giraph.maxPartitionsInMemory=$N_THREADS
-ca giraph.userPartitionCount=$N_PARTITIONS
-ca giraph.outEdgesClass=utils.IntNullHashSetEdges

IntNullHashSetEdges takes a bit more memory than IntNullArrayEdges I know
but it doesn't make that big of a difference.

Each vertex contains 7 ints, a double, three arrays and a map, but all of
these are empty when the graph is loaded and it still takes that much

I feel like I must be doing something wrong, or missing a configuration
option, or something. Thanks in advance for any help you might be able
to offer.
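
For a sense of scale, here is a back-of-envelope calculation in plain Python using only the figures quoted above. The assumptions are mine: ids are 4-byte ints (suggested by the IntNull edge classes), each edge is counted once, and no object or hash-set overhead is included, so this is a lower bound on the raw data rather than an estimate of what Giraph should use.

```python
GIB = 1024 ** 3
INT_BYTES = 4  # assumed size of one vertex id

nodes = 65_000_000
edges = 1_800_000_000

raw_edge_targets = edges * INT_BYTES   # one target id per edge
raw_vertex_ids = nodes * INT_BYTES     # one id per vertex
raw_total_gib = (raw_edge_targets + raw_vertex_ids) / GIB

reported_total_gb = 16 * 50            # 16 machines at about 50G each
overhead_factor = reported_total_gb / raw_total_gib
```

The raw ids come to roughly 7 GiB, so the reported 800G is more than a hundred times the bare data. If every edge is stored in both directions the raw figure doubles, which still leaves a large gap.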


Multiple sendMessage calls vs. sendMessageToMultipleEdges

2014-10-22 Thread Matthew Saltz
Hi everyone,

I have two questions:

*Question 1)* I'm using release 1.1.0 and I'm really confused about the
fact that I'm having massive performance differences in the following
scenario. I need to send one message from each vertex to a subset of its
neighbors (all that satisfy a certain condition). For that, I see two basic
options:

   a) Loop over all edges, making a call to sendMessage(source, target)
whenever target satisfies a condition I want, reusing the same IntWritable
for the target vertex by calling target.set(_)
   b) Loop over all edges, building up an ArrayList (or whatever) of
targets that satisfy the condition, and calling
sendMessageToMultipleEdges(targets) at the end.

Surprisingly, I get much, much worse performance using option (a), which I
would think would be much faster. So I looked in the code and eventually
found my way to SendMessageCache,
where it turns out that sendMessageToMultipleEdges ->
sendMessageToAllRequest(Iterator, Message) actually just loops over the
iterator, repeatedly calling sendMessageRequest (which is what I thought I
was doing in scenario (a)). I might have incorrectly traced the code though.
Can anyone tell me what might be going on? I'm really puzzled by this.

*Question 2) *Is there a good way of sending a vertex's adjacency list to
its neighbors, without building up your own copy of an adjacency list and
then sending that? I'm going through the Edge iterable and building an
ArrayPrimitiveWritable of ids but it would be nice if I could somehow
access the underlying data structure behind the iterable or just wrap the
iterable as a writable somehow.

Thanks so much for the help,
Matthew Saltz

Re: Multiple sendMessage calls vs. sendMessageToMultipleEdges

2014-10-22 Thread Matthew Saltz

Thank you so much for the help. By 'the first class', you mean
is not used unless I set the property to true, right? Because I actually do
have giraph.oneToAllMsgSending=true, so if that means it's using
SendMessageToAllCache then everything makes much more sense. So I guess it
makes sense, then, that case (b) that I mentioned would be much faster
than case (a)? I really appreciate it.  And do you have any ideas about the
second question I asked? I think the answer is no but I'm kind of hoping
it's not.


On Wed, Oct 22, 2014 at 11:16 PM, Lukas Nalezenec wrote:

  Hi Matthew,

 See class SendMessageToAllCache. It's in the same directory as
 SendMessageCache. The first class is not used by Giraph unless you set
 property giraph.oneToAllMsgSending to true.
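
One way to picture why the two code paths could differ so much is a toy cost model in plain Python. It rests on an assumption that should be verified against the SendMessageToAllCache source: that one-to-all sending writes the message payload once per destination worker together with the list of target ids, whereas per-target sending writes a full copy of the payload for every target. All names and numbers below are hypothetical.

```python
def per_target_bytes(payload_bytes, targets, id_bytes=4):
    # Each send carries one target id plus its own copy of the payload.
    return len(targets) * (id_bytes + payload_bytes)


def one_to_all_bytes(payload_bytes, targets, worker_of, id_bytes=4):
    # Assumed behaviour: one payload copy per destination worker,
    # plus one id for every target.
    destination_workers = {worker_of(target) for target in targets}
    return len(destination_workers) * payload_bytes + len(targets) * id_bytes


targets = list(range(100))       # 100 neighbours satisfy the condition
worker_of = lambda vertex_id: vertex_id % 4  # 4 workers, toy placement

cost_a = per_target_bytes(400, targets)
cost_b = one_to_all_bytes(400, targets, worker_of)
```

With a 400-byte message the modelled traffic drops from 40400 to 2000 bytes, which is the kind of gap that would make option (b) look much faster once giraph.oneToAllMsgSending is enabled.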



Re: Multiple sendMessage calls vs. sendMessageToMultipleEdges

2014-10-22 Thread Matthew Saltz
Actually,  one more question: are there any disadvantages to enabling
oneToAllMessaging? Is there any reason not to do it by default?


Re: getAggregatedValue calling aggregate

2014-09-23 Thread Matthew Saltz
Hi Puneet,

What are you trying to do in getAggregatedValue()? Is there any reason you
don't just want to return the current value of the aggregator (which is
what the default implementation does)?
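
To make the question concrete, here is a plain-Python stand-in for the kind of aggregator being discussed. It is not Giraph's BasicAggregator; the class and method names merely mirror the ones mentioned in this thread, and the sum operation is an arbitrary choice for the example.

```python
class SumAggregator:
    """Toy aggregator: subclasses would supply aggregate and an initial value."""

    def __init__(self):
        self._value = self.create_initial_value()

    def create_initial_value(self):
        return 0

    def aggregate(self, value):
        # Called once for every value a vertex contributes.
        self._value += value

    def get_aggregated_value(self):
        # Plain getter: returns the current value and never calls aggregate().
        return self._value


aggregator = SumAggregator()
aggregator.aggregate(3)
aggregator.aggregate(4)
total = aggregator.get_aggregated_value()
```

If the getter is left as a plain read like this, calling it has no side effects, which is why overriding it is rarely needed.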


On Sat, Sep 20, 2014 at 6:25 AM, Puneet Agarwal wrote:

 I have created my own aggregator, by extending the BasicAggregator.

 In this aggregator I have overridden following methods


 (The documentation says that we need to implement aggregate and 
 methods only, if so, how and what will it return when getAggregatedValue()
 is called?)

 My main problems are:

 a. When I call the method getAggregatedValue() from the compute method of
 my vertex class, nothing gets returned
 b. When I call the method getAggregatedValue() from the compute method of
 my vertex class, aggregate() method of the aggregator class is getting
 called and it received the value returned by the method

 Am I missing something very basic here?

 Please help.

 Puneet Agarwal
 IIT Delhi

Re: understanding failing my job, Giraph/Hadoop memory usage, under-utilized nodes, and moving forward

2014-09-22 Thread Matthew Saltz
Sorry, should be
*I left out the giraph.

On Mon, Sep 22, 2014 at 8:10 PM, Matthew Saltz wrote:

 Hi Matthew,

 I answered a few of your questions in-line (unfortunately they might not
 help the larger problem, but hopefully it'll help a bit).


 On Mon, Sep 22, 2014 at 5:50 PM, Matthew Cornell wrote:

 Hi Folks,

 I've spent the last two months learning, installing, coding, and
 analyzing the performance of our Giraph app, and I'm able to run on
 small inputs on our tiny cluster (yay!) I am now stuck trying to
 figure out why larger inputs fail, why only some compute nodes are
 being used, and generally how to make sure I've configured hadoop and
 giraph to use all available CPUs and RAM. I feel that I'm this
 close, and I could really use some pointers.

 Below I share our app, configuration, results and log messages, some
 questions, and counter output for the successful run. My post here is
 long (I've broken it into sections delimited with ''), but I hope
 I've provided good enough information to get help on. I'm happy to add
 to it.



 Our application is a kind of path search where all nodes have a type
 and source database ID (e.g., movie 99), and searches are expressed
 as type paths, such as movie, acted_in, actor, which would start
 with movies and then find all actors in each movie, for all movies in
 the database. The program does a kind of filtering by keeping track of
 previously-processed initial IDs.

 Our database is a small movie one with 2K movies, 6K users (people who
 rate movies), and 80K ratings of movies by users. Though small, we've
 found this kind of search can result in a massive explosion of
 messages, as was well put by Rob Vesse:

  even with this relatively small graph you get a massive explosion of
 messages by the later super steps which exhausts memory (in my graph the
 theoretical maximum messages by the last super step was ~3 billion)

  job failure and error messages 

 Currently I have a four-step path that completes in ~20 seconds
 (rates, movie, rates, user - counter output shown at bottom) but a
 five-step one (rates, movie, rates, user, rates) fails after a few
 minutes. I've looked carefully at the task logs, but I find it a
 little difficult to discern what the actual failure was. However,
 looking at system information (e.g., top and ganglia) during the run
 indicates hosts are running out of memory. There are no
 OutOfMemoryErrors in the logs, and only this one stands out:

  ERROR org.apache.giraph.master.BspServiceMaster:
 superstepChosenWorkerAlive: Missing chosen worker
 Worker(hostname=compute-0-3.wright, MRtaskID=1, port=30001) on superstep 4

 NB: So far I've been ignoring these other types of messages:

  FATAL org.apache.giraph.master.BspServiceMaster: getLastGoodCheckpoint:
 No last good checkpoints can be found, killing the job. File
 _bsp/_checkpoints/job_201409191450_0003 does not exist.

  WARN org.apache.giraph.bsp.BspService: process: Unknown and unprocessed
 type=NodeDeleted, state=SyncConnected)

  ERROR org.apache.giraph.worker.BspServiceWorker: unregisterHealth: Got
 failure, unregistering health on
 on superstep 4

 The counter statistics are minimal after the run fails, but during it
 I see something like this when refreshing the Job Tracker Web UI:

  Counters  Map-Reduce Framework  Physical memory (bytes) snapshot: ~28GB
  Counters  Map-Reduce Framework  Virtual memory (bytes) snapshot: ~27GB
  Counters  Giraph Stats  Sent messages: ~181M

  hadoop/giraph command 

 hadoop jar $GIRAPH_HOME/giraph-ex.jar org.apache.giraph.GiraphRunner \
 -libjars ${LIBJARS} \
 relpath.RelPathVertex \
 -wc relpath.RelPathWorkerContext \
 -mc relpath.RelPathMasterCompute \
 -vif relpath.CausalityJsonAdjacencyListVertexInputFormat \
 -of relpath.RelPathVertexValueTextOutputFormat \
 -ca RelPathVertex.path=$REL_PATH_PATH \
 -w 8

  cluster, versions, and configuration 

 We have a five-node cluster with a head and four compute nodes. The
 head has 2 CPUs, 16 cores each, and 64 GB RAM. Each compute has 1 CPU,
 4 cores each, and 16 GB RAM, making a total cluster of 128 GB of RAM
 and 48 cores.

 Hadoop: Cloudera CDH4 with a mapreduce service running the job tracker
 on the head node, and task trackers on all five nodes.

 Hadoop configuration (mapred-site.xml and CDH interface - sorry for
 the mix) - not sure I'm listing

Re: understanding failing my job, Giraph/Hadoop memory usage, under-utilized nodes, and moving forward

2014-09-22 Thread Matthew Saltz
Hi Matthew,

I answered a few of your questions in-line (unfortunately they might not
help the larger problem, but hopefully it'll help a bit).


On Mon, Sep 22, 2014 at 5:50 PM, Matthew Cornell

 Hi Folks,

 I've spent the last two months learning, installing, coding, and
 analyzing the performance of our Giraph app, and I'm able to run on
 small inputs on our tiny cluster (yay!) I am now stuck trying to
 figure out why larger inputs fail, why only some compute nodes are
 being used, and generally how to make sure I've configured hadoop and
 giraph to use all available CPUs and RAM. I feel that I'm this
 close, and I could really use some pointers.

 Below I share our app, configuration, results and log messages, some
 questions, and counter output for the successful run. My post here is
 long (I've broken it into sections delimited with ''), but I hope
 I've provided good enough information to get help on. I'm happy to add
 to it.



 Our application is a kind of path search where all nodes have a type
 and source database ID (e.g., movie 99), and searches are expressed
 as type paths, such as movie, acted_in, actor, which would start
 with movies and then find all actors in each movie, for all movies in
 the database. The program does a kind of filtering by keeping track of
 previously-processed initial IDs.

 Our database is a small movie one with 2K movies, 6K users (people who
 rate movies), and 80K ratings of movies by users. Though small, we've
 found this kind of search can result in a massive explosion of
 messages, as was well put by Rob Vesse (

  even with this relatively small graph you get a massive explosion of
 messages by the later super steps which exhausts memory (in my graph the
 theoretical maximum messages by the last super step was ~3 billion)

  job failure and error messages 

 Currently I have a four-step path that completes in ~20 seconds
 (rates, movie, rates, user - counter output shown at bottom) but a
 five-step one (rates, movie, rates, user, rates) fails after a few
 minutes. I've looked carefully at the task logs, but I find it a
 little difficult to discern what the actual failure was. However,
 looking at system information (e.g., top and ganglia) during the run
 indicates hosts are running out of memory. There are no
 OutOfMemoryErrors in the logs, and only this one stands out:

  ERROR org.apache.giraph.master.BspServiceMaster:
 superstepChosenWorkerAlive: Missing chosen worker
 Worker(hostname=compute-0-3.wright, MRtaskID=1, port=30001) on superstep 4

 NB: So far I've been ignoring these other types of messages:

  FATAL org.apache.giraph.master.BspServiceMaster: getLastGoodCheckpoint:
 No last good checkpoints can be found, killing the job. File
 _bsp/_checkpoints/job_201409191450_0003 does not exist.

  WARN org.apache.giraph.bsp.BspService: process: Unknown and unprocessed
 type=NodeDeleted, state=SyncConnected)

  ERROR org.apache.giraph.worker.BspServiceWorker: unregisterHealth: Got
 failure, unregistering health on
 on superstep 4

 The counter statistics are minimal after the run fails, but during it
 I see something like this when refreshing the Job Tracker Web UI:

  Counters  Map-Reduce Framework  Physical memory (bytes) snapshot: ~28GB
  Counters  Map-Reduce Framework  Virtual memory (bytes) snapshot: ~27GB
  Counters  Giraph Stats  Sent messages: ~181M

  hadoop/giraph command 

 hadoop jar $GIRAPH_HOME/giraph-ex.jar org.apache.giraph.GiraphRunner \
 -libjars ${LIBJARS} \
 relpath.RelPathVertex \
 -wc relpath.RelPathWorkerContext \
 -mc relpath.RelPathMasterCompute \
 -vif relpath.CausalityJsonAdjacencyListVertexInputFormat \
 -of relpath.RelPathVertexValueTextOutputFormat \
 -ca RelPathVertex.path=$REL_PATH_PATH \
 -w 8

  cluster, versions, and configuration 

 We have a five-node cluster with a head and four compute nodes. The
 head has 2 CPUs, 16 cores each, and 64 GB RAM. Each compute has 1 CPU,
 4 cores each, and 16 GB RAM, making a total cluster of 128 GB of RAM
 and 48 cores.

 Hadoop: Cloudera CDH4 with a mapreduce service running the job tracker
 on the head node, and task trackers on all five nodes.

 Hadoop configuration (mapred-site.xml and CDH interface - sorry for
 the mix) - not sure I'm listing all of them of interest:

  mapreduce.job.counters.max: 120
  mapred.output.compress: false
  mapred.reduce.tasks: 12 -Xmx2147483648

Re: Problem processing large graph

2014-09-11 Thread Matthew Saltz
Hi Tripti,

How many machines are you running on? The ideal configuration would be one
worker per machine and one separate machine for the master. If you're using
more mappers than machines then you're using more resources than necessary,
and fixing that could help.

On 11/09/2014 13:39, Tripti Singh wrote:

  Hi Avery,
 Thanks for your reply.
 I did adjust the heap and container size to a higher value (3072MB and
 4096MB respectively) and I am not running the out of core option as well.
 I am intermittently able to run the job with 200 mappers. At other times,
 I can run part of the data while the other part gets stalled.
 FYI, I am using netty without authentication.
 One thing that I have noticed, however, is that it mostly runs
 successfully when the queue is running almost idle or is comparatively
 free. On most instances when the queue is running more tasks or is
 over-allocated, my job stalls even when the required number of containers
 are allocated. Looking at the logs, I mostly find it stalling at Superstep
 0 or 1 after finishing Superstep –1. Or sometimes, even at –1.
 Could there be some shared resources in the queue which are not enough for
 the job while the job runs on a loaded queue and can I configure some other
 value to make it run?

  Tripti Singh

   From: Avery Ching
 Date: Wednesday, September 3, 2014 at 10:53 PM
 Subject: Re: Problem processing large graph

   Hi Tripti,

 Is there a chance you can use higher memory machines so you don't run out
 of core?  We do it this way at Facebook.  We haven't tested the
 out-of-core option.


 On 8/31/14, 2:34 PM, Tripti Singh wrote:

  I am able to successfully build hadoop_yarn profile for running Giraph
  I am also able to test run Connected Components on a small dataset.
  However, I am seeing 2 issues while running on a bigger dataset with 400

1. I am unable to use out of Core Graph option. It errors out saying
that it cannot read INIT partition. (Sorry I don’t have the log currently
but I will share after I run that again).
I am expecting that if the out of Core option is fixed, I should be
able to run the workflow with fewer mappers.
2. In order to run the workflow anyhow, I removed the out of Core
option and adjusted the heap size. This also runs with smaller dataset but
fails with huge dataset.
Worker logs are mostly empty. Non-empty logs end like this:

 *mapred.task.partition is deprecated. Instead, use
mapreduce.task.partition [STATUS: task-374] setup: Beginning worker setup.
setup: Log level remains at info [STATUS: task-374] setup: Initializing
Zookeeper services. is deprecated.
Instead, use job.local.dir is
deprecated. Instead, use mapreduce.job.local.dir [STATUS: task-374] setup:
Setting up Zookeeper manager. createCandidateStamp: Made the directory
createCandidateStamp: Made the directory

createCandidateStamp: Creating my filestamp

 _bsp/_defaultZkManagerDir/giraph_yarn_application_1407992474095_708614/_task/ 374 getZooKeeperServerList: For task
374, got file 'null' (polling period is 3000) *

Master log has log statements for launching the container, opening
proxy and processing event like this:

 *Opening proxy :  Processing Event EventType:
QUERY_CONTAINER for Container container_1407992474095_708614_01_000314 ……*

 I am not using SASL authentication.
 Any idea what might be wrong?


Re: How do I validate customArguments?

2014-09-10 Thread Matthew Saltz
No worries. Just by the way, I realized after I sent that that using the
public static int numPreprocessingSteps to store the value in the
MasterCompute class doesn't work; you need to register a permanent
aggregator to hold on to it, if you need to.


On Wed, Sep 10, 2014 at 3:15 PM, Matthew Cornell

 Sorry for the long delay, Matthew. That's really helpful. Right now I'm
 stuck on apparently running out of memory on our little cluster, but the
 log messages are confusing. I'm putting together a question, but in the
 meantime I'll try one of the simpler examples such as degree count to see
 if /anything/ will run against my graph, which is very small (100K nodes
 and edges). -- matt

 On Thu, Aug 28, 2014 at 2:26 PM, Matthew Saltz wrote:


 I'm not sure if you've resolved this problem already or not, but if you
 haven't: The initialize() method isn't limited to registering aggregators,
 and in fact, in my project I use it to do exactly what you're describing to
 check and load custom configuration parameters. Inside the initialize()
 method, I do this:

   String numPreprocessingStepsConf =
   numPreprocessingSteps = (numPreprocessingStepsConf != null) ?
       Integer.parseInt(numPreprocessingStepsConf.trim()) :
   System.out.println("Number of preprocessing steps: " +

 where at the class level I declare:

   public static final String NUMBER_OF_PREPROCESSING_STEPS_CONF_OPT =
   public static final int DEFAULT_NUMBER_OF_PREPROCESSING_STEPS = 1;
   public static int numPreprocessingSteps;

 To set the property, I use the option -ca
 wcc.numPreprocessingSteps=number of steps I want. If you need to check
 that it's properly formatted and not store them, this is a fine place to do
 it as well, given that it's run before the input superstep (see the giraph
 code in BspServiceMaster, line 1617 in the stable 1.1.0 release). What
 happens is that on the master, the MasterThread calls coordinateSuperstep()
 on a BspServiceMaster object, which checks if it's the input superstep, and
 if so, calls initialize() on the MasterCompute object (created in the
 becomeMaster() method of BspServiceMaster).

 Hope this helps,

 On Tue, Aug 26, 2014 at 4:36 PM, Matthew Cornell

 Hi again. My application needs to pass in a String argument to the
 computation which each Vertex needs access to. (The argument is a list of
 the form [item1, item2, ...].) I found --customArguments (which I set in
 my tests via conf.set(arg_name, arg_val)) but I need to check that it's
 properly formatted. Where do I do that? The only thing I thought of is to
 specify a DefaultMasterCompute subclass whose initialize() does the check,
 but all the initialize() examples do is register aggregators; none of them
 check args or do anything else. Thanks in advance! -- matt

 Matthew Cornell | | 413-626-3621 | 34 Dickinson
 Street, Amherst MA 01002 |


Re: How do I validate customArguments?

2014-08-28 Thread Matthew Saltz

I'm not sure if you've resolved this problem already or not, but if you
haven't: The initialize() method isn't limited to registering aggregators,
and in fact, in my project I use it to do exactly what you're describing to
check and load custom configuration parameters. Inside the initialize()
method, I do this:

  String numPreprocessingStepsConf =
  numPreprocessingSteps = (numPreprocessingStepsConf != null) ?
      Integer.parseInt(numPreprocessingStepsConf.trim()) :
  System.out.println("Number of preprocessing steps: " +

where at the class level I declare:

  public static final String NUMBER_OF_PREPROCESSING_STEPS_CONF_OPT =
  public static final int DEFAULT_NUMBER_OF_PREPROCESSING_STEPS = 1;
  public static int numPreprocessingSteps;

To set the property, I use the option -ca
wcc.numPreprocessingSteps=number of steps I want. If you need to check
that it's properly formatted and not store them, this is a fine place to do
it as well, given that it's run before the input superstep (see the giraph
code in BspServiceMaster, line 1617 in the stable 1.1.0 release). What
happens is that on the master, the MasterThread calls coordinateSuperstep()
on a BspServiceMaster object, which checks if it's the input superstep, and
if so, calls initialize() on the MasterCompute object (created in the
becomeMaster() method of BspServiceMaster).
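
For the formatting check itself, here is a minimal sketch of the kind of validation that could be called from initialize(). It is plain Java with no Giraph dependency; CustomArgs, parseSteps and parseList are hypothetical names, and the raw strings are assumed to come from the job configuration (for example whatever -ca stored under wcc.numPreprocessingSteps). Throwing an exception from initialize() is assumed here to be an acceptable way to stop the job early.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helpers for checking custom arguments before the job starts. */
final class CustomArgs {
    private CustomArgs() {}

    /** Parses a non-negative step count, using the default when the option is unset. */
    static int parseSteps(String raw, int defaultValue) {
        if (raw == null) {
            return defaultValue;
        }
        int steps;
        try {
            steps = Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Not an integer: '" + raw + "'", e);
        }
        if (steps < 0) {
            throw new IllegalArgumentException("Step count must be >= 0: " + steps);
        }
        return steps;
    }

    /** Parses "[item1, item2, ...]" into its items, rejecting malformed input. */
    static List<String> parseList(String raw) {
        if (raw == null) {
            throw new IllegalArgumentException("Missing list argument");
        }
        String s = raw.trim();
        if (s.length() < 2 || !s.startsWith("[") || !s.endsWith("]")) {
            throw new IllegalArgumentException("Expected [item1, item2, ...] but got: " + raw);
        }
        List<String> items = new ArrayList<>();
        String body = s.substring(1, s.length() - 1).trim();
        if (body.isEmpty()) {
            return items; // "[]" is treated as an empty list
        }
        for (String part : body.split(",", -1)) {
            String item = part.trim();
            if (item.isEmpty()) {
                throw new IllegalArgumentException("Empty item in: " + raw);
            }
            items.add(item);
        }
        return items;
    }
}
```

Keeping the checks in static methods like these also makes them easy to unit test without starting a job.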

Hope this helps,

On Tue, Aug 26, 2014 at 4:36 PM, Matthew Cornell

 Hi again. My application needs to pass in a String argument to the
 computation which each Vertex needs access to. (The argument is a list of
 the form [item1, item2, ...].) I found --customArguments (which I set in
 my tests via conf.set(arg_name, arg_val)) but I need to check that it's
 properly formatted. Where do I do that? The only thing I thought of is to
 specify a DefaultMasterCompute subclass whose initialize() does the check,
 but all the initialize() examples do is register aggregators; none of them
 check args or do anything else. Thanks in advance! -- matt

 Matthew Cornell | | 413-626-3621 | 34 Dickinson
 Street, Amherst MA 01002 |

Re: Setting variable value in Compute class and using it in the next superstep

2014-07-21 Thread Matthew Saltz

If it's necessary to store more than one flag though, for example, won't a
custom class be necessary? I'm a beginner too, so I apologize if I'm
incorrect about that. Just to clarify, to keep persistent data for a
vertex from one superstep to the next, it is necessary to encapsulate it in
the type used for the 'V', right? In other words, if Vivek tries to use a
normal member variable for the Computation class, it won't work will it?

Also, just to point out, there actually isn't too much involved with
writing your own custom vertex class. Here's a quick example to get you started.
Within your compute() method you can access the data in this class by doing

SampleVertexData d = vertex.getValue();

and then using d.setFlag(true) or boolean currentFlag = d.getFlag() for
example.  And your computation class is now something like

public class MyComputation extends BasicComputation<IdType,
    SampleVertexData, EdgeType, MessageType> {
  public void compute(Vertex<IdType, SampleVertexData, EdgeType> vertex,
      Iterable<MessageType> messages) { ... }
}
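
As a rough sketch of what such a vertex value class might look like: the flag accessors match the calls above, while the distance field is a hypothetical second field added only to show field ordering. In a real job the class would implement Hadoop's Writable interface (write(DataOutput) and readFields(DataInput)); that declaration is left off here so the snippet compiles on its own. The important part is that readFields() reads the fields in exactly the order write() wrote them.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/**
 * Illustrative vertex value. In a real job, add
 * "implements org.apache.hadoop.io.Writable" to the declaration.
 */
final class SampleVertexData {
    private boolean flag;
    private double distance; // hypothetical extra field, not from the thread

    SampleVertexData() {} // no-arg constructor so the framework can instantiate it

    boolean getFlag() { return flag; }
    void setFlag(boolean flag) { this.flag = flag; }
    double getDistance() { return distance; }
    void setDistance(double distance) { this.distance = distance; }

    /** Serializes the fields: flag first, then distance. */
    public void write(DataOutput out) throws IOException {
        out.writeBoolean(flag);
        out.writeDouble(distance);
    }

    /** Deserializes the fields in the same order write() used. */
    public void readFields(DataInput in) throws IOException {
        flag = in.readBoolean();
        distance = in.readDouble();
    }

    /** Round-trips a value through bytes, similar to what a framework does when it stores or moves vertices. */
    static SampleVertexData copyViaBytes(SampleVertexData src) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            src.write(new DataOutputStream(bytes));
            SampleVertexData copy = new SampleVertexData();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A round trip like copyViaBytes() is a cheap way to catch a mismatch between write() and readFields() before running on a cluster.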



As a warning, for this class I'm using Hadoop 0.20.203 and I'm also a
beginner, so take everything I say with a grain of salt, and Tom please
correct me if I'm wrong.

Best of luck,

On Mon, Jul 21, 2014 at 11:37 PM, Schweiger, Tom

  And in answer of :

 This post also suggests (along with what I described above) to have a
 field in the vertex value itself. For that I need to change the vertex
 input format and also create my own custom vertex class. Is it really

 No, you don't need a custom vertex class or vertex input format. You can
 create/initialize the value at the beginning of the first superstep.

 *From:* Sardeshmukh, Vivek []
 *Sent:* Monday, July 21, 2014 2:05 PM
 *Subject:* Setting variable value in Compute class and using it in the
 next superstep

   Hi, all--

  In my algorithm, I need to set a flag if certain conditions hold
 (locally at a vertex v). If this flag is set then execute some other block
 of code *only once*, and do nothing until some other condition holds.

  My question is, can I declare a flag variable in the class where I
 override compute function? I defined the flag as a public variable and
 setting it once the conditions are met but it seems the value is not
 carried over to the next superstep.

 I dug a little bit in this mailing list and found this

  This post also suggests (along with what I described above) to have a
 field in the vertex value itself. For that I need to change the vertex
 input format and also create my own custom vertex class. Is it really

  By the way, I am using Giraph 1.1.0 compiled against Hadoop 1.0.3. I was
 able to run SimpleShortestPathComputation successfully.

  Here are more technical details of my algorithm: I am trying to
 implement Delta-stepping shortest path algorithm ( or ). This
 was mentioned in the Pregel paper. A vertex relaxes light edges if it belongs
 to the minimum bucket index (of course, aggregators!). Once a vertex is
 done with relaxing light edges it relaxes heavy edges (here is where I need
 a flag) once. A vertex may be re-inserted to a newer bucket and may have
 to execute all the steps that I described here again.
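
For reference, the bucket and light/heavy rules in delta-stepping can be sketched as below, assuming the usual definitions: a vertex with tentative distance d sits in bucket floor(d / delta), and an edge is light when its weight is at most delta. DeltaStepping and its method names are illustrative and not part of Giraph; the "heavy edges already relaxed" flag discussed above would live in the vertex value, not in this helper.

```java
/** Illustrative helpers for the delta-stepping rules; not a Giraph API. */
final class DeltaStepping {
    private DeltaStepping() {}

    /** Bucket that a vertex with the given tentative distance belongs to. */
    static long bucketIndex(double tentativeDistance, double delta) {
        if (!(delta > 0)) {
            throw new IllegalArgumentException("delta must be positive: " + delta);
        }
        if (!(tentativeDistance >= 0)) {
            throw new IllegalArgumentException("distance must be non-negative: " + tentativeDistance);
        }
        return (long) Math.floor(tentativeDistance / delta);
    }

    /** Light edges (weight <= delta) are relaxed repeatedly while the bucket is being emptied. */
    static boolean isLightEdge(double weight, double delta) {
        return weight <= delta;
    }

    /** Heavy edges (weight > delta) are relaxed once, after the bucket's light phase ends. */
    static boolean isHeavyEdge(double weight, double delta) {
        return weight > delta;
    }
}
```

In a Giraph job the current minimum bucket index would typically be shared through an aggregator, and each vertex would compare its own bucketIndex() against it in compute().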


 A beginner in Giraph (and Java too!)

Re: Setting variable value in Compute class and using it in the next superstep

2014-07-21 Thread Matthew Saltz
Yeah, that's true. Sorry I forgot that part. Luckily, it isn't too tricky
either, depending on the input format of your graph. Here's another example to get you started,
for a very simple input format for edges with no values. I basically took
the code straight from here
and modified it where I needed to so that it returns the InputFormat I needed for
my code. You'll probably be better off digging through some of the already
implemented InputFormat classes that come with Giraph to do something
similar, since I'm guessing your input files will be different than mine.
Take a look at the subclasses of TextVertexInputFormat,
since they deal with a lot of common input format styles, and see if you
can modify their code to work with your custom vertex data format. Now, the
example I give you is also easy because I just use the default constructor
of the class, but if you need to load additional data from the file into
your vertex data and the default constructor isn't appropriate, you may
have to do some extra parsing and legwork for that.
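
To give an idea of the parsing involved, here is a minimal sketch for an edge list with no edge values, assuming one edge per line as two whitespace-separated long IDs (source, then target). That layout and the EdgeLine name are assumptions for illustration; in a real reader this logic would sit inside the Giraph input format's line-parsing methods.

```java
/** Illustrative parser for "source target" edge lines; not a Giraph class. */
final class EdgeLine {
    private EdgeLine() {}

    /** Returns {sourceId, targetId}, or throws if the line is malformed. */
    static long[] parse(String line) {
        String[] tokens = line.trim().split("\\s+");
        if (tokens.length != 2) {
            throw new IllegalArgumentException("Expected 'source target' but got: " + line);
        }
        // Long.parseLong throws NumberFormatException for non-numeric IDs
        return new long[] { Long.parseLong(tokens[0]), Long.parseLong(tokens[1]) };
    }
}
```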

Best of luck,

On Tue, Jul 22, 2014 at 12:28 AM, Sardeshmukh, Vivek wrote:

  Thank you Matthew for the example link. It is helpful. I'll give it a

  If I have a custom vertex class isn't it necessary to change the
 VertexInputFormat class too? Since this class loads the data into the
 vertex and if vertex has a custom value field then it doesn't know how to
 load the input. Am I right?

 *From:* Schweiger, Tom
 *Sent:* Monday, July 21, 2014 5:16 PM
 *Subject:* RE: Setting variable value in Compute class and using it in
 the next superstep

 For more than one flag, a custom class is necessary (unless you're able
 to, say, toggle the sign bit to get double usage out of a value).

 I've started a private thread with Vivek to get a better understanding of
 what he was trying to solve.

 And you are also correct that there isn't much to writing a custom vertex
 class.  The key is making sure you read and write in the same order.
 Likewise, extending a vertex reader can be quite simple.

 *From:* Matthew Saltz []
 *Sent:* Monday, July 21, 2014 3:09 PM
 *Subject:* Re: Setting variable value in Compute class and using it in
 the next superstep


  If it's necessary to store more than one flag though, for example, won't
 a custom class be necessary? I'm a beginner too, so I apologize if I'm
 incorrect about that. Just to clarify, to keep persistent data for a
 vertex from one superstep to the next, it is necessary to encapsulate it in
 the type used for the 'V', right? In other words, if Vivek tries to use a
 normal member variable for the Computation class, it won't work will it?

 Also, just to point out, there actually isn't too much involved with
 writing your own custom vertex class. Here's a quick example to get you started.
 Within your compute() method you can access the data in this class by doing

  SampleVertexData d = vertex.getValue();

  and then using d.setFlag(true) or boolean currentFlag = d.getFlag() for
 example.  And your computation class is now something like

  public class MyComputation extends BasicComputation<IdType,
      SampleVertexData, EdgeType, MessageType> {
    public void compute(Vertex<IdType, SampleVertexData, EdgeType> vertex,
        Iterable<MessageType> messages) { ... }
  }



  As a warning, for this class I'm using Hadoop 0.20.203 and I'm also a
 beginner, so take everything I say with a grain of salt, and Tom please
 correct me if I'm wrong.

  Best of luck,

 On Mon, Jul 21, 2014 at 11:37 PM, Schweiger, Tom

  And in answer of :

 This post also suggests (along with what I described above) to have a
 field in the vertex value itself. For that I need to change the vertex
 input format and also create my own custom vertex class. Is it really

  No, you don't need a custom vertex class or vertex input format. You can
 create/initialize the value at the beginning of the first superstep.

 *From:* Sardeshmukh, Vivek []
 *Sent:* Monday, July 21, 2014 2:05 PM
 *Subject:* Setting variable value in Compute class and using it in the
 next superstep

Hi, all--

  In my algorithm, I need to set a flag if certain conditions hold
 (locally at a vertex v). If this flag is set then execute some other block
 of code *only once*, and do nothing until some other condition holds