> On July 19, 2012, 5:02 p.m., Eli Reisman wrote:
> > Hi Maja,
> > 
> > This was a lot of really hard work, great job. My general discomfort with 
> > adopting this too quickly is that this is a big change that adds a lot of 
> > new moving parts, and it needs to be extensively tested in two ways:
> > 
> > 1. On real clusters, with varying data loads. Testing in pseudo-distributed 
> > mode for a change this big doesn't tell us whether it helps or hurts us. 
> > This change (potentially) involves a lot of IO, which adds overhead.
> > 
> > 2. On algorithms that mutate the graph during compute() supersteps, so 
> > that we can objectively measure what's going on when that case comes up.
> > 
> > My main point: I would be a lot more comfortable with this and the other 
> > patch spilling partitions to disk (also really great code) if the people 
> > writing them were collecting some metrics and addressing the fact that, at 
> > very acceptable levels of scale-out, we only have a memory problem during 
> > INPUT_SUPERSTEP. If we're not focused on that, we are fixing a lot of stuff 
> > that has not yet been proven broken. Metrics from real cluster jobs on a 
> > wide variety of data load sizes clearly show that the critical moment is 
> > superstep -1, when the data is loaded and collections of vertices are sent 
> > out over Netty to their remote homes (partitions) in preparation for the 
> > calculation supersteps.
> > 
> > A broad fix for this would be either to place the mappers/workers on the 
> > cluster nodes that store a replica of the data they read (as in Hadoop, 
> > restoring locality), or to spill to disk during this phase only. That is 
> > easy, since no processing is going on, and the vertices can be reloaded 
> > once the splits are done and the memory pressure has receded. For the rest 
> > of the processing cycle, we should be fine. As we scale out further under 
> > the same memory constraints, we could add more creative spilling techniques 
> > if needed, once the INPUT_SUPERSTEP work has proven stable. I don't mean to 
> > rain on the parade, but doesn't this seem like a sensible way forward?
> > 
> > Regardless, everyone working on the disk spill code has done really fine 
> > work, and I admire it. If we adapt it to rein in the scope a bit, or back 
> > these ideas with some realistic testing/metrics, I'll be your biggest 
> > supporter!
> >

Hi Eli,

Thanks a lot for looking into this.

I totally agree that I need to do more testing and measurements for this 
solution; that was my plan from the beginning. I uploaded the patch now because 
it's big, and I wanted to run benchmarks and gather review comments in 
parallel. Sorry if that's the wrong way of doing it.

I think whether or not messages cause memory problems strongly depends on the 
algorithm being run. The way I see it, there are cases in which you won't have 
problems with the graph (not even during the input superstep) but you will have 
them with messages. I do understand what you are saying, and Alessandro is 
already trying to address the input superstep problem, but I think this is also 
good to have.

Note that out-of-core messaging is implemented as an option, i.e. users can 
still choose not to use it, and in that case the way the whole system works 
hasn't changed much (in terms of performance).

As for point 2, graph mutations don't have a big influence on messaging, apart 
from the creation of nonexistent vertices that received messages. Am I missing 
something?
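To make that one interaction concrete, here is a minimal sketch, assuming long vertex ids, of how the set of vertices to create can be derived from the ids that received messages. It only computes the set difference; what the new vertex looks like is left to the resolver (the VertexResolver classes touched by this patch). CreateMissingVertices and idsToCreate() are hypothetical names.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical sketch: ids that received messages but have no vertex in the
// partition are the ones a resolver would be asked to create.
public class CreateMissingVertices {

  /** Returns, in ascending order, the message destinations with no existing vertex. */
  static SortedSet<Long> idsToCreate(Set<Long> existingVertexIds, Set<Long> idsWithMessages) {
    SortedSet<Long> missing = new TreeSet<>(idsWithMessages);
    missing.removeAll(existingVertexIds);
    return missing;
  }

  public static void main(String[] args) {
    Set<Long> existing = new HashSet<>(Arrays.asList(1L, 2L, 3L));
    Set<Long> withMessages = new HashSet<>(Arrays.asList(2L, 5L, 9L));
    System.out.println(idsToCreate(existing, withMessages)); // [5, 9]
  }
}
```

Only ids that received messages are considered, so ordinary mutations (adding or removing edges) do not touch the message store in this sketch.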

I'll be working on providing some metrics and will update when I do so.


- Maja


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/6013/#review9285
-----------------------------------------------------------


On July 19, 2012, 12:09 p.m., Maja Kabiljo wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/6013/
> -----------------------------------------------------------
> 
> (Updated July 19, 2012, 12:09 p.m.)
> 
> 
> Review request for giraph.
> 
> 
> Description
> -------
> 
> This patch introduces out-of-core messages support for Giraph. Some ideas are 
> taken from discussion in https://issues.apache.org/jira/browse/GIRAPH-45. 
> 
> We keep messages in a MessageStore, from which they are flushed to disk 
> when necessary. Messages are separated by partition. At any moment we flush 
> only a single partition to disk, so we still keep the rest in memory in case 
> it's time for the next superstep. When flushing to disk, we write in the 
> following format:
> numberOfVertices
> vertexId1 numberOfMessages1 messagesForVertexId1
> vertexId2 numberOfMessages2 messagesForVertexId2
> ...
> Vertex ids are sorted. We don't require enough memory to fit all the messages 
> for a partition, but we do require that the messages for a single vertex fit 
> in memory. 
> In the end, we potentially have several files for each partition. When 
> reading messages, all the files are read sequentially.
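A minimal sketch of the flush format above, for illustration only. It assumes vertex ids and messages are plain longs (the real store serializes Writable types), and the class name MessageFileFormat and its Reader are hypothetical. The reader streams one vertex at a time, which is why only the messages for a single vertex need to fit in memory.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical sketch of the flushed-file layout; longs stand in for Writables.
public class MessageFileFormat {

  /** Writes numberOfVertices, then (vertexId, numberOfMessages, messages...) in ascending id order. */
  static void write(SortedMap<Long, List<Long>> messages, DataOutput out) throws IOException {
    out.writeInt(messages.size());
    for (Map.Entry<Long, List<Long>> entry : messages.entrySet()) {
      out.writeLong(entry.getKey());
      out.writeInt(entry.getValue().size());
      for (long message : entry.getValue()) {
        out.writeLong(message);
      }
    }
  }

  /** Streams a file back one vertex at a time; only that vertex's messages are held in memory. */
  static final class Reader {
    private final DataInput in;
    private int verticesLeft;

    Reader(DataInput in) throws IOException {
      this.in = in;
      this.verticesLeft = in.readInt();
    }

    boolean hasNext() {
      return verticesLeft > 0;
    }

    Map.Entry<Long, List<Long>> next() throws IOException {
      verticesLeft--;
      long vertexId = in.readLong();
      int numberOfMessages = in.readInt();
      List<Long> messages = new ArrayList<>(numberOfMessages);
      for (int i = 0; i < numberOfMessages; i++) {
        messages.add(in.readLong());
      }
      return new AbstractMap.SimpleEntry<>(vertexId, messages);
    }
  }

  public static void main(String[] args) throws IOException {
    SortedMap<Long, List<Long>> partition = new TreeMap<>();
    partition.put(7L, Arrays.asList(1L, 2L));
    partition.put(3L, Arrays.asList(5L));
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    write(partition, new DataOutputStream(bytes));

    Reader reader = new Reader(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    while (reader.hasNext()) {
      Map.Entry<Long, List<Long>> vertex = reader.next();
      System.out.println(vertex.getKey() + " -> " + vertex.getValue());
    }
    // prints "3 -> [5]" then "7 -> [1, 2]"
  }
}
```

Using a SortedMap on the write side is what guarantees the "vertex ids are sorted" property in each file.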
> 
> DiskBackedMessageStoreByPartition handles all messages, 
> DiskBackedMessageStore is then used for a single partition, and 
> SequentialFileMessageStore handles a single file.
> There is also SimpleMessageStore, which doesn't use disk at all.
> 
> Options available to user:
> - whether or not to use out-of-core messaging
> - number of messages to keep in memory - this should probably be changed 
> (explained below)
> - size of buffer when reading from and writing to disk
> 
> ServerData now has two instances of message stores: one holding messages from 
> the previous superstep, which is consumed in the current superstep, and one in 
> which it keeps incoming messages for the next superstep.
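The two stores behave like a double buffer. A minimal sketch, assuming in-memory maps and long ids/messages; TwoMessageStores, receive(), messagesFor() and prepareSuperstep() are hypothetical names, not the ServerData API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical double-buffering sketch: messages received during superstep N
// only become visible to compute() in superstep N + 1.
public class TwoMessageStores {
  private Map<Long, List<Long>> currentMessages = new HashMap<>();  // read by compute this superstep
  private Map<Long, List<Long>> incomingMessages = new HashMap<>(); // filled for the next superstep

  void receive(long vertexId, long message) {
    incomingMessages.computeIfAbsent(vertexId, k -> new ArrayList<>()).add(message);
  }

  List<Long> messagesFor(long vertexId) {
    return currentMessages.getOrDefault(vertexId, Collections.emptyList());
  }

  /** At the superstep barrier: incoming becomes current; the consumed store is dropped. */
  void prepareSuperstep() {
    currentMessages = incomingMessages;
    incomingMessages = new HashMap<>();
  }

  public static void main(String[] args) {
    TwoMessageStores data = new TwoMessageStores();
    data.receive(1L, 42L);
    System.out.println(data.messagesFor(1L)); // [] (not visible until the next superstep)
    data.prepareSuperstep();
    System.out.println(data.messagesFor(1L)); // [42]
  }
}
```

Swapping references at the barrier avoids copying messages between the two stores.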
> 
> Other things which had to be changed:
> - Checkpointing - since messages are not kept in the vertex anymore, they 
> need to be stored separately.
> - Partition exchange between workers - for the same reasons as above; added 
> SendMessagesRequest
> - Messages are not assigned to the vertex; they are just passed in compute
> - compute methods are now executed in order of vertex id within a partition, 
> so that reading from disk is fast
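Why the compute order matters: if vertices are visited in ascending id order and the message source is sorted by id, a single forward pass pairs every vertex with its messages. A minimal sketch, assuming one sorted message source (merging several files per partition is not shown); SortedComputeOrder and runSuperstep() are hypothetical names, and the returned string log stands in for calling compute.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch: vertex ids and message entries are both ascending,
// so the message iterator is only ever advanced forward (no random seeks).
public class SortedComputeOrder {

  static List<String> runSuperstep(SortedSet<Long> vertexIds,
                                   Iterator<Map.Entry<Long, List<Long>>> sortedMessages) {
    List<String> computeCalls = new ArrayList<>();
    Map.Entry<Long, List<Long>> pending = sortedMessages.hasNext() ? sortedMessages.next() : null;
    for (long id : vertexIds) {
      // Messages addressed to ids with no vertex here are skipped in this sketch.
      while (pending != null && pending.getKey() < id) {
        pending = sortedMessages.hasNext() ? sortedMessages.next() : null;
      }
      List<Long> messages = Collections.emptyList();
      if (pending != null && pending.getKey() == id) {
        messages = pending.getValue();
        pending = sortedMessages.hasNext() ? sortedMessages.next() : null;
      }
      // Stand-in for vertex.compute(messages): record what each vertex received.
      computeCalls.add(id + ":" + messages);
    }
    return computeCalls;
  }

  public static void main(String[] args) {
    SortedSet<Long> ids = new TreeSet<>(Arrays.asList(1L, 2L, 4L));
    SortedMap<Long, List<Long>> messages = new TreeMap<>();
    messages.put(2L, Arrays.asList(10L));
    messages.put(4L, Arrays.asList(20L, 30L));
    System.out.println(runSuperstep(ids, messages.entrySet().iterator()));
    // [1:[], 2:[10], 4:[20, 30]]
  }
}
```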
> 
> For the memory check, I only limit the number of messages allowed in memory. 
> This should be done better, but there is a problem: Alessandro's patch for 
> out-of-core graph also has memory checks. We don't want one of those parts to 
> use all the memory and leave too little for the other, but I'm not aware of a 
> way to check the memory usage of different data structures separately.
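A minimal sketch of a count-based check like the one described, assuming a single global threshold and that the partition holding the most messages is the one flushed (the patch only says that one partition is flushed at a time). CountBasedFlush and its methods are hypothetical, and the list of flushed partition ids stands in for actual disk writes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a global message count triggers flushing one partition
// at a time. Choosing the largest partition is an assumption of this sketch.
public class CountBasedFlush {
  private final int maxMessagesInMemory;
  private final Map<Integer, Integer> messagesPerPartition = new HashMap<>();
  private final List<Integer> flushedPartitions = new ArrayList<>(); // stand-in for disk writes
  private int messagesInMemory = 0;

  CountBasedFlush(int maxMessagesInMemory) {
    if (maxMessagesInMemory < 0) {
      throw new IllegalArgumentException("maxMessagesInMemory must be >= 0");
    }
    this.maxMessagesInMemory = maxMessagesInMemory;
  }

  void addMessage(int partitionId) {
    messagesPerPartition.merge(partitionId, 1, Integer::sum);
    messagesInMemory++;
    while (messagesInMemory > maxMessagesInMemory) {
      flushLargestPartition();
    }
  }

  private void flushLargestPartition() {
    int largest = Collections.max(messagesPerPartition.entrySet(),
        Map.Entry.comparingByValue()).getKey();
    messagesInMemory -= messagesPerPartition.remove(largest);
    flushedPartitions.add(largest);
  }

  List<Integer> getFlushedPartitions() {
    return flushedPartitions;
  }

  int getMessagesInMemory() {
    return messagesInMemory;
  }

  public static void main(String[] args) {
    CountBasedFlush store = new CountBasedFlush(3);
    store.addMessage(1);
    store.addMessage(2);
    store.addMessage(2);
    store.addMessage(2);
    System.out.println("flushed=" + store.getFlushedPartitions()
        + ", inMemory=" + store.getMessagesInMemory()); // flushed=[2], inMemory=1
  }
}
```

As the description notes, a message count says nothing about message sizes or about memory used by other structures such as an out-of-core graph.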
> 
> I didn't integrate this with RPC; that's why there are some checks for 
> useNetty, which can be removed once RPC is removed. Also, since the vertex 
> doesn't keep messages itself anymore, once RPC is removed we should also 
> remove getMessages/putMessages/getNumMessages from the vertex, change 
> initialize to (id, value, edges, hasMessages), and just give messages to the 
> vertex when calling compute.
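A minimal sketch of the vertex shape proposed above, where messages are handed to compute() instead of being stored in the vertex. The SimpleVertex interface and MaxValueVertex example are hypothetical illustrations, not Giraph's Vertex API, and ids, values and edges are simplified to longs.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of the proposed API shape: initialize() only records
// whether messages exist; the messages themselves arrive as compute()'s argument.
public class MessagesPassedToCompute {

  interface SimpleVertex {
    void initialize(long id, long value, List<Long> edges, boolean hasMessages);

    void compute(Iterable<Long> messages);
  }

  /** Example vertex: keeps the maximum of its own value and all incoming messages. */
  static final class MaxValueVertex implements SimpleVertex {
    long id;
    long value;
    boolean hasMessages; // kept only as a flag in this sketch

    @Override
    public void initialize(long id, long value, List<Long> edges, boolean hasMessages) {
      this.id = id;
      this.value = value;
      this.hasMessages = hasMessages;
    }

    @Override
    public void compute(Iterable<Long> messages) {
      for (long message : messages) {
        value = Math.max(value, message);
      }
    }
  }

  public static void main(String[] args) {
    MaxValueVertex vertex = new MaxValueVertex();
    vertex.initialize(1L, 3L, Collections.emptyList(), true);
    // The worker would pass an iterable backed by the message store here.
    vertex.compute(Arrays.asList(7L, 2L));
    System.out.println(vertex.value); // 7
  }
}
```

Taking an Iterable rather than a List leaves room for messages to be streamed from disk without materializing them in the vertex.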
> 
> I'll fix the part where partitions are sent around before the superstep, 
> since that's now the only part which requires all the messages for a single 
> partition to fit in memory.
> 
> 
> This addresses bug GIRAPH-45.
>     https://issues.apache.org/jira/browse/GIRAPH-45
> 
> 
> Diffs
> -----
> 
>   http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java 1363291
>   http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java 1363291
>   http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java 1363291
>   http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java 1363291
>   http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClientServer.java 1363291
>   http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerServer.java 1363291
>   http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java 1363291
>   http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/RequestRegistry.java 1363291
>   http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMessagesRequest.java 1363291
>   http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java 1363291
>   http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerServer.java 1363291
>   http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/BasicMessageStore.java PRE-CREATION
>   http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java PRE-CREATION
>   http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java PRE-CREATION
>   http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/FlushableMessageStore.java PRE-CREATION
>   http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/MessageStore.java PRE-CREATION
>   http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java PRE-CREATION
>   http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java PRE-CREATION
>   http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SendMessagesRequest.java PRE-CREATION
>   http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java PRE-CREATION
>   http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java PRE-CREATION
>   http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/package-info.java PRE-CREATION
>   http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexResolver.java 1363291
>   http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java 1363291
>   http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java 1363291
>   http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java 1363291
>   http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java 1363291
>   http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java 1363291
>   http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/utils/CollectionUtils.java PRE-CREATION
>   http://svn.apache.org/repos/asf/giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java 1363291
>   http://svn.apache.org/repos/asf/giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java 1363291
>   http://svn.apache.org/repos/asf/giraph/trunk/src/test/java/org/apache/giraph/comm/TestMessageStores.java PRE-CREATION
>   http://svn.apache.org/repos/asf/giraph/trunk/src/test/java/org/apache/giraph/utils/MockUtils.java 1363291
> 
> Diff: https://reviews.apache.org/r/6013/diff/
> 
> 
> Testing
> -------
> 
> Ran mvn verify and tests in pseudo-distributed mode; all pass apart from 
> this one: https://issues.apache.org/jira/browse/GIRAPH-259
> 
> 
> Thanks,
> 
> Maja Kabiljo
> 
>
