[ 
https://issues.apache.org/jira/browse/GIRAPH-45?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13171047#comment-13171047
 ] 

Claudio Martella commented on GIRAPH-45:
----------------------------------------

I must explained wrongly the idea. The output format is basically:

(key0, value2)
(key0, value1)
(key0, value4)
...
(key1, value4)
(key1, value2)
(key1, value7)
...

this is to say that the output file is sorted by key, but messages are not 
sorted, they end up in the file in the order they arrived. The index is a tree 
with a sample set of vertices (i.e. #totalVertices % N) with their position 
inside of that file. 
So if you want to look for the messages sent to a particular vertex, i.e. at 
the beginning of that vertex superstep, you'd look into the tree for the 
floorEntry(key) and scan that chunk until you see the beginning of the block 
where your messages are stored (and this for each file were we flushed). To 
minimize disk i/o, i'm using a bloomfilter so that i don't look into files were 
i reasonably know there aren't going to be messages for my vertex. At this 
point you can just iterate over the messages one after the other and feed them 
to the messages iterator fed to the compute() method.
So the index doesn't have one entry per vertex, but only for a subset of the 
vertices (and no entry at all for the messages).

An easier way would be to go as you suggest, to store all the messages to 
different files, flush them unsorted as they reach as certain threshold, and 
then load them up entirely at the beginning of a partition's superstep. This 
would allow us to avoid sort stuff but we'd need to be able to keep all the 
messages to all the vertices belonging to one partition in memory all the time. 
Maybe it's suitable, I was going for the approach that would require less 
in-memory contention.

Another optimization I was going to look into is to change the format to

(key0, #bytes, [message2, message1, message4])
(key1, #bytes, [message4, message2, message7])

this would allow us, when scanning the file looking for our block, to avoid 
deserializing all the messages as well, but just skip to the next key. It 
requires to serialize the message list to a byte[], so that i can write #bytes, 
before writing the messages to disk. I'll implement this after the naive 
version is ready.

Does it make sense like this to you?
                
> Improve the way to keep outgoing messages
> -----------------------------------------
>
>                 Key: GIRAPH-45
>                 URL: https://issues.apache.org/jira/browse/GIRAPH-45
>             Project: Giraph
>          Issue Type: Improvement
>          Components: bsp
>            Reporter: Hyunsik Choi
>            Assignee: Hyunsik Choi
>
> As discussed in GIRAPH-12(http://goo.gl/CE32U), I think that there is a 
> potential problem to cause out of memory when the rate of message generation 
> is higher than the rate of message flush (or network bandwidth).
> To overcome this problem, we need more eager strategy for message flushing or 
> some approach to spill messages into disk.
> The below link is Dmitriy's suggestion.
> https://issues.apache.org/jira/browse/GIRAPH-12?focusedCommentId=13116253&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13116253

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

Reply via email to