[
https://issues.apache.org/jira/browse/GIRAPH-322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13453196#comment-13453196
]
Eli Reisman commented on GIRAPH-322:
------------------------------------
Eli Reisman commented on GIRAPH-322 (continued):
Thanks Maja, great advice. I will check into all of these things. I know exactly
what you mean about the spill to disk; it will take some thought to integrate.
That has been the problem with this whole approach: Giraph is hard-wired to
Partition -> Vertex -> List<M>, and broadcast-friendly data structures really go
against the grain.
The other issue that makes spilling to disk tricky is that each M object gets
written out over and over (not as a referenceId or anything like that), and when
read back the copies are "fresh instances" again, not references to the same
object. So we get a lot of re-duplication on every read from disk. Eliminating
this problem will take some thought and some overhauls.
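Just to illustrate the duplication for anyone following along, here is a tiny
standalone sketch (not Giraph's actual spill path, just the plain Writable round
trip) of how one message object shared by reference comes back as N independent
instances:

    // Minimal standalone sketch of the re-duplication problem: one message
    // object shared by reference in memory becomes N independent copies
    // after a serialize/deserialize round trip. Not Giraph's spill code.
    import org.apache.hadoop.io.DoubleWritable;
    import java.io.*;

    public class SpillDuplicationSketch {
      public static void main(String[] args) throws IOException {
        DoubleWritable shared = new DoubleWritable(42.0);  // one object, many refs

        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        for (int i = 0; i < 3; i++) {
          shared.write(out);                 // the same object is written 3 times
        }

        DataInputStream in =
            new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
        DoubleWritable[] copies = new DoubleWritable[3];
        for (int i = 0; i < 3; i++) {
          copies[i] = new DoubleWritable();  // a fresh instance per read
          copies[i].readFields(in);
        }

        // The sharing is gone: three distinct objects where there was one.
        System.out.println(copies[0] == copies[1]);  // false
      }
    }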
Our data already fits in memory fine; it's just a matter of getting the messaging
out of an N^2 growth situation and into an (N * constant) range, where the
amortizing can do the rest of the job. The nice thing about the amortizing (it's
an ugly solution, I know) is that when only some of the workers are sending on
any given superstep, the supersteps pass extremely quickly. Once something works
well, I will attempt to generalize the solution. Pushing the amortizing
responsibility out onto application users is not ideal, even if
SimpleTriangleClosingVertex provides an example to copy.
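Roughly what I mean by the amortizing, as a hypothetical compute() skeleton
(this is not the actual SimpleTriangleClosingVertex code, and the base class and
signatures are from memory, so treat them as approximate): each vertex only
broadcasts on the superstep its id maps to, so only about 1/AMORTIZE_FACTOR of
the vertices are sending at any one time:

    // Hypothetical sketch of superstep-amortized broadcasting (not the real
    // SimpleTriangleClosingVertex); base class and signatures are approximate.
    import org.apache.giraph.graph.EdgeListVertex;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;

    public class AmortizedBroadcastVertex extends
        EdgeListVertex<LongWritable, DoubleWritable, NullWritable, LongWritable> {

      // Spread the broadcasts over this many supersteps instead of sending
      // them all at once; a larger factor means fewer messages in flight per step.
      private static final int AMORTIZE_FACTOR = 10;

      @Override
      public void compute(Iterable<LongWritable> messages) {
        long step = getSuperstep();

        // Consume whatever arrived this superstep (it trickles in over the
        // first AMORTIZE_FACTOR + 1 supersteps instead of all at once).
        for (LongWritable msg : messages) {
          // ...accumulate into the vertex value, build candidate lists, etc...
        }

        if (step < AMORTIZE_FACTOR) {
          // Only vertices whose id falls in this superstep's slot broadcast now,
          // so per-superstep message volume is ~1/AMORTIZE_FACTOR of the total.
          if (getId().get() % AMORTIZE_FACTOR == step) {
            sendMessageToAllEdges(new LongWritable(getId().get()));
          }
        } else {
          // All broadcasts are out and delivered; finish up.
          voteToHalt();
        }
      }
    }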
Thanks for the tip about the constructor; the one from the original class (if I
remember right) was just empty braces, so I had left it at the default. Nice save!
So you're saying that if I set the command line options correctly, the
Vertex#compute() cycle will actually grind to a halt on each worker while the
message system copies to/from disk whenever messages pile up? If so, this is
worth playing with for sure; I did not see that behavior when I was trying to
run the disk-spill options, so maybe I just set it up wrong. Without the compute
cycle stopping during spill reads/writes, the messages just pile up in memory
instead of on the network. Either way, the re-duplication on disk reads is going
to keep that from being a one-stop solution for us.
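For reference, this is roughly what I had tried for the spill-to-disk run (the
option names here are from memory, so they may well be off; treat them as an
assumption, not gospel):

    hadoop jar <giraph job jar> org.apache.giraph.GiraphRunner \
      -Dgiraph.useOutOfCoreMessages=true \
      -Dgiraph.maxMessagesInMemory=500000 \
      <vertex class and input/output format arguments as usual>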
As far as 1 partition per worker goes: yes, it's funny, I considered filing a
separate JIRA while I was coding this to address the SendMessageCache issue you
mentioned. The reason for having lots of small partitions per worker was to
evenly redistribute them when a worker crashes, but in Giraph's current form a
crash just ends the job and there is no redistribution of partitions, so using
1 partition per worker hasn't really hurt us for this purpose, and it
drastically reduces the messages required per worker. Anyway, a fix for both
message caches in this regard is a great idea; I will take a second look at this.
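(Concretely, for us that just means matching partitions to workers on the
command line, e.g. -Dhash.userPartitionCount=<number of workers> as mentioned in
the description below, so each worker owns exactly one partition and the
send-side caches only ever see one destination partition per remote worker.)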
As for how the folks on this end want the final solution to work, we're aiming
for "spill to disk only when there's no other choice," so if spilling is part of
the solution it should ideally be just an emergency buffer against overload that
kicks in once in a while. But just getting this to work at the scale we want is
my main goal; we can tune for the use case after that!
OK, off to try this stuff...thanks so much for your thoughtful input!
> Run Length Encoding for Vertex#sendMessageToAllEdges might curb out of
> control message growth in large scale jobs
> -----------------------------------------------------------------------------------------------------------------
>
> Key: GIRAPH-322
> URL: https://issues.apache.org/jira/browse/GIRAPH-322
> Project: Giraph
> Issue Type: Improvement
> Components: bsp
> Affects Versions: 0.2.0
> Reporter: Eli Reisman
> Assignee: Eli Reisman
> Priority: Minor
> Fix For: 0.2.0
>
> Attachments: GIRAPH-322-1.patch, GIRAPH-322-2.patch
>
>
> Vertex#sendMessageToAllEdges is a case that goes against the grain of the
> data structures and code paths used to transport messages through a Giraph
> application and out on the network. While messages to a single vertex can be
> combined (and should be) in some applications that could make use of this
> broadcast messaging, the out of control message growth of algorithms like
> triangle closing means we need to de-duplicate messages bound for many
> vertices/partitions.
> This will be an evolving solution (this first patch is just the first step)
> and currently it does not present a robust solution for disk-spill message
> stores. I figure I can get some advice about that or it can be a follow-up
> JIRA if this turns out to be a fruitful pursuit. This first patch is also
> Netty-only and simply defaults to the old sendMessageToAllEdges()
> implementation if USE_NETTY is false. All this can be cleaned up when we know
> this works and/or is worth pursuing.
> The idea is to send as few broadcast messages as possible by run-length
> encoding their delivery and only duplicating messages on the network when they
> are bound for different partitions. This is also best when combined with
> "-Dhash.userPartitionCount=# of workers" so you don't do too much of that.
> If this shows promise I will report back and keep working on this. As it is,
> it represents an end-to-end solution, using Netty, for in-memory messaging.
> It won't break with spill to disk, but you do lose the de-duplicating effect.
> More to follow, comments/ideas welcome. I expect this to change a lot as I
> test it and ideas/suggestions crop up.
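To make the run-length/partition-grouping idea above concrete, here is a rough
standalone sketch of just the grouping step (plain Java with a stand-in modulo
partitioner; the real patch does this inside the Netty message cache and
request classes, so none of these names are the patch's own):

    // Rough sketch of the de-duplication idea: bucket broadcast targets by
    // destination partition, then ship the message value once per partition
    // together with that partition's "run" of target vertex ids, instead of
    // once per target vertex. The modulo partitioner is a stand-in for
    // Giraph's real hash partitioning.
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class BroadcastRunGrouping {
      public static Map<Integer, List<Long>> groupTargetsByPartition(
          Iterable<Long> targetVertexIds, int partitionCount) {
        Map<Integer, List<Long>> runs = new HashMap<Integer, List<Long>>();
        for (long id : targetVertexIds) {
          int partitionId = (int) (id % partitionCount);  // stand-in partitioner
          List<Long> run = runs.get(partitionId);
          if (run == null) {
            run = new ArrayList<Long>();
            runs.put(partitionId, run);
          }
          run.add(id);
        }
        return runs;
      }
      // A sender would then write the message bytes once per map entry,
      // followed by that entry's run of target ids: network cost becomes
      // O(#partitions) copies of the value instead of O(#targets).
    }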
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira