Avery, thank you for your comments.

On 8/2/12 8:58 AM, "Avery Ching" <avery.ch...@gmail.com> wrote:

>Hi Maja,
>Thanks for publishing your results!  Really nice performance
>improvement.  I have some questions/comments inline.
>On 8/1/12 11:43 AM, Maja Kabiljo wrote:
>> I've been running some benchmarking of this solution, I put in Excel
>> document in the attachment. There are some results of PageRankBenchmark
>> and RandomMessagesBenchmark. Sheets 'Page Rank 3', 'Page Rank 4' and
>> 'Messages 3' show the cases in which we run out of memory. Shortest
>> algorithm uses messages very little when compared to the amount of other
>> data, so there I couldn't see any differences between solutions.
>> Interesting cases are 'Page Rank 2' and 'Messages 2' where I guess we
>> very tight on memory so going out of core helps (I ran those a few times
>> since, but keep getting the same results).
>Are you saying that out-of-core is faster that hitting memory boundaries
>(i.e. GC)?  It is a bit tough to imagine that out-of-core beats in-core

That's the only explanation I could think of, honestly it sounds wrong to
me too. But those are the results I keep getting. If someone has a better
one I'd love to hear it :-)

>> We can also see that execution
>> time is improved with just SimpleMessageStore, since in current
>> implementation we copy messages around when we store them in vertex.
>So the performance difference can be explained by reducing memory copies?

That's correct.

>> I also tried running RandomMessagesBenchmark with really huge amount of
>> messages, but it crashed because message store didn't process messages
>> fast enough and worker got flooded with unprocessed requests. So in
>> like that the only thing which could help us would be to decrease the
>> speed of compute executions. But I think this is something that
>> happen in real applications - this benchmark doesn't use received
>> at all, in a real application executions are going to be slower anyway
>> they have to process that much data. Anyway, it would be good to have a
>> real problem which uses messages intensively and then we could see
>> really going on.
>A question here:  Could we have set the max messages to a lower value to
>prevent the crashing?  What error did you actually see in this case?

I saw that message store keeps the number of messages in memory below the
limit, and also there are not many infrastructure objects created by it.
But we run out of memory anyways. So my guess is that data arrives to the
server but is not processed yet. I'll try to investigate more. Lowering
the max messages wouldn't help, since I even tried it with examples like:
max 1m messages of size 50B (~50MB total).

>> As a conclusion, to start with, maybe I can create a smaller patch from
>> this which only adds SimpleMessageStore, since as we can see keeping
>> messages outside of vertices helps. And then, once the RPC is removed,
>> will be able to finally remove putMessages/getMessages/getNumMessages
>> functions from Vertex.
>I think some folks are really going to like that.  It can allow them to
>directly implement MutableVertex (I think).
>> For the out-of-core part, if we still offer the
>> option not to use it as default, I see no harm of adding it also, and as
>> you can see there are benefits in some cases.
>I don't see any harm here at all.
>> Another thing, I think I should explain what from GIRAPH-45 discussion
>> I actually using here, since I don't use bloomfilters and BTrees. The
>> it works is the following:
>> - Inside the outer message store we have message stores for each of the
>> partitions separately.
>> - Partition message stores keep data in ordered map (ordered by vertex
>> - In outer messages store we check if we should flush something (do we
>> have more than allowed number of messages in memory). While we do, we
>> flush the partition with largest number of messages in memory.
>> - When partition messages store is flushed, all the data is written to a
>> file in the order of vertex ids, file content is like:
>> num_vertices
>> vertex_1_id num_messages_1 message_1_1 message_1_2 ...
>> vertex_2_id num_messages_2 message_2_1 message_2_2 ...
>> ...
>> - In the end each partition will have some messages in memory, and N
>> files, where N is the number of times it was flushed.
>> - When it's time to do the computation, within a single partition we
>> compute methods in order of vertex ids.
>> - We use buffered streams and read data from all partition files
>> sequentially, since we'll need data in the same order it's written in
>> of the files. This way we limit number of random file accesses.
>> Maja
>> On 7/24/12 1:45 AM, "Avery Ching" <ach...@apache.org> wrote:
>>> We should integrate the partitioning of the graph into the input
>>> superstep to get locality as well.  We can use MapReduce to try and
>>> schedule the map tasks (workers) closest to its data and then make the
>>> workers smart enough to only try to load their partitions.
>>> On 7/22/12 4:30 PM, Claudio Martella wrote:
>>>> I see your reasoning. In general I'm being open to use MR when
>>>> necessary (e.g. i used to propose it instead of the automatic vertex
>>>> creation), here it could get tricky. I can see additional HDFS usage
>>>> as down (you have to be able to store 2x the graph). However, once the
>>>> graph is pre-filtered, this additional stage would not be necessary
>>>> again for the successive jobs (only when a different number of workers
>>>> is used). Though, it does add a not so small passage to the process.
>>>> On Sun, Jul 22, 2012 at 10:49 PM, Alessandro Presta
>>>> wrote:
>>>>> Exactly. On paper, the amount of data around should be the same as
>>>>> during
>>>>> the computation, but in practice we do use a lot more memory.
>>>>> You can play with the settings and just push the problem a little
>>>>> farther
>>>>> away, by caching less and flushing requests more frequently, so then
>>>>> the
>>>>> bottleneck is on the servers.
>>>>> We're basically sending (k-1)/k of the graph through the network,
>>>>> where k
>>>>> is the number of workers.
>>>>> What I'm thinking is that in INPUT_SUPERSTEP we're doing what
>>>>> MapReduce is
>>>>> really good at (sorting and aggregating) in a probably inefficient
>>>>> at
>>>>> least non-scalable) way.
>>>>> We could try implementing it with a MapReduce job instead, where the
>>>>> mappers take input splits and emit (partition_id, vertex) (they would
>>>>> have
>>>>> access to the partitioner) and reducers just output the built
>>>>> partitions
>>>>> to HDFS.
>>>>> The computation stage would then be the usual Giraph job, where each
>>>>> worker knows where to get its partitions from HDFS.
>>>>> I can try making this change and see how it goes. It would just be
>>>>> MR
>>>>> job, so we're not selling our souls to iterative MR.
>>>>> I can also see many cases where one might not want to shuffle
>>>>> around at all: each worker reads a roughly equal part of the input
>>>>> (forget
>>>>> about bigger vertices for now) and simply communicates its own vertex
>>>>> ids
>>>>> to the master. Partition "a posteriori" instead of "a priori".
>>>>> What do you think?
>>>>> On 7/20/12 9:42 PM, "Eli Reisman" <initialcont...@gmail.com> wrote:
>>>>>> What we are seeing in the metrics is the three-way load of
>>>>>> 1. reading InputSplits from HDFS (mostly over the wire as there is
>>>>>> locality right now)
>>>>>> 2. creating temporary collections of vertices, sending them on netty
>>>>>> 3. simultaneously receiving collections of vertices on netty from
>>>>>> remote
>>>>>> nodes that will be place in the local workers' partitions for
>>>>>> processing
>>>>>> stages

Reply via email to