[ https://issues.apache.org/jira/browse/GIRAPH-45?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13170121#comment-13170121 ]
Claudio Martella commented on GIRAPH-45:
----------------------------------------

Yes, we don't, I agree. As a matter of fact, for the last two days I've been working on an implementation of the previous model with our own code. It's going quite smoothly: my prototype, not integrated with Giraph, should be ready by the end of today. If you're in that much of a hurry, I can upload it as an attachment as soon as it's complete (again as an external class, not as a Giraph patch), and you can help me out (much appreciated) with the integration (as a Giraph patch this time =) ).

The prototype implements a Memstore + SequenceFile approach to store the (I vertex, M message) pairs *sorted* on disk. As we don't deal with durability, deletes or updates, this turned out to be quite simple and quite disk-efficient. We have the typical memstore where we store the data; when we reach a given threshold (maxSize or some %freeHeap) we flush it to disk, sorted by vertex, and we keep a small BTree for that SequenceFile in memory. As we don't have updates or deletes, we don't really need to merge as in BigTable; as we don't do random reads, we don't need compaction; as we don't do durability, we don't need log files.

Before proceeding with the compute() of a given vertex, we ask all the SequenceFiles to seek to the first message for that vertex, if any, and we read sequentially across the memstore and the files. If we don't send many messages, we basically never touch the disk but just the memstore, and we fall back to the in-memory-only approach that Giraph uses right now, without any special code to handle the two cases. Also, when we read we get a stream with ALL the messages for the vertex, which would make it quite combiner-efficient. Basically, if senders also partially sorted messages prior to sending, what we would have is something very similar to the shuffle & sort phase of MR. Which brings me to some considerations about your points.
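To make the memstore-plus-sorted-files idea concrete, here is a minimal Python sketch. It is not the prototype and not Giraph code: the class and method names are hypothetical, the "files" are plain in-memory lists standing in for sorted SequenceFiles, the flush threshold is a simple message count, and the per-file BTree index is left out, so reads are a full sequential merge rather than a seek.

```python
import heapq
from collections import defaultdict


class SpillingMessageStore:
    """Toy message store: an in-memory buffer plus sorted, immutable runs.

    Runs are plain lists here; they stand in for the sorted on-disk files
    described above. All names are hypothetical.
    """

    def __init__(self, max_buffered):
        self.max_buffered = max_buffered   # flush threshold (message count)
        self.memstore = defaultdict(list)  # vertex_id -> [messages]
        self.buffered = 0
        self.runs = []  # each run: list of (vertex_id, message), sorted by id

    def add(self, vertex_id, message):
        self.memstore[vertex_id].append(message)
        self.buffered += 1
        if self.buffered >= self.max_buffered:
            self.flush()

    def _sorted_buffer(self):
        # Flatten the buffer into (vertex_id, message) pairs in vertex order.
        return [(v, m) for v in sorted(self.memstore) for m in self.memstore[v]]

    def flush(self):
        """Write the buffer out as one sorted run, then clear it."""
        run = self._sorted_buffer()
        if run:
            self.runs.append(run)
        self.memstore.clear()
        self.buffered = 0

    def scan(self):
        """Yield (vertex_id, [messages]) in vertex order by merging all runs
        with the current buffer. Runs are never rewritten or compacted."""
        merged = heapq.merge(*self.runs, self._sorted_buffer(),
                             key=lambda pair: pair[0])
        current, batch = None, []
        for vertex_id, message in merged:
            if batch and vertex_id != current:
                yield current, batch
                batch = []
            current = vertex_id
            batch.append(message)
        if batch:
            yield current, batch
```

If the threshold is never reached, `runs` stays empty and `scan()` reads only the buffer, which mirrors the point that the in-memory case needs no separate code path.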
As I see it, what makes MR not so efficient for iterative graph processing, apart from job boot time, is that it forwards not only messages but also the graph structure/topology between map and reduce phases and between jobs. On our side, we never hit the disk for messages and we keep the graph structure stateful in memory, so we have a double win (which makes it quite difficult to sell a comparison between MR and Pregel). If we start hitting the disk (and, heavily, the network, as you're proposing HDFS: bear in mind that A might write to DFS node B data that is actually directed to C => double network access, a bit as if mappers wrote to HDFS in MapReduce) to save the graph and the messages, we had better just use MapReduce in the first place. Of course I see that you're saving the repeated job boot time and that you're not actually writing EVERYTHING, only what doesn't fit in memory, but I have a possibly simpler idea that goes in the direction of what you have in mind.

If we start with the assumption that we can keep at least the graph in memory, we never need to write it to disk and we write only the messages to disk when necessary (in our case more messages would hit the disk than in your scenario, because we'd have more memory pressure from keeping the whole graph in memory). But because we have a quite memory-efficient way of storing the messages on disk, in sorted order, we can take advantage of it when we run the superstep. Suppose we have the vertices in each partition sorted by vertexID, just the same way their messages are sorted on disk. Iterating over the vertices belonging to that partition and scanning the messages directed to vertices of that partition would then be a linear 1-1 mapping, basically meaning we'd never have to seek in the on-disk message box. What this leads to is something similar to MapReduce in how the values/messages are handled, but still Pregel-ish in how the structure is kept in memory. Does it make sense?
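The lockstep walk over sorted vertices and sorted messages can be sketched roughly as follows. This is an illustrative Python sketch under stated assumptions, not Giraph's API: `run_superstep` and its parameters are hypothetical names, the message stream is any iterable of (vertex_id, message) pairs already sorted by vertex id, and `compute` is called for every vertex, including those with an empty inbox.

```python
def run_superstep(sorted_vertex_ids, sorted_messages, compute):
    """Walk vertices and messages in lockstep; both are sorted by vertex id.

    The message stream is only ever advanced forward, so the underlying
    store never has to seek backwards.
    """
    stream = iter(sorted_messages)
    pending = next(stream, None)
    for vertex_id in sorted_vertex_ids:
        # Skip messages addressed to ids below the current vertex
        # (for example, ids that do not exist in this partition).
        while pending is not None and pending[0] < vertex_id:
            pending = next(stream, None)
        # Collect every message addressed to the current vertex.
        inbox = []
        while pending is not None and pending[0] == vertex_id:
            inbox.append(pending[1])
            pending = next(stream, None)
        compute(vertex_id, inbox)
```

Each vertex and each message is visited once, so a superstep costs one pass over the partition plus one pass over its messages.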
Do you want me to share the complete prototype-ish library as an attachment here this evening?

> Improve the way to keep outgoing messages
> -----------------------------------------
>
>                 Key: GIRAPH-45
>                 URL: https://issues.apache.org/jira/browse/GIRAPH-45
>             Project: Giraph
>          Issue Type: Improvement
>          Components: bsp
>            Reporter: Hyunsik Choi
>            Assignee: Hyunsik Choi
>
> As discussed in GIRAPH-12(http://goo.gl/CE32U), I think that there is a
> potential problem to cause out of memory when the rate of message generation
> is higher than the rate of message flush (or network bandwidth).
> To overcome this problem, we need more eager strategy for message flushing or
> some approach to spill messages into disk.
> The below link is Dmitriy's suggestion.
> https://issues.apache.org/jira/browse/GIRAPH-12?focusedCommentId=13116253&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13116253