[
https://issues.apache.org/jira/browse/GIRAPH-328?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Eli Reisman updated GIRAPH-328:
-------------------------------
Attachment: GIRAPH-328-3.patch
A few small changes to make the solution more robust and ready for future
mods/upgrades. Passes mvn verify, should be ready for initial review I think?
I tried not to make too many sweeping changes to how messaging works but simply
to store outgoing I -> List<M> grouped by WorkerInfo (actual destintion
address) rather than partitionId. The partitionId plumbing is left intact in
this solution as some of the code related to dynamic repartitioning etc. uses
it. I am curious if it would matter if we got rid of the map of partitionId ->
InetSocketAddress used by partition code since it is not used that much, and
the "service" in NettyWorkerClient can get this from vertexId -> PartitionOwner
mapping it carries. Perhaps the partitionId -> InetSocketAddress is really just
extra plumbing at this point?
anyway, let me know what should be done to improve this (or fix bugs!),
otherwise I think its ready to go and/or other improvements could be in
additional JIRAs. Will do some additional testing ASAP as well.
> Outgoing messages from current superstep should be grouped at the sender by
> owning worker, not by partition
> -----------------------------------------------------------------------------------------------------------
>
> Key: GIRAPH-328
> URL: https://issues.apache.org/jira/browse/GIRAPH-328
> Project: Giraph
> Issue Type: Improvement
> Components: bsp, graph
> Affects Versions: 0.2.0
> Reporter: Eli Reisman
> Assignee: Eli Reisman
> Priority: Minor
> Fix For: 0.2.0
>
> Attachments: GIRAPH-328-1.patch, GIRAPH-328-2.patch,
> GIRAPH-328-3.patch
>
>
> Currently, outgoing messages created by the Vertex#compute() cycle on each
> worker are stored and grouped by the partitionId on the destination worker to
> which the messages belong. This results in messages being duplicated on the
> wire per partition on a given receiving worker that has delivery vertices for
> those messages.
> By partitioning the outgoing, current-superstep messages by destination
> worker, we can split them into partitions at insertion into a MessageStore on
> the destination worker. What we trade in come compute time while inserting at
> the receiver side, we gain in fine grained control over the real number of
> messages each worker caches outbound for any given worker before flushing,
> and how those flush messages are aggregated for delivery as well.
> Potentially, it allows for a great reduction in duplicate messages sent in
> situations like Vertex#sendMessageToAllEdges() -- see GIRAPH-322, GIRAPH-314.
> You get the idea.
> This might be a poor idea, and it can certainly use some additional
> refinement, but it passes mvn verify and may even run ;) It interoperates
> with the disk spill code, but not as well as it could. Consider this a
> request for comment on the idea (and the approach) rather than a finished
> product.
> Comments/ideas/help welcome! Thanks
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira