[ https://issues.apache.org/jira/browse/GIRAPH-328?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Eli Reisman updated GIRAPH-328:
-------------------------------

    Attachment: GIRAPH-328-3.patch

A few small changes to make the solution more robust and ready for future mods/upgrades. Passes mvn verify and should be ready for initial review, I think.

I tried not to make too many sweeping changes to how messaging works, but simply to store outgoing I -> List<M> grouped by WorkerInfo (the actual destination address) rather than by partitionId. The partitionId plumbing is left intact in this solution, as some of the code related to dynamic repartitioning etc. uses it.

I am curious whether it would matter if we got rid of the map of partitionId -> InetSocketAddress used by the partition code, since it is not used that much and the "service" in NettyWorkerClient can get this from the vertexId -> PartitionOwner mapping it carries. Perhaps the partitionId -> InetSocketAddress map is really just extra plumbing at this point?

Anyway, let me know what should be done to improve this (or fix bugs!); otherwise I think it's ready to go, and other improvements could go in additional JIRAs. Will do some additional testing ASAP as well.

> Outgoing messages from current superstep should be grouped at the sender by owning worker, not by partition
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: GIRAPH-328
>                 URL: https://issues.apache.org/jira/browse/GIRAPH-328
>             Project: Giraph
>          Issue Type: Improvement
>          Components: bsp, graph
>    Affects Versions: 0.2.0
>            Reporter: Eli Reisman
>            Assignee: Eli Reisman
>            Priority: Minor
>             Fix For: 0.2.0
>
>         Attachments: GIRAPH-328-1.patch, GIRAPH-328-2.patch, GIRAPH-328-3.patch
>
>
> Currently, outgoing messages created by the Vertex#compute() cycle on each
> worker are stored and grouped by the partitionId on the destination worker to
> which the messages belong. This results in messages being duplicated on the
> wire per partition on a given receiving worker that has delivery vertices for
> those messages.
> By partitioning the outgoing, current-superstep messages by destination
> worker, we can split them into partitions at insertion into a MessageStore on
> the destination worker. What we trade in some compute time while inserting at
> the receiver side, we gain in fine-grained control over the real number of
> messages each worker caches outbound for any given worker before flushing,
> and how those flushed messages are aggregated for delivery as well.
> Potentially, it allows for a great reduction in duplicate messages sent in
> situations like Vertex#sendMessageToAllEdges() -- see GIRAPH-322, GIRAPH-314.
> You get the idea.
> This might be a poor idea, and it can certainly use some additional
> refinement, but it passes mvn verify and may even run ;) It interoperates
> with the disk spill code, but not as well as it could. Consider this a
> request for comment on the idea (and the approach) rather than a finished
> product.
> Comments/ideas/help welcome! Thanks
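
For anyone skimming the thread, the idea described above can be sketched roughly as follows. This is only an illustration under simplifying assumptions, not the code in the patch: the class WorkerGroupingSketch and its methods are hypothetical, Long and String stand in for the vertex id type I and message type M, plain worker names stand in for WorkerInfo, and a modulo lookup stands in for the real vertexId -> PartitionOwner mapping.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not Giraph code: group outgoing messages by destination
// worker on the sender, then split them by partition on the receiver.
class WorkerGroupingSketch {

    // Assumption: a simple modulo stands in for the real partitioner.
    static int partitionOf(long vertexId, int numPartitions) {
        return (int) Math.floorMod(vertexId, (long) numPartitions);
    }

    // Assumption: partitions are assigned to workers round-robin.
    static String workerOf(int partitionId, List<String> workers) {
        return workers.get(partitionId % workers.size());
    }

    // Sender side: worker -> (vertexId -> messages), one batch per worker
    // regardless of how many partitions that worker owns.
    static Map<String, Map<Long, List<String>>> groupByWorker(
            Map<Long, List<String>> outgoing, int numPartitions, List<String> workers) {
        Map<String, Map<Long, List<String>>> byWorker = new HashMap<>();
        for (Map.Entry<Long, List<String>> e : outgoing.entrySet()) {
            String worker = workerOf(partitionOf(e.getKey(), numPartitions), workers);
            byWorker.computeIfAbsent(worker, w -> new HashMap<>())
                    .computeIfAbsent(e.getKey(), v -> new ArrayList<>())
                    .addAll(e.getValue());
        }
        return byWorker;
    }

    // Receiver side: split one received batch into
    // partitionId -> (vertexId -> messages) at insertion time.
    static Map<Integer, Map<Long, List<String>>> splitByPartition(
            Map<Long, List<String>> batch, int numPartitions) {
        Map<Integer, Map<Long, List<String>>> byPartition = new HashMap<>();
        for (Map.Entry<Long, List<String>> e : batch.entrySet()) {
            int partitionId = partitionOf(e.getKey(), numPartitions);
            byPartition.computeIfAbsent(partitionId, p -> new HashMap<>())
                    .computeIfAbsent(e.getKey(), v -> new ArrayList<>())
                    .addAll(e.getValue());
        }
        return byPartition;
    }

    public static void main(String[] args) {
        Map<Long, List<String>> outgoing = new HashMap<>();
        outgoing.put(0L, Arrays.asList("a"));
        outgoing.put(2L, Arrays.asList("c", "d"));
        outgoing.put(3L, Arrays.asList("e"));
        List<String> workers = Arrays.asList("w0", "w1");

        Map<String, Map<Long, List<String>>> grouped = groupByWorker(outgoing, 4, workers);
        System.out.println("sender-side batches: " + grouped);
        System.out.println("w0 split at receiver: " + splitByPartition(grouped.get("w0"), 4));
    }
}
```

With four partitions spread over two workers, the sender holds two batches instead of four, and the per-partition split only happens once a batch arrives at its worker.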