Re: Can Giraph handle graphs with very large number of edges per vertex?
Hi Jeyendran, I was just saying the same thing about the documentation on another thread; I couldn't agree more. There will be progress on this soon, I promise. I'd like us to reach a model of "if you add a new feature or change a core feature, the patch gets committed contingent on a new wiki page of docs going up on the website." There's still nothing about our new Vertex API, master compute, etc. on the wiki. I would say 8 GB to play with is a good amount: you should be able to run very large, interesting graphs in memory, depending on how many workers (with 8 GB each) you have to work with. Having 3-4 workers per machine is not a bad thing if you are provisioned to do this. And lots of machines. This is a distributed batch processing framework, so more is better ;) As for vertices with a million edges: sure, but it depends on how many of them there are and on your compute resources. Again, I can't go into much detail, but Giraph has been extensively tested using real-world, large, interesting, useful graph data. This includes large social graphs that have supernodes. So if you're supplying that, and you have the gear to run your data, you've picked the right tool. You can spill to disk, run in memory, or spread the load across many, many workers (Mapper tasks) hosted on many nodes, and Giraph will behave well as long as you have the compute resources to scale to your volume of data. On Tue, Sep 11, 2012 at 12:27 AM, Avery Ching wrote: > Hi Jeyendran, nice to meet you. > > Answers inline. > > > On 9/10/12 11:23 PM, Jeyendran Balakrishnan wrote: > >> I am trying to understand what kind of data Giraph holds in memory per >> worker. >> My questions in descending order of importance: >> 1. Does Giraph hold in memory exactly one vertex of data at a time, or >> does >> it need to hold all the vertexes assigned to that worker? >> > All vertices assigned to that worker. > > > 2. Can Giraph handle vertexes with a million edges per vertex?
>> > Depends on how much memory you have. Would recommend making a custom > vertex implementation that has a very efficient store for better > scalability (i.e. see IntIntNullIntVertex). > > If not, at what order of magnitude does it break down? - 1000 edges, >> 10K >> edges, 100K edges?... >>(Of course, I understand that this depends upon the -Xmx value, so >> let's >> say we fix a value of -Xmx8g). >> 3. Are there any limitations on the kind of objects that can be used as >> vertices? >> Specifically, does Giraph assume that vertices are lightweight (eg, >> integer vertex ID + simple Java primitive vertex values + collection of >> out-edges), >> or can Giraph support heavyweight vertices (hold complex nested Java >> objects in a vertex)? >> > Limitations are that the vertex implementation must be Writable, the > vertex index must be WritableComparable, edge type Writable, message type > Writable. > > > 4. More generally, what data is stored in memory, and what, if any, is >> offloaded/spilled to disk? >> > Messages and vertices can be spilled to disk, but you must enable this. > > Would appreciate any light the experts can throw on this. >> >> On this note, I would like to mention that the presentations posted on the >> Wiki explain what Giraph can do, and how to use it from a coding >> perspective, but there are no explanations of the design approach used, >> the >> rationale behind the choices, and the software architecture. I feel that >> new >> users can really benefit from a design and architecture document, along >> the >> lines of Hadoop and Lucene. For folks who are considering whether or not >> to >> use Giraph, this can be a big help. The only alternative today is to read >> the source code, the burden of which might in itself be reason for folks >> not >> to consider using Giraph. >> My 2c :-) >> > > Agreed that documentation is lacking =). That being said, the > presentations explain most of the design approach and reasons. 
I would > refer to the Pregel paper for a more detailed look or ask if you have any > specific questions. > >> >> Thanks a lot, >> > No problem! > >> Jeyendran >> >> >> >
Re: How to register aggregators with the 'new' Giraph?
Hey Maja, A small tutorial on the wiki would be wonderful, either now or when the final changes to aggregators in the upcoming patches are done. We need a wiki entry for master compute too. I would like to go through and update some of the website examples as well regarding best practices with the new Vertex API, using the bin/giraph script and command line opts to set up jobs without writing your own run() method, implementing Tool, and writing your own IO Formats, etc. Thanks again! On Tue, Sep 11, 2012 at 9:36 AM, Paolo Castagna wrote: > Hi Maja, > yep, your explanation makes sense. > > Clear now. > > Paiki > > On 11 September 2012 16:09, Maja Kabiljo wrote: > > Hi Paolo, > > > > Glad to hear it works :-) > > > > The reason why you don't see the value you set with setAggregatedValue > > right away is that we want to read aggregated values from previous > > superstep and change them for next one. It goes the same with vertices > > where you call aggregate to give values for next superstep and read the > > values from previous. This is actually the part which wasn't working well > > before - it wasn't possible to get aggregated value without changes that > > vertices on the same worker made in current superstep. Hope this makes it > > clear for you. > > > > Maja > > > > > > On 9/11/12 12:45 PM, "Paolo Castagna" wrote: > > > >>Hi, > >>the green bar is back. :-) > >> > >>I made multiple mistakes in relation to the new aggregators but now I > >>believe I grasped how they work. 
> >> > >>For those interested the PageRankVertex, PageRankMasterCompute and > >>PageRankWorkerContext are here: > >> > >>https://github.com/castagna/jena-grande/blob/9dd50837d6a13c542cce5d77a69cea071a91cee8/src/main/java/org/apache/jena/grande/giraph/pagerank/PageRankVertex.java > >>https://github.com/castagna/jena-grande/blob/9dd50837d6a13c542cce5d77a69cea071a91cee8/src/main/java/org/apache/jena/grande/giraph/pagerank/PageRankMasterCompute.java > >>https://github.com/castagna/jena-grande/blob/9dd50837d6a13c542cce5d77a69cea071a91cee8/src/main/java/org/apache/jena/grande/giraph/pagerank/PageRankWorkerContext.java > >> > >>There might be some further improvement left, but I'll try that another > >>time. > >> > >>For example: > >> > >> registerPersistentAggregator("dangling-current", > >>DoubleSumAggregator.class); > >> registerPersistentAggregator("error-current", > >>DoubleSumAggregator.class); > >> > >>Could probably be registerAggregator. > >> > >>I also noticed that within the compute() method if I call > >>setAggregatedValue("name", ...) and getAggregatedValue("name") I don't > >>seem to get the value set back. But the value is sent to the worker. > >>This is not important, but it confuses me. > >> > >>I do agree with you, now the situation around aggregators is cleaner > >>than before. > >> > >>Thank you for your help. > >> > >>Paolo > >> > >>PS: > >>There is still a known failure in the tests, that is to show that the > >>SimplePageRankVertex approach is "too simple", it does not give back a > >>probability distribution (i.e. sum at the end is not 1.0) and it does > >>not take into account dangling nodes properly. > >>On the other hand, PageRankVertex produces same results as two other > >>implementations: one serial, all in memory and another one using JUNG.
> >>On 11 September 2012 11:03, Maja Kabiljo wrote: > >>> Hi Paolo, > >>> > >>> You get null for aggregated value because aggregators haven't been > >>> registered yet at the moment WorkerContext.preApplication() is called. > >>>But > >>> I think that shouldn't be a problem since you can set initial values > for > >>> aggregators in MasterCompute.initialize(). > >>> > >>> Please also note that you are not using the new aggregator api in the > >>> proper way. Function getAggregatedValue will return the value of the > >>> aggregator, not the aggregator object itself. It's not possible to set > >>>the > >>> value of the aggregators on workers (in methods from WorkerContext and > >>> Vertex), because that would produce nondeterministic results. You > >>> aggregate on workers and set values on master. > >>> > >>> As for persistent vs regular aggregator, value of regular aggregator is > >>> being reset before each superstep, while the persistent isn't. For > >>> example, if you have a persistent sum aggregator its value is going to > >>>be > >>> the sum of all values given to it from the beginning of application. If > >>> you have regular sum aggregator the value is going to be just the sum > of > >>> values from previous superstep. > >>> > >>> I can write a small tutorial about aggregators if someone can tell me > >>> where and how to do that. :-) I see that for people who were using > >>> aggregators before these changes will be confusing, but I think that > for > >>> the ones who are starting with current state it will be much easier. > >>> > >>> Maja > >>> > >>> On 9/11/12 9:49 AM, "Paolo Castagna" wrote: > >>> > Hi, >
Re: How to register aggregators with the 'new' Giraph?
Hi Maja, yep, your explanation makes sense. Clear now. Paiki On 11 September 2012 16:09, Maja Kabiljo wrote: > Hi Paolo, > > Glad to hear it works :-) > > The reason why you don't see the value you set with setAggregatedValue > right away is that we want to read aggregated values from previous > superstep and change them for next one. It goes the same with vertices > where you call aggregate to give values for next superstep and read the > values from previous. This is actually the part which wasn't working well > before - it wasn't possible to get aggregated value without changes that > vertices on the same worker made in current superstep. Hope this makes it > clear for you. > > Maja > > > On 9/11/12 12:45 PM, "Paolo Castagna" wrote: > >>Hi, >>the green bar is back. :-) >> >>I made multiple mistakes in relation to the new aggregators but now I >>believe I grasped how they work. >> >>For those interested the PageRankVertex, PageRankMasterCompute and >>PageRankWorkerContext are here: >>https://github.com/castagna/jena-grande/blob/9dd50837d6a13c542cce5d77a69cea071a91cee8/src/main/java/org/apache/jena/grande/giraph/pagerank/PageRankVertex.java >>https://github.com/castagna/jena-grande/blob/9dd50837d6a13c542cce5d77a69cea071a91cee8/src/main/java/org/apache/jena/grande/giraph/pagerank/PageRankMasterCompute.java >>https://github.com/castagna/jena-grande/blob/9dd50837d6a13c542cce5d77a69cea071a91cee8/src/main/java/org/apache/jena/grande/giraph/pagerank/PageRankWorkerContext.java >> >>There might be some further improvement left, but I'll try that another >>time. >> >>For example: >> >> registerPersistentAggregator("dangling-current", >>DoubleSumAggregator.class); >> registerPersistentAggregator("error-current", >>DoubleSumAggregator.class); >> >>Could probably be registerAggregator. >> >>I also noticed that within the compute() method if I call >>setAggregatedValue("name", ...) and getAggregatedValue("name") I don't >>seem to get the value set back.
But the value is sent to the worker. >>This is not important, but it confuses me. >> >>I do agree with you, now the situation around aggregators is cleaner >>than before. >> >>Thank you for your help. >> >>Paolo >> >>PS: >>There is still a known failure in the tests, that is to show that the >>SimplePageRankVertex approach is "too simple", it does not give back a >>probability distribution (i.e. sum at the end is not 1.0) and it does >>not take into account dangling nodes properly. >>On the other hand, PageRankVertex produces same results as two other >>implementations: one serial, all in memory and another one using JUNG. >> >>On 11 September 2012 11:03, Maja Kabiljo wrote: >>> Hi Paolo, >>> >>> You get null for aggregated value because aggregators haven't been >>> registered yet at the moment WorkerContext.preApplication() is called. >>>But >>> I think that shouldn't be a problem since you can set initial values for >>> aggregators in MasterCompute.initialize(). >>> >>> Please also note that you are not using the new aggregator api in the >>> proper way. Function getAggregatedValue will return the value of the >>> aggregator, not the aggregator object itself. It's not possible to set >>>the >>> value of the aggregators on workers (in methods from WorkerContext and >>> Vertex), because that would produce nondeterministic results. You >>> aggregate on workers and set values on master. >>> >>> As for persistent vs regular aggregator, value of regular aggregator is >>> being reset before each superstep, while the persistent isn't. For >>> example, if you have a persistent sum aggregator its value is going to >>>be >>> the sum of all values given to it from the beginning of application. If >>> you have regular sum aggregator the value is going to be just the sum of >>> values from previous superstep. >>> >>> I can write a small tutorial about aggregators if someone can tell me >>> where and how to do that.
:-) I see that for people who were using >>> aggregators before these changes will be confusing, but I think that for >>> the ones who are starting with current state it will be much easier. >>> >>> Maja >>> >>> On 9/11/12 9:49 AM, "Paolo Castagna" wrote: >>> Hi, this is how I run the PageRank implementation (mine takes into account dangling nodes and checks for convergence): Map params = new HashMap(); params.put(GiraphJob.WORKER_CONTEXT_CLASS, "org.apache.jena.grande.giraph.pagerank.PageRankVertexWorkerContext"); params.put(GiraphJob.MASTER_COMPUTE_CLASS, "org.apache.jena.grande.giraph.pagerank.PageRankVertexMasterCompute"); String[] data = getData ( filename ); Iterable results = InternalVertexRunner.run( PageRankVertex.class, PageRankVertexInputFormat.class, PageRankVertexOutputFormat.class, params, data ); This used to work, however I was registering aggregators in PageRankVertexWorkerContext (see below). Now, I am trying to do the sa
Re: How to register aggregators with the 'new' Giraph?
Maja, you could add your tutorial here if you like: http://cwiki.apache.org/confluence/display/GIRAPH/Index On 9/11/12 8:09 AM, Maja Kabiljo wrote: Hi Paolo, Glad to hear it works :-) The reason why you don't see the value you set with setAggregatedValue right away is that we want to read aggregated values from previous superstep and change them for next one. It goes the same with vertices where you call aggregate to give values for next superstep and read the values from previous. This is actually the part which wasn't working well before - it wasn't possible to get aggregated value without changes that vertices on the same worker made in current superstep. Hope this makes it clear for you. Maja On 9/11/12 12:45 PM, "Paolo Castagna" wrote: Hi, the green bar is back. :-) I made multiple mistakes in relation to the new aggregators but now I believe I grasped how they work. For those interested the PageRankVertex, PageRankMasterCompute and PageRankWorkerContext are here: https://github.com/castagna/jena-grande/blob/9dd50837d6a13c542cce5d77a69cea071a91cee8/src/main/java/org/apache/jena/grande/giraph/pagerank/PageRankVertex.java https://github.com/castagna/jena-grande/blob/9dd50837d6a13c542cce5d77a69cea071a91cee8/src/main/java/org/apache/jena/grande/giraph/pagerank/PageRankMasterCompute.java https://github.com/castagna/jena-grande/blob/9dd50837d6a13c542cce5d77a69cea071a91cee8/src/main/java/org/apache/jena/grande/giraph/pagerank/PageRankWorkerContext.java There might be some further improvement left, but I'll try that another time. For example: registerPersistentAggregator("dangling-current", DoubleSumAggregator.class); registerPersistentAggregator("error-current", DoubleSumAggregator.class); Could probably be registerAggregator. I also noticed that within the compute() method if I call setAggregatedValue("name", ...) and getAggregatedValue("name") I don't seem to get the value set back. But the value is sent to the worker.
This is not important, but it confuses me. I do agree with you, now the situation around aggregators is cleaner than before. Thank you for your help. Paolo PS: There is still a known failure in the tests, that is to show that the SimplePageRankVertex approach is "too simple", it does not give back a probability distribution (i.e. sum at the end is not 1.0) and it does not take into account dangling nodes properly. On the other hand, PageRankVertex produces same results as two other implementations: one serial, all in memory and another one using JUNG. On 11 September 2012 11:03, Maja Kabiljo wrote: Hi Paolo, You get null for aggregated value because aggregators haven't been registered yet at the moment WorkerContext.preApplication() is called. But I think that shouldn't be a problem since you can set initial values for aggregators in MasterCompute.initialize(). Please also note that you are not using the new aggregator api in the proper way. Function getAggregatedValue will return the value of the aggregator, not the aggregator object itself. It's not possible to set the value of the aggregators on workers (in methods from WorkerContext and Vertex), because that would produce nondeterministic results. You aggregate on workers and set values on master. As for persistent vs regular aggregator, value of regular aggregator is being reset before each superstep, while the persistent isn't. For example, if you have a persistent sum aggregator its value is going to be the sum of all values given to it from the beginning of application. If you have regular sum aggregator the value is going to be just the sum of values from previous superstep. I can write a small tutorial about aggregators if someone can tell me where and how to do that. :-) I see that for people who were using aggregators before these changes will be confusing, but I think that for the ones who are starting with current state it will be much easier.
Maja On 9/11/12 9:49 AM, "Paolo Castagna" wrote: Hi, this is how I run the PageRank implementation (mine takes into account dangling nodes and checks for convergence): Map params = new HashMap(); params.put(GiraphJob.WORKER_CONTEXT_CLASS, "org.apache.jena.grande.giraph.pagerank.PageRankVertexWorkerContext"); params.put(GiraphJob.MASTER_COMPUTE_CLASS, "org.apache.jena.grande.giraph.pagerank.PageRankVertexMasterCompute"); String[] data = getData ( filename ); Iterable results = InternalVertexRunner.run( PageRankVertex.class, PageRankVertexInputFormat.class, PageRankVertexOutputFormat.class, params, data ); This used to work, however I was registering aggregators in PageRankVertexWorkerContext (see below). Now, I am trying to do the same in PageRankVertexMasterCompute which extends DefaultMasterCompute and has only one method: @Override public void initialize() throws InstantiationException, IllegalAccessException { log.debug("initialize"); registerPersistentAggregator("dangling-current", DoubleSumAggregator.class); registerPe
Re: How to register aggregators with the 'new' Giraph?
Hi Paolo, Glad to hear it works :-) The reason why you don't see the value you set with setAggregatedValue right away is that we want to read aggregated values from previous superstep and change them for next one. It goes the same with vertices where you call aggregate to give values for next superstep and read the values from previous. This is actually the part which wasn't working well before - it wasn't possible to get aggregated value without changes that vertices on the same worker made in current superstep. Hope this makes it clear for you. Maja On 9/11/12 12:45 PM, "Paolo Castagna" wrote: >Hi, >the green bar is back. :-) > >I made multiple mistakes in relation to the new aggregators but now I >believe I grasped how they work. > >For those interested the PageRankVertex, PageRankMasterCompute and >PageRankWorkerContext are here: >https://github.com/castagna/jena-grande/blob/9dd50837d6a13c542cce5d77a69cea071a91cee8/src/main/java/org/apache/jena/grande/giraph/pagerank/PageRankVertex.java >https://github.com/castagna/jena-grande/blob/9dd50837d6a13c542cce5d77a69cea071a91cee8/src/main/java/org/apache/jena/grande/giraph/pagerank/PageRankMasterCompute.java >https://github.com/castagna/jena-grande/blob/9dd50837d6a13c542cce5d77a69cea071a91cee8/src/main/java/org/apache/jena/grande/giraph/pagerank/PageRankWorkerContext.java > >There might be some further improvement left, but I'll try that another >time. > >For example: > > registerPersistentAggregator("dangling-current", >DoubleSumAggregator.class); > registerPersistentAggregator("error-current", >DoubleSumAggregator.class); > >Could probably be registerAggregator. > >I also noticed that within the compute() method if I call >setAggregatedValue("name", ...) and getAggregatedValue("name") I don't >seem to get the value set back. But the value is sent to the worker. >This is not important, but it confuses me. > >I do agree with you, now the situation around aggregators is cleaner >than before.
> >Thank you for your help. > >Paolo > >PS: >There is still a known failure in the tests, that is to show that the >SimplePageRankVertex approach is "too simple", it does not give back a >probability distribution (i.e. sum at the end is not 1.0) and it does >not take into account dangling nodes properly. >On the other hand, PageRankVertex produces same results as two other >implementations: one serial, all in memory and another one using JUNG. > >On 11 September 2012 11:03, Maja Kabiljo wrote: >> Hi Paolo, >> >> You get null for aggregated value because aggregators haven't been >> registered yet at the moment WorkerContext.preApplication() is called. >>But >> I think that shouldn't be a problem since you can set initial values for >> aggregators in MasterCompute.initialize(). >> >> Please also note that you are not using the new aggregator api in the >> proper way. Function getAggregatedValue will return the value of the >> aggregator, not the aggregator object itself. It's not possible to set >>the >> value of the aggregators on workers (in methods from WorkerContext and >> Vertex), because that would produce nondeterministic results. You >> aggregate on workers and set values on master. >> >> As for persistent vs regular aggregator, value of regular aggregator is >> being reset before each superstep, while the persistent isn't. For >> example, if you have a persistent sum aggregator its value is going to >>be >> the sum of all values given to it from the beginning of application. If >> you have regular sum aggregator the value is going to be just the sum of >> values from previous superstep. >> >> I can write a small tutorial about aggregators if someone can tell me >> where and how to do that. :-) I see that for people who were using >> aggregators before these changes will be confusing, but I think that for >> the ones who are starting with current state it will be much easier.
>> >> Maja >> >> On 9/11/12 9:49 AM, "Paolo Castagna" wrote: >> >>>Hi, >>>this is how I run the PageRank implementation (mine takes into account >>>dangling nodes and checks for convergence): >>> >>>Map params = new HashMap(); >>>params.put(GiraphJob.WORKER_CONTEXT_CLASS, >>>"org.apache.jena.grande.giraph.pagerank.PageRankVertexWorkerContext"); >>>params.put(GiraphJob.MASTER_COMPUTE_CLASS, >>>"org.apache.jena.grande.giraph.pagerank.PageRankVertexMasterCompute"); >>> >>>String[] data = getData ( filename ); >>>Iterable results = InternalVertexRunner.run( >>> PageRankVertex.class, >>> PageRankVertexInputFormat.class, >>> PageRankVertexOutputFormat.class, >>> params, >>> data >>>); >>> >>>This used to work, however I was registering aggregators in >>>PageRankVertexWorkerContext (see below). >>> >>>Now, I am trying to do the same in PageRankVertexMasterCompute which >>>extends DefaultMasterCompute and has only one method: >>> >>>@Override >>>public void initialize() throws InstantiationException, >>>IllegalAccessException { >>> log.debug("initialize"); >>> registerPersis
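Maja's explanation above (reads see the previous superstep's value, while writes shape the next one) behaves like double buffering. The sketch below is a hypothetical model, not Giraph's implementation: `read()`, `aggregate()`, `masterSet()` and `barrier()` are illustrative names loosely standing in for getAggregatedValue, aggregate, setAggregatedValue and the superstep boundary, and the model assumes a regular sum that restarts from zero after each barrier.

```java
/**
 * Hypothetical model (not Giraph code) of the read-previous / write-next rule:
 * reads during superstep s return the value fixed at the end of superstep s-1,
 * while aggregate() and masterSet() only shape the value that becomes readable
 * after the next barrier.
 */
final class TwoSlotAggregatorModel {
  private double readable = 0.0; // what reads return during this superstep
  private double building = 0.0; // what this superstep's writes are building

  /** Loosely analogous to reading an aggregated value. */
  double read() {
    return readable;
  }

  /** Worker-side contribution; invisible until the next barrier. */
  void aggregate(double x) {
    building += x;
  }

  /** Master-side override; also invisible until the next barrier. */
  void masterSet(double v) {
    building = v;
  }

  /** Superstep barrier: publish what was built, then start a fresh sum. */
  void barrier() {
    readable = building;
    building = 0.0;
  }
}
```

Under this model, Paolo's observation is expected: two workers contributing 1.5 and 2.5 are invisible until the barrier, after which reads return 4.0; a `masterSet(10.0)` in the following step still reads back 4.0, and 10.0 only becomes readable after the next barrier.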
Re: reason behind a java.io.EOFException
Thanks a lot, Avery! You were right, it works now! :) Cheers, FM On Tue, Sep 11, 2012 at 9:38 AM, Avery Ching wrote: > Hi Franco, I think that the problem is that there is a bug in the > serialization/deserialization of LongDoubleNullDoubleVertex (this is your > class right?). That's why it works on one worker, but not more. > > Avery > > > On 9/10/12 11:24 PM, Franco Maria Nardini wrote: >> >> Thanks a lot, Avery. >> >> I tried your solution but now I got this error that seems related to >> netty. Am I wrong? >> >> Best, >> >> FM >> >> --- >> 2012-09-11 08:19:41,796 WARN >> org.apache.giraph.comm.netty.handler.RequestServerHandler: >> exceptionCaught: Channel failed with remote address /172.20.10.3:50077 >> java.io.EOFException: fieldSize is too long! Length is 8, but maximum is 5 >> at >> org.jboss.netty.buffer.ChannelBufferInputStream.checkAvailable(ChannelBufferInputStream.java:230) >> at >> org.jboss.netty.buffer.ChannelBufferInputStream.readLong(ChannelBufferInputStream.java:198) >> at >> org.jboss.netty.buffer.ChannelBufferInputStream.readDouble(ChannelBufferInputStream.java:153) >> at >> org.apache.giraph.graph.LongDoubleNullDoubleVertex.readFields(LongDoubleNullDoubleVertex.java:157) >> at >> org.apache.giraph.comm.requests.SendVertexRequest.readFieldsRequest(SendVertexRequest.java:79) >> at >> org.apache.giraph.comm.requests.WritableRequest.readFields(WritableRequest.java:90) >> at >> org.apache.giraph.comm.netty.handler.RequestDecoder.decode(RequestDecoder.java:82) >> at >> org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:67) >> at >> org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296) >> at >> org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:458) >> at >> org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:439) >> at >> org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303) >> at >> 
org.apache.giraph.comm.netty.ByteCounter.handleUpstream(ByteCounter.java:61) >> at >> org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268) >> at >> org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255) >> at >> org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:91) >> at >> org.jboss.netty.channel.socket.nio.AbstractNioWorker.processSelectedKeys(AbstractNioWorker.java:385) >> at >> org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:256) >> at >> org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:35) >> at >> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) >> at >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) >> at java.lang.Thread.run(Thread.java:680) >> 2012-09-11 08:19:41,798 WARN >> org.apache.giraph.comm.netty.handler.RequestServerHandler: >> exceptionCaught: Channel failed with remote address /172.20.10.3:50077 >> java.io.EOFException: fieldSize is too long! 
Length is 8, but maximum is 1 >> at >> org.jboss.netty.buffer.ChannelBufferInputStream.checkAvailable(ChannelBufferInputStream.java:230) >> at >> org.jboss.netty.buffer.ChannelBufferInputStream.readLong(ChannelBufferInputStream.java:198) >> at >> org.jboss.netty.buffer.ChannelBufferInputStream.readDouble(ChannelBufferInputStream.java:153) >> at >> org.apache.giraph.graph.LongDoubleNullDoubleVertex.readFields(LongDoubleNullDoubleVertex.java:157) >> at >> org.apache.giraph.comm.requests.SendVertexRequest.readFieldsRequest(SendVertexRequest.java:79) >> at >> org.apache.giraph.comm.requests.WritableRequest.readFields(WritableRequest.java:90) >> at >> org.apache.giraph.comm.netty.handler.RequestDecoder.decode(RequestDecoder.java:82) >> at >> org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:67) >> at >> org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296) >> at >> org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:458) >> at >> org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:439) >> at >> org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303) >> at >> org.apache.giraph.comm.netty.ByteCounter.handleUpstream(ByteCounter.java:61) >> at >> org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268) >> at >> org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255) >> at >> org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:91) >> at >> org.jboss.netty.channel.socket.nio.AbstractNioWorker.processSelectedKeys(AbstractNioWorker.java
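Avery's diagnosis (a serialization bug in LongDoubleNullDoubleVertex that only shows up with more than one worker) follows from the `Writable` contract: `readFields()` must consume exactly the bytes `write()` produced. With a single worker, vertices presumably never have to be sent to another worker (the trace shows the failure inside SendVertexRequest), so a mismatch can stay hidden. The hypothetical sketch below uses only `java.io`: `SimpleWritable` mirrors the two methods of Hadoop's `Writable` so no Hadoop jar is needed, and `VertexState` with its deliberately buggy `write()` is invented for illustration. A plain `DataInputStream` reports the overrun as `java.io.EOFException`, analogous to the Netty `ChannelBufferInputStream` error in the trace.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

/** Stand-in for Hadoop's Writable contract (write/readFields), to avoid a Hadoop dependency. */
interface SimpleWritable {
  void write(DataOutput out) throws IOException;

  void readFields(DataInput in) throws IOException;
}

/** Hypothetical vertex state: a long id, a double value and per-edge double values. */
final class VertexState implements SimpleWritable {
  long id;
  double value;
  double[] edgeValues = new double[0];
  private final boolean buggyWrite;

  VertexState(boolean buggyWrite) {
    this.buggyWrite = buggyWrite;
  }

  @Override
  public void write(DataOutput out) throws IOException {
    out.writeLong(id);
    out.writeDouble(value);
    out.writeInt(edgeValues.length);
    // The buggy variant announces N edge values but never writes them,
    // so readFields() runs past the end of the serialized bytes.
    if (!buggyWrite) {
      for (double d : edgeValues) {
        out.writeDouble(d);
      }
    }
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    // Must read exactly the fields write() produced, in the same order.
    id = in.readLong();
    value = in.readDouble();
    edgeValues = new double[in.readInt()];
    for (int i = 0; i < edgeValues.length; i++) {
      edgeValues[i] = in.readDouble();
    }
  }

  /** Serializes to bytes and reads them back, roughly what sending a vertex to another worker involves. */
  static VertexState roundTrip(VertexState src) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    src.write(new DataOutputStream(bytes));
    VertexState copy = new VertexState(false);
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    return copy;
  }
}
```

A round trip through both methods, as in `roundTrip()`, is a cheap unit test that catches this class of bug without starting a multi-worker job.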
Re: How to register aggregators with the 'new' Giraph?
Hi, the green bar is back. :-) I made multiple mistakes in relation to the new aggregators but now I believe I grasped how they work. For those interested the PageRankVertex, PageRankMasterCompute and PageRankWorkerContext are here: https://github.com/castagna/jena-grande/blob/9dd50837d6a13c542cce5d77a69cea071a91cee8/src/main/java/org/apache/jena/grande/giraph/pagerank/PageRankVertex.java https://github.com/castagna/jena-grande/blob/9dd50837d6a13c542cce5d77a69cea071a91cee8/src/main/java/org/apache/jena/grande/giraph/pagerank/PageRankMasterCompute.java https://github.com/castagna/jena-grande/blob/9dd50837d6a13c542cce5d77a69cea071a91cee8/src/main/java/org/apache/jena/grande/giraph/pagerank/PageRankWorkerContext.java There might be some further improvement left, but I'll try that another time. For example: registerPersistentAggregator("dangling-current", DoubleSumAggregator.class); registerPersistentAggregator("error-current", DoubleSumAggregator.class); Could probably be registerAggregator. I also noticed that within the compute() method if I call setAggregatedValue("name", ...) and getAggregatedValue("name") I don't seem to get the value set back. But the value is sent to the worker. This is not important, but it confuses me. I do agree with you, now the situation around aggregators is cleaner than before. Thank you for your help. Paolo PS: There is still a known failure in the tests, that is to show that the SimplePageRankVertex approach is "too simple", it does not give back a probability distribution (i.e. sum at the end is not 1.0) and it does not take into account dangling nodes properly. On the other hand, PageRankVertex produces same results as two other implementations: one serial, all in memory and another one using JUNG. On 11 September 2012 11:03, Maja Kabiljo wrote: > Hi Paolo, > > You get null for aggregated value because aggregators haven't been > registered yet at the moment WorkerContext.preApplication() is called.
But > I think that shouldn't be a problem since you can set initial values for > aggregators in MasterCompute.initialize(). > > Please also note that you are not using the new aggregator api in the > proper way. Function getAggregatedValue will return the value of the > aggregator, not the aggregator object itself. It's not possible to set the > value of the aggregators on workers (in methods from WorkerContext and > Vertex), because that would produce nondeterministic results. You > aggregate on workers and set values on master. > > As for persistent vs regular aggregator, value of regular aggregator is > being reset before each superstep, while the persistent isn't. For > example, if you have a persistent sum aggregator its value is going to be > the sum of all values given to it from the beginning of application. If > you have regular sum aggregator the value is going to be just the sum of > values from previous superstep. > > I can write a small tutorial about aggregators if someone can tell me > where and how to do that. :-) I see that for people who were using > aggregators before these changes will be confusing, but I think that for > the ones who are starting with current state it will be much easier. 
> > Maja > > On 9/11/12 9:49 AM, "Paolo Castagna" wrote: > >>Hi, >>this is how I run the PageRank implementation (mine takes into account >>dangling nodes and checks for convergence): >> >>Map params = new HashMap(); >>params.put(GiraphJob.WORKER_CONTEXT_CLASS, >>"org.apache.jena.grande.giraph.pagerank.PageRankVertexWorkerContext"); >>params.put(GiraphJob.MASTER_COMPUTE_CLASS, >>"org.apache.jena.grande.giraph.pagerank.PageRankVertexMasterCompute"); >> >>String[] data = getData ( filename ); >>Iterable results = InternalVertexRunner.run( >> PageRankVertex.class, >> PageRankVertexInputFormat.class, >> PageRankVertexOutputFormat.class, >> params, >> data >>); >> >>This used to work, however I was registering aggregators in >>PageRankVertexWorkerContext (see below). >> >>Now, I am trying to do the same in PageRankVertexMasterCompute which >>extends DefaultMasterCompute and has only one method: >> >>@Override >>public void initialize() throws InstantiationException, >>IllegalAccessException { >> log.debug("initialize"); >> registerPersistentAggregator("dangling-current", >>DoubleSumAggregator.class); >> registerPersistentAggregator("error-current", >>DoubleSumAggregator.class); >> registerPersistentAggregator("pagerank-sum", DoubleSumAggregator.class); >> registerPersistentAggregator("vertices-count", LongSumAggregator.class); >>} >> >>I am not 100% sure about registerAggregator vs. >>registerPersistentAggregator. >> >>The initialize() method is now being called, I see this on the console: >>09:34:46 DEBUG PageRankVertexMasterCompute :: initialize >> >>In PageRankVertexWorkerContext which extends WorkerContext I override >>the preApplication() method: >> >>@SuppressWarnings("unchecked") >>@Override >>public void preApplication() throws InstantiationExcep
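Paolo's PS contrasts a "too simple" PageRank, whose ranks do not sum to 1.0 and which mishandles dangling nodes (vertices with no out-edges), with one that accounts for them and checks for convergence. As an illustration only (this is not the jena-grande code, nor Giraph's SimplePageRankVertex), here is a minimal serial, in-memory sketch in plain Java. The class and method names, the damping factor 0.85 and the tolerance are assumptions. Each iteration totals the rank held by dangling vertices and the L1 change between iterations (roughly the quantities that aggregator names like "dangling-current" and "error-current" suggest, though that is a guess), and either spreads the dangling mass evenly over all vertices or drops it.

```java
import java.util.Arrays;

/**
 * Serial, in-memory PageRank sketch (hypothetical names). With redistributeDangling = true,
 * rank held by vertices without out-edges is spread evenly over all vertices, so the ranks
 * keep summing to 1. With false, that mass is simply dropped and the total shrinks.
 */
final class DanglingPageRank {

    static double[] compute(int[][] outLinks, double damping, double tolerance,
                            int maxIterations, boolean redistributeDangling) {
        int n = outLinks.length;
        double[] rank = new double[n];
        Arrays.fill(rank, 1.0 / n);                    // uniform starting distribution
        for (int iter = 0; iter < maxIterations; iter++) {
            double danglingMass = 0.0;                 // rank sitting on vertices with no out-edges
            double[] incoming = new double[n];         // rank received along edges this iteration
            for (int v = 0; v < n; v++) {
                if (outLinks[v].length == 0) {
                    danglingMass += rank[v];
                } else {
                    double share = rank[v] / outLinks[v].length;
                    for (int target : outLinks[v]) {
                        incoming[target] += share;
                    }
                }
            }
            double spread = redistributeDangling ? danglingMass / n : 0.0;
            double[] next = new double[n];
            double error = 0.0;                        // L1 change, used as the convergence test
            for (int v = 0; v < n; v++) {
                // teleport term + damped (edge contributions + share of dangling mass)
                next[v] = (1.0 - damping) / n + damping * (incoming[v] + spread);
                error += Math.abs(next[v] - rank[v]);
            }
            rank = next;
            if (error < tolerance) {
                break;
            }
        }
        return rank;
    }

    static double sum(double[] values) {
        double total = 0.0;
        for (double x : values) {
            total += x;
        }
        return total;
    }

    public static void main(String[] args) {
        int[][] graph = { {1, 2}, {2}, {} };           // vertex 2 is dangling
        double[] fixed = compute(graph, 0.85, 1e-12, 1000, true);
        double[] naive = compute(graph, 0.85, 1e-12, 1000, false);
        System.out.println("with redistribution: " + Arrays.toString(fixed) + " sum=" + sum(fixed));
        System.out.println("dropping dangling mass: " + Arrays.toString(naive) + " sum=" + sum(naive));
    }
}
```

With redistribution the ranks sum to 1.0 (up to rounding), because every unit of rank is either passed along an edge, spread as dangling mass, or replaced by the teleport term; dropping the dangling mass instead makes the total shrink, which matches the symptom described in the PS.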
Re: How to register aggregators with the 'new' Giraph?
Hi Paolo,

You get null for the aggregated value because the aggregators haven't been registered yet at the moment WorkerContext.preApplication() is called. But I think that shouldn't be a problem, since you can set initial values for aggregators in MasterCompute.initialize().

Please also note that you are not using the new aggregator API in the proper way. The getAggregatedValue() function returns the value of the aggregator, not the aggregator object itself. It's not possible to set the value of aggregators on workers (in methods from WorkerContext and Vertex), because that would produce nondeterministic results. You aggregate on workers and set values on the master.

As for persistent vs. regular aggregators: the value of a regular aggregator is reset before each superstep, while that of a persistent one isn't. For example, if you have a persistent sum aggregator, its value is going to be the sum of all values given to it since the beginning of the application. If you have a regular sum aggregator, the value is going to be just the sum of the values from the previous superstep.

I can write a small tutorial about aggregators if someone can tell me where and how to do that. :-) I can see that these changes will be confusing for people who were using aggregators before, but I think that for those starting with the current state it will be much easier.
Maja On 9/11/12 9:49 AM, "Paolo Castagna" wrote: >Hi, >this is how I run the PageRank implementation (mine takes into account >dangling nodes and checks for convergence): > >Map params = new HashMap(); >params.put(GiraphJob.WORKER_CONTEXT_CLASS, >"org.apache.jena.grande.giraph.pagerank.PageRankVertexWorkerContext"); >params.put(GiraphJob.MASTER_COMPUTE_CLASS, >"org.apache.jena.grande.giraph.pagerank.PageRankVertexMasterCompute"); > >String[] data = getData ( filename ); >Iterable results = InternalVertexRunner.run( > PageRankVertex.class, > PageRankVertexInputFormat.class, > PageRankVertexOutputFormat.class, > params, > data >); > >This used to work, however I was registering aggregators in >PageRankVertexWorkerContext (see below). > >Now, I am trying to do the same in PageRankVertexMasterCompute which >extends DefaultMasterCompute and has only one method: > >@Override >public void initialize() throws InstantiationException, >IllegalAccessException { > log.debug("initialize"); > registerPersistentAggregator("dangling-current", >DoubleSumAggregator.class); > registerPersistentAggregator("error-current", >DoubleSumAggregator.class); > registerPersistentAggregator("pagerank-sum", DoubleSumAggregator.class); > registerPersistentAggregator("vertices-count", LongSumAggregator.class); >} > >I am not 100% sure about registerAggregator vs. >registerPersistentAggregator. 
> >The initialize() method is now being called, I see this on the console: >09:34:46 DEBUG PageRankVertexMasterCompute :: initialize > >In PageRankVertexWorkerContext which extends WorkerContext I override >the preApplication() method: > >@SuppressWarnings("unchecked") >@Override >public void preApplication() throws InstantiationException, >IllegalAccessException { > log.debug("preApplication()"); > System.out.println(((Aggregator)getAggregatedValue("error-current"))); > ((Aggregator)getAggregatedValue("error-current")).setAggregatedValue( >new DoubleWritable( Double.MAX_VALUE ) ); >} > >The getAggregatedValue("error-current") above is null and I do not >understand why. > >Just to make things even more clear, this is how I used to run the >PageRank implementation locally: >https://github.com/castagna/jena-grande/blob/2fa8a1b879a464d8e3db84e78edd539c70274e7c/src/main/java/org/apache/jena/grande/giraph/pagerank/RunPageRankVertexLocally.java >And this is the WorkerContext I used to have: >https://github.com/castagna/jena-grande/blob/2fa8a1b879a464d8e3db84e78edd539c70274e7c/src/main/java/org/apache/jena/grande/giraph/pagerank/PageRankVertexWorkerContext.java > >As you can see, it used to call registerAggregator(...) in the >preApplication() method: > >@SuppressWarnings("unchecked") >@Override >public void preApplication() throws InstantiationException, >IllegalAccessException { > log.debug("preApplication()"); > registerAggregator("dangling-current", SumAggregator.class); > registerAggregator("error-current", SumAggregator.class); > registerAggregator("pagerank-sum", SumAggregator.class); > registerAggregator("vertices-count", LongSumAggregator.class); > > ((Aggregator)getAggregator("error-current")).setAggregatedValue( >new DoubleWritable( Double.MAX_VALUE ) ); >} > >The registerAggregator() method in WorkerContext is gone and I am >trying to achieve the same via MasterCompute now.
> >Regards, >Paolo > > > > >On 11 September 2012 00:20, Paolo Castagna >wrote: >> Hi Gianmarco, >> good, that was one problem... but I am not yet back to the green bar. >> >> Here is how I am running it locally now: >> >> Map params = new HashMap(); >> params.put(GiraphJob.WORKER_CONTEXT_CLASS, >> "org.a
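The semantics Maja describes (the master registers aggregators and sets their values, workers only read and contribute, a regular aggregator restarts every superstep while a persistent one keeps accumulating) can be illustrated with a tiny toy model. This is not Giraph code and does not use Giraph's API: ToyAggregators and all of its methods are hypothetical, only sum aggregators are modelled, and the master/worker split is a convention noted in the comments rather than something the model enforces. Reading an aggregator that has not been registered returns null, loosely mirroring what Paolo observed in preApplication().

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model (NOT Giraph's API) of regular vs persistent sum aggregators. */
final class ToyAggregators {
    private final Map<String, Boolean> persistent = new HashMap<>(); // name -> is it persistent?
    private final Map<String, Double> value = new HashMap<>();       // value visible during a superstep
    private final Map<String, Double> partial = new HashMap<>();     // contributions in this superstep

    /** Master side (by convention): register a sum aggregator starting at 0.0. */
    void register(String name, boolean isPersistent) {
        persistent.put(name, isPersistent);
        value.put(name, 0.0);
        partial.put(name, 0.0);
    }

    /** Master side (by convention): only the master overwrites a value, e.g. an initial value. */
    void masterSet(String name, double v) {
        requireRegistered(name);
        value.put(name, v);
    }

    /** Worker side: read the current value; null if the aggregator is not registered (yet). */
    Double get(String name) {
        return value.get(name);
    }

    /** Worker side: contribute to the aggregator instead of setting it. */
    void aggregate(String name, double v) {
        requireRegistered(name);
        partial.merge(name, v, Double::sum);
    }

    /** Superstep boundary: fold this superstep's contributions into the visible values. */
    void endSuperstep() {
        for (Map.Entry<String, Boolean> entry : persistent.entrySet()) {
            String name = entry.getKey();
            double contributed = partial.get(name);
            double next = entry.getValue()
                    ? value.get(name) + contributed   // persistent: keeps accumulating
                    : contributed;                    // regular: only the last superstep's sum
            value.put(name, next);
            partial.put(name, 0.0);
        }
    }

    private void requireRegistered(String name) {
        if (!persistent.containsKey(name)) {
            throw new IllegalStateException("aggregator not registered: " + name);
        }
    }

    public static void main(String[] args) {
        ToyAggregators agg = new ToyAggregators();
        System.out.println("before registration: " + agg.get("regular"));
        agg.register("regular", false);
        agg.register("persistent", true);
        for (int superstep = 0; superstep < 3; superstep++) {
            for (int vertex = 0; vertex < 3; vertex++) {   // three "vertices" each add 1.0
                agg.aggregate("regular", 1.0);
                agg.aggregate("persistent", 1.0);
            }
            agg.endSuperstep();
            System.out.println("after superstep " + superstep + ": regular=" + agg.get("regular")
                    + " persistent=" + agg.get("persistent"));
        }
    }
}
```

In this model the regular aggregator reports the same per-superstep sum every time, while the persistent one grows by that amount each superstep, which is the distinction drawn in the message above.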
Re: How to register aggregators with the 'new' Giraph?
Hi,
this is how I run the PageRank implementation (mine takes into account dangling nodes and checks for convergence):

  Map params = new HashMap();
  params.put(GiraphJob.WORKER_CONTEXT_CLASS,
      "org.apache.jena.grande.giraph.pagerank.PageRankVertexWorkerContext");
  params.put(GiraphJob.MASTER_COMPUTE_CLASS,
      "org.apache.jena.grande.giraph.pagerank.PageRankVertexMasterCompute");

  String[] data = getData ( filename );
  Iterable results = InternalVertexRunner.run(
      PageRankVertex.class,
      PageRankVertexInputFormat.class,
      PageRankVertexOutputFormat.class,
      params,
      data
  );

This used to work; however, I was registering aggregators in PageRankVertexWorkerContext (see below).

Now, I am trying to do the same in PageRankVertexMasterCompute, which extends DefaultMasterCompute and has only one method:

  @Override
  public void initialize() throws InstantiationException, IllegalAccessException {
    log.debug("initialize");
    registerPersistentAggregator("dangling-current", DoubleSumAggregator.class);
    registerPersistentAggregator("error-current", DoubleSumAggregator.class);
    registerPersistentAggregator("pagerank-sum", DoubleSumAggregator.class);
    registerPersistentAggregator("vertices-count", LongSumAggregator.class);
  }

I am not 100% sure about registerAggregator vs. registerPersistentAggregator.

The initialize() method is now being called; I see this on the console:

  09:34:46 DEBUG PageRankVertexMasterCompute :: initialize

In PageRankVertexWorkerContext, which extends WorkerContext, I override the preApplication() method:

  @SuppressWarnings("unchecked")
  @Override
  public void preApplication() throws InstantiationException, IllegalAccessException {
    log.debug("preApplication()");
    System.out.println(((Aggregator)getAggregatedValue("error-current")));
    ((Aggregator)getAggregatedValue("error-current")).setAggregatedValue(
        new DoubleWritable( Double.MAX_VALUE ) );
  }

The getAggregatedValue("error-current") above is null and I do not understand why.
Just to make things even clearer, this is how I used to run the PageRank implementation locally:
https://github.com/castagna/jena-grande/blob/2fa8a1b879a464d8e3db84e78edd539c70274e7c/src/main/java/org/apache/jena/grande/giraph/pagerank/RunPageRankVertexLocally.java
And this is the WorkerContext I used to have:
https://github.com/castagna/jena-grande/blob/2fa8a1b879a464d8e3db84e78edd539c70274e7c/src/main/java/org/apache/jena/grande/giraph/pagerank/PageRankVertexWorkerContext.java

As you can see, it used to call registerAggregator(...) in the preApplication() method:

  @SuppressWarnings("unchecked")
  @Override
  public void preApplication() throws InstantiationException, IllegalAccessException {
    log.debug("preApplication()");
    registerAggregator("dangling-current", SumAggregator.class);
    registerAggregator("error-current", SumAggregator.class);
    registerAggregator("pagerank-sum", SumAggregator.class);
    registerAggregator("vertices-count", LongSumAggregator.class);
    ((Aggregator)getAggregator("error-current")).setAggregatedValue(
        new DoubleWritable( Double.MAX_VALUE ) );
  }

The registerAggregator() method in WorkerContext is gone, and I am trying to achieve the same via MasterCompute now.

Regards,
Paolo

On 11 September 2012 00:20, Paolo Castagna wrote: > Hi Gianmarco, > good, that was one problem... but I am not yet back to the green bar. > > Here is how I am running it locally now: > > Map params = new HashMap(); > params.put(GiraphJob.WORKER_CONTEXT_CLASS, > "org.apache.jena.grande.giraph.pagerank.PageRankVertexWorkerContext"); > params.put(GiraphJob.MASTER_COMPUTE_CLASS, > "org.apache.jena.grande.giraph.pagerank.SimplePageRankVertexMasterCompute"); > > String[] data = getData ( filename ); > Iterable results = InternalVertexRunner.run( > PageRankVertex.class, > PageRankVertexInputFormat.class, > PageRankVertexOutputFormat.class, > params, > data > ); > > However, I need to learn more about the MasterCompute (and its relation > with WorkerContext).
> > Paolo > > On 10 September 2012 22:08, Gianmarco De Francisci Morales > wrote: >> Hi Paolo, >> >> Are you setting the MasterCompute class? >> You can do it with this option of bin/giraph >> -mc,--masterCompute MasterCompute class >> >> Cheers, >> -- >> Gianmarco >> >> >> >> On Mon, Sep 10, 2012 at 9:36 PM, Paolo Castagna >> wrote: >>> >>> Hi, >>> first and foremost, thanks for all the work and improvements on Giraph. >>> I went away from computers for a while (personal reasons) and changed >>> job, now I am back and playing with Giraph when I can. >>> >>> I updated my little examples (overall, it was easy and quick, here the >>> changes [1]. Just in case others are in a similar situation and want >>> to have a look). >>> >>> I am not sure I get the 'new' aggregators and in particular how I can >>> 'register' them. My tests failing confirm my non understanding! And >>> forgive m
Re: reason behind a java.io.EOFException
Hi Franco,
I think the problem is that there is a bug in the serialization/deserialization of LongDoubleNullDoubleVertex (this is your class, right?). That's why it works with one worker, but not with more.

Avery

On 9/10/12 11:24 PM, Franco Maria Nardini wrote:
Thanks a lot, Avery. I tried your solution, but now I get this error, which seems related to Netty. Am I wrong?
Best,
FM
---
2012-09-11 08:19:41,796 WARN org.apache.giraph.comm.netty.handler.RequestServerHandler: exceptionCaught: Channel failed with remote address /172.20.10.3:50077
java.io.EOFException: fieldSize is too long! Length is 8, but maximum is 5
    at org.jboss.netty.buffer.ChannelBufferInputStream.checkAvailable(ChannelBufferInputStream.java:230)
    at org.jboss.netty.buffer.ChannelBufferInputStream.readLong(ChannelBufferInputStream.java:198)
    at org.jboss.netty.buffer.ChannelBufferInputStream.readDouble(ChannelBufferInputStream.java:153)
    at org.apache.giraph.graph.LongDoubleNullDoubleVertex.readFields(LongDoubleNullDoubleVertex.java:157)
    at org.apache.giraph.comm.requests.SendVertexRequest.readFieldsRequest(SendVertexRequest.java:79)
    at org.apache.giraph.comm.requests.WritableRequest.readFields(WritableRequest.java:90)
    at org.apache.giraph.comm.netty.handler.RequestDecoder.decode(RequestDecoder.java:82)
    at org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:67)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:458)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:439)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
    at org.apache.giraph.comm.netty.ByteCounter.handleUpstream(ByteCounter.java:61)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:91)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.processSelectedKeys(AbstractNioWorker.java:385)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:256)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:35)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:680)
2012-09-11 08:19:41,798 WARN org.apache.giraph.comm.netty.handler.RequestServerHandler: exceptionCaught: Channel failed with remote address /172.20.10.3:50077
java.io.EOFException: fieldSize is too long! Length is 8, but maximum is 1
    at org.jboss.netty.buffer.ChannelBufferInputStream.checkAvailable(ChannelBufferInputStream.java:230)
    at org.jboss.netty.buffer.ChannelBufferInputStream.readLong(ChannelBufferInputStream.java:198)
    at org.jboss.netty.buffer.ChannelBufferInputStream.readDouble(ChannelBufferInputStream.java:153)
    at org.apache.giraph.graph.LongDoubleNullDoubleVertex.readFields(LongDoubleNullDoubleVertex.java:157)
    at org.apache.giraph.comm.requests.SendVertexRequest.readFieldsRequest(SendVertexRequest.java:79)
    at org.apache.giraph.comm.requests.WritableRequest.readFields(WritableRequest.java:90)
    at org.apache.giraph.comm.netty.handler.RequestDecoder.decode(RequestDecoder.java:82)
    at org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:67)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:458)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:439)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
    at org.apache.giraph.comm.netty.ByteCounter.handleUpstream(ByteCounter.java:61)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:91)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.processSelectedKeys(AbstractNioWorker.java:385)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:256)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:35)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at
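Avery's diagnosis is a mismatch between what the vertex writes and what it reads back. The trace is consistent with that: the failure happens inside LongDoubleNullDoubleVertex.readFields() while a SendVertexRequest is being decoded, and readDouble() asks for 8 bytes when only 5 (and later 1) remain, i.e. the reader consumes more than the writer produced. The sketch below is not the real LongDoubleNullDoubleVertex: SimpleVertex is a hypothetical class that uses only java.io, with a long id, a double value and value-less edges stored as long target ids. It shows the rule that write() and readFields() must handle exactly the same fields in the same order, plus a deliberately broken reader that expects an edge value which was never written and therefore runs off the end of the stream with an EOFException.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical vertex: long id, double value, value-less edges to long target ids. */
final class SimpleVertex {
    long id;
    double value;
    final List<Long> targets = new ArrayList<>();

    /** Serialize: id, value, edge count, then each target id. */
    void write(DataOutput out) throws IOException {
        out.writeLong(id);
        out.writeDouble(value);
        out.writeInt(targets.size());
        for (long target : targets) {
            out.writeLong(target);
        }
    }

    /** Deserialize exactly the same fields, in exactly the same order. */
    void readFields(DataInput in) throws IOException {
        id = in.readLong();
        value = in.readDouble();
        int edgeCount = in.readInt();
        targets.clear();
        for (int i = 0; i < edgeCount; i++) {
            targets.add(in.readLong());
        }
    }

    /** Deliberately broken: reads a double edge value that write() never wrote. */
    void readFieldsBroken(DataInput in) throws IOException {
        id = in.readLong();
        value = in.readDouble();
        int edgeCount = in.readInt();
        targets.clear();
        for (int i = 0; i < edgeCount; i++) {
            targets.add(in.readLong());
            in.readDouble();   // bug: no such field, so the reader eventually runs past the end
        }
    }

    static byte[] toBytes(SimpleVertex v) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            v.write(out);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static SimpleVertex fromBytes(byte[] bytes) {
        try {
            SimpleVertex v = new SimpleVertex();
            v.readFields(new DataInputStream(new ByteArrayInputStream(bytes)));
            return v;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** True if the broken reader hits end-of-stream, as in the trace above. */
    static boolean brokenReadHitsEof(byte[] bytes) {
        try {
            new SimpleVertex().readFieldsBroken(new DataInputStream(new ByteArrayInputStream(bytes)));
            return false;
        } catch (EOFException e) {
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A quick way to catch this class of bug before running on a cluster is a round-trip test of this kind: serialize a vertex, deserialize it, and compare every field.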
Re: Can Giraph handle graphs with very large number of edges per vertex?
Hi Jeyendran, nice to meet you.

Answers inline.

On 9/10/12 11:23 PM, Jeyendran Balakrishnan wrote:
> I am trying to understand what kind of data Giraph holds in memory per worker.
> My questions in descending order of importance:
> 1. Does Giraph hold in memory exactly one vertex of data at a time, or does it need to hold all the vertexes assigned to that worker?

All vertices assigned to that worker.

> 2. Can Giraph handle vertexes with a million edges per vertex?

Depends on how much memory you have. I would recommend making a custom vertex implementation that has a very efficient store for better scalability (e.g. see IntIntNullIntVertex).

> If not, at what order of magnitude does it break down? - 1000 edges, 10K edges, 100K edges?... (Of course, I understand that this depends upon the -Xmx value, so let's say we fix a value of -Xmx8g.)
> 3. Are there any limitations on the kind of objects that can be used as vertices? Specifically, does Giraph assume that vertices are lightweight (e.g., integer vertex ID + simple Java primitive vertex values + collection of out-edges), or can Giraph support heavyweight vertices (hold complex nested Java objects in a vertex)?

The limitations are that the vertex implementation must be Writable, the vertex index must be WritableComparable, and the edge type and message type must be Writable.

> 4. More generally, what data is stored in memory, and what, if any, is offloaded/spilled to disk?

Messages and vertices can be spilled to disk, but you must enable this.

> Would appreciate any light the experts can throw on this.
> On this note, I would like to mention that the presentations posted on the Wiki explain what Giraph can do, and how to use it from a coding perspective, but there are no explanations of the design approach used, the rationale behind the choices, and the software architecture. I feel that new users can really benefit from a design and architecture document, along the lines of Hadoop and Lucene. For folks who are considering whether or not to use Giraph, this can be a big help. The only alternative today is to read the source code, the burden of which might in itself be reason for folks not to consider using Giraph. My 2c :-)

Agreed that documentation is lacking =). That being said, the presentations explain most of the design approach and the reasons behind it. For a more detailed look, I would refer you to the Pregel paper, or just ask if you have any specific questions.

> Thanks a lot,

No problem!

> Jeyendran
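Avery's suggestion for vertices with very many edges is a custom vertex with an efficient edge store, and Jeyendran's question fixes the heap at -Xmx8g. The sketch below is not IntIntNullIntVertex's actual code; IntEdgeArray and its methods are hypothetical. It keeps target ids in a growable primitive int[] instead of a collection of boxed Integer objects. The byte figures are rough assumptions for a 64-bit JVM with compressed object pointers (about 16 bytes per Integer object plus a 4-byte reference, versus 4 bytes per int in an array); they ignore array headers, spare capacity and everything else a worker holds, so they indicate orders of magnitude only.

```java
import java.util.Arrays;

/** Hypothetical compact, append-only edge store: target ids kept in a primitive int[]. */
final class IntEdgeArray {
    private int[] targets = new int[4];
    private int size = 0;

    void add(int target) {
        if (size == targets.length) {
            // double the capacity (at least 4) for amortized O(1) appends
            targets = Arrays.copyOf(targets, Math.max(4, targets.length * 2));
        }
        targets[size++] = target;
    }

    int size() {
        return size;
    }

    int get(int i) {
        if (i < 0 || i >= size) {
            throw new IndexOutOfBoundsException("edge " + i);
        }
        return targets[i];
    }

    /** Drop spare capacity once loading is finished. */
    void trim() {
        targets = Arrays.copyOf(targets, size);
    }

    /** Rough payload estimate: 4 bytes per int target (array header ignored). */
    static long compactBytes(long edges) {
        return 4L * edges;
    }

    /** Rough estimate for boxed storage: ~4-byte reference + ~16-byte Integer (assumed layout). */
    static long boxedBytes(long edges) {
        return 20L * edges;
    }

    /** Edges of a given per-edge cost that fit in a heap budget, ignoring everything else. */
    static long edgesThatFit(long heapBytes, long bytesPerEdge) {
        return heapBytes / bytesPerEdge;
    }

    public static void main(String[] args) {
        IntEdgeArray edges = new IntEdgeArray();
        for (int t = 0; t < 1_000_000; t++) {
            edges.add(t);
        }
        edges.trim();
        long mib = 1024L * 1024;
        System.out.println(edges.size() + " edges: ~" + compactBytes(edges.size()) / mib
                + " MiB compact vs ~" + boxedBytes(edges.size()) / mib + " MiB boxed");
        long heap = 8L * 1024 * 1024 * 1024;   // the -Xmx8g from the question
        System.out.println("edge payload fitting in 8 GiB: ~" + edgesThatFit(heap, 4)
                + " compact vs ~" + edgesThatFit(heap, 20) + " boxed");
    }
}
```

Even with a compact store, a worker holds all of its assigned vertices at once (per the answer to question 1), so the per-edge cost has to be multiplied across every vertex on that worker when sizing the heap.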