Regarding multicast RMI:

When a node joins the multicast group and begins receiving remote method invocations, it needs to start accepting them from the correct point in the stream, that is, from the top-level object of a remote method invocation.

When I was developing my validating atomic ObjectInputStream, I found I was also able to discard objects from the stream (their data still had to be read in and then discarded), although the reason for doing so was different: it allowed results from a lookup service to be returned with nulled-out fields for objects that required downloaded code while DownloadPermission was in force. The serialization protocol contains tokens that can be used to do this.

To prevent DoS (consuming all available memory), I also called reset after reaching the top-level object in the method call's object graph. Perhaps this reset could be used to signal to a node listening to the multicast stream that it should commence using the input rather than discarding it.
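
For illustration, a minimal sender-side sketch of that idea, assuming plain java.io serialization (the class and method names below are hypothetical, not an existing River API): each top-level call object is written and immediately followed by reset(), which bounds the handle table and emits a reset token at the call boundary.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;

// Hypothetical sender-side sketch: each top-level call object is followed
// by reset(), which clears the handle table (bounding memory use) and
// writes a reset token marking a call boundary in the stream.
public class CallStreamWriter {
    private final ObjectOutputStream out;

    public CallStreamWriter(OutputStream sink) throws IOException {
        this.out = new ObjectOutputStream(sink);
    }

    public void writeCall(Serializable topLevelCall) throws IOException {
        out.writeObject(topLevelCall); // the whole invocation graph
        out.reset();                   // forget back-references; boundary marker
        out.flush();
    }

    public static void main(String[] args) throws IOException {
        CallStreamWriter w = new CallStreamWriter(new ByteArrayOutputStream());
        w.writeCall("example call payload");
    }
}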

Regards,

Peter.

On 11/09/2015 7:22 PM, Bryan Thompson wrote:
Gregg,

Graph traversal is in general a non-local problem with irregular,
data-dependent parallelism (the available fine-grained parallelism for a
vertex depends on its edge list, the size of the edge list can vary over
many orders of magnitude, edges may connect vertices that are non-local,
and the size of the active frontier of vertices during traversal can also
vary by many orders of magnitude).

The 2D decomposition minimizes the number of communications that need to be
performed.  There are also hybrid decompositions that can minimize the
communication volume (amount of data that needs to be transmitted),
especially when combined with graph aware partitioning.

A data layout that ignores these issues will require more communication
operations and/or have a greater communication volume.  Communication is
the main barrier to scaling for graphs, so data layouts matter, and
innovation in data layouts is one of the key things driving improved
scaling.
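
(As a purely illustrative aside, a minimal Java sketch of the kind of 2-D edge
decomposition described further down in this thread, where an edge is assigned
to a block of a p x p grid by hashing its source and target vertex identifiers.
The class and method names are made up for illustration; a real layout might
use key ranges or graph-aware partitioning instead.)

// Illustrative only: map an edge (source, target) to a block (row, col)
// of a p x p compute grid using a hash of the endpoint identifiers.
public final class EdgePlacement {
    private final int p; // grid dimension (p x p compute nodes)

    public EdgePlacement(int p) { this.p = p; }

    private int bucket(long vertexId) {
        return (Long.hashCode(vertexId) & 0x7fffffff) % p; // non-negative bucket
    }

    /** One grid index is derived from the source vertex... */
    public int rowOf(long sourceVertex) { return bucket(sourceVertex); }

    /** ...and the other from the target vertex. */
    public int colOf(long targetVertex) { return bucket(targetVertex); }

    public static void main(String[] args) {
        EdgePlacement grid = new EdgePlacement(4); // a 4 x 4 cluster
        System.out.printf("edge (42 -> 7) lands on node (%d, %d)%n",
                grid.rowOf(42), grid.colOf(7));
    }
}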

We actually use hardware acceleration as well: each compute node has one
or more GPUs that are used to parallelize operations on the local edges.
The GPUs use an MPI distribution that uses RDMA over InfiniBand for
off-node communication.  This avoids the overhead of having the CPU
coordinate communication (data is not copied to the CPU but goes directly
over PCIe to the InfiniBand card).

The question about tree-based communication patterns arises from an
interest in being able to use Java to coordinate data management
activities.  These activities are not as performance-critical as the basic
graph traversal, but they should still show good scaling.  Tree-based
communication patterns are one way to achieve that scaling.

Thanks,
Bryan

On Thursday, September 10, 2015, Gregg Wonderly <gregg...@gmail.com> wrote:

Why doesn’t JavaSpaces let you submit requests and have them worked on
and results returned without assigning any particular node any specific
responsibility?
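
(For reference, the classic JavaSpaces take/process/write worker pattern that
decouples requesters from any particular node; the entry classes and field
names below are illustrative, not an existing API.)

import net.jini.core.entry.Entry;
import net.jini.core.lease.Lease;
import net.jini.space.JavaSpace;

// Illustrative request/result entries (public fields and a public no-arg
// constructor, as Jini entries require).
class TaskEntry implements Entry {
    public String payload;
    public TaskEntry() {}
    public TaskEntry(String payload) { this.payload = payload; }
}

class ResultEntry implements Entry {
    public String payload;
    public ResultEntry() {}
    public ResultEntry(String payload) { this.payload = payload; }
}

// Any worker can take whichever task is available and write back a result;
// no particular node is assigned any specific responsibility.
public class SpaceWorker {
    public static void work(JavaSpace space) throws Exception {
        TaskEntry template = new TaskEntry(); // null fields match anything
        while (true) {
            TaskEntry task = (TaskEntry) space.take(template, null, Long.MAX_VALUE);
            if (task == null) continue;
            space.write(new ResultEntry("done: " + task.payload), null, Lease.FOREVER);
        }
    }
}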

Gregg

On Aug 1, 2015, at 12:06 PM, Bryan Thompson <br...@systap.com> wrote:
First, thanks for the responses and the interest in this topic.  I have
been traveling for the last few days and have not had a chance to follow
up.

- The network would have multiple switches, but not multiple routers.  The
typical target is an InfiniBand network.  Of course, Java lets us bind to
InfiniBand now.

- As far as I understand it, MPI relies on each node executing the same
logic in a distributed communication pattern.  Thus the concept of a
leader election to determine a balanced tree probably does not show up.
Instead, the tree is expressed in terms of the MPI rank assigned to each
node.  I am not suggesting that the same design pattern is used for River.

- We do need a means to define the relationship between a distributed
communication pattern and the manner in which data are decomposed onto a
cluster.  I am not sure that the proposal above gives us this directly,
but some extension of it probably would.  Let me give an example.  In our
application, we are distributing the edges of a graph among a 2-D cluster
of compute nodes (p x p machines).  The distribution is done by assigning
the edges to compute nodes based on some function (key-range,
hash-function) of the source and target vertex identifiers.  When we want
to read all edges in the graph, we need to do an operation that is data
parallel across either the rows (in-edges) or the columns (out-edges) of
the cluster.  See http://mapgraph.io/papers/UUSCI-2014-002.pdf for a TR
that describes this communication pattern for a p x p cluster of GPUs.  In
order to make this work with River, we would somehow have to associate the
nodes with their positions in this 2-D topology.  For example, we could
annotate each node with a "row" and "column" attribute that specifies its
location in the compute grid.  We could then have a communicator for each
row and each column based on the approach you suggest above.
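
As a rough illustration of that annotation, here is a minimal sketch of a
"row"/"column" attribute as a Jini entry (assuming River's
net.jini.entry.AbstractEntry base class; the class name and fields are made up
here):

import net.jini.entry.AbstractEntry;

// Hypothetical lookup attribute: each compute node's service registration
// carries its position in the p x p grid, so per-row and per-column
// communicators can be assembled by matching on this entry.
public class GridPosition extends AbstractEntry {
    public Integer row;     // row index in the p x p compute grid
    public Integer column;  // column index in the p x p compute grid

    public GridPosition() {}              // public no-arg constructor required

    public GridPosition(Integer row, Integer column) {
        this.row = row;
        this.column = column;
    }
}

A per-row communicator could then, in principle, be assembled by looking up all
services whose GridPosition.row matches a template.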

The advantage of such tree-based communication patterns is quite large.
They require log(p) communication operations where you would otherwise do
p communication operations.  So, for example, only 4 communication
operations vs 16 for a 16-node cluster.
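
To make the log(p) figure concrete, a small, purely illustrative sketch of a
binomial-tree broadcast schedule in Java (ranks only; the actual message
transport is abstracted away, and the names are invented):

import java.util.ArrayList;
import java.util.List;

public class TreeBroadcast {

    // For a given rank in a p-node group rooted at rank 0, list the ranks
    // this node forwards the message to (at most one send per round).
    static List<Integer> sendsFor(int rank, int p) {
        List<Integer> dests = new ArrayList<>();
        for (int step = 1; step < p; step <<= 1) { // ceil(log2(p)) rounds
            if (rank < step && rank + step < p) {  // this rank already has the data
                dests.add(rank + step);
            }
        }
        return dests;
    }

    public static void main(String[] args) {
        int p = 16; // log2(16) = 4 rounds, vs on the order of p sequential sends
        for (int rank = 0; rank < p; rank++) {
            System.out.println("rank " + rank + " forwards to " + sendsFor(rank, p));
        }
    }
}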

Thanks,
Bryan


On Wed, Jul 29, 2015 at 1:17 PM, Greg Trasuk <tras...@stratuscom.com> wrote:
I’ve wondered about doing this in the past, but for the workloads I’ve
worked with, I/O time has been relatively low compared to processing time.
I’d guess there’s some combination of message frequency, cluster size and
message size that makes it compelling.

The idea is interesting, though, because it could enable things like
distributed JavaSpaces, where we’d be distributing the search queries,
etc.  I would guess the mechanism would look like:

-Member nodes want to form a multicast group.
-They elect a leader.
-Leader figures out a balanced notification tree, and passes it on to each
member (a rough sketch of this step follows below).
-Leader receives multicast message and starts the message passing into the
tree.
-Recipients pass the message to local recipients, and also to their
designated repeater recipients (how many?).
-Somehow we monitor for disappearing members and then recast the leader
election if necessary.
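
Purely as a sketch of the balanced-tree step above, assuming the leader orders
the members into a list and picks a fan-out (both the ordering and the fan-out
are open design choices, and the names below are invented):

import java.util.ArrayList;
import java.util.List;

public class NotificationTree {

    // Members a given member repeats the message to, treating the ordered
    // member list as a complete k-ary tree with member 0 as the leader/root.
    static List<Integer> repeaterTargets(int member, int memberCount, int fanOut) {
        List<Integer> targets = new ArrayList<>();
        for (int c = member * fanOut + 1;
             c <= member * fanOut + fanOut && c < memberCount; c++) {
            targets.add(c);
        }
        return targets;
    }

    public static void main(String[] args) {
        int members = 10, fanOut = 2; // illustrative values only
        for (int m = 0; m < members; m++) {
            System.out.println("member " + m + " repeats to "
                    + repeaterTargets(m, members, fanOut));
        }
    }
}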

The Paxos protocol would be involved, I’d guess.  Does anyone have
references to any academic work on presence monitoring and leader
election, beyond Lamport’s original paper?

I also wonder: is there a reason not to just use multicast if it’s
available?  (I realize that it isn’t always supported - on Amazon EC2, for
instance.)

Interesting question!

Cheers,

Greg Trasuk

On Jul 29, 2015, at 12:40 PM, Bryan Thompson <br...@systap.com> wrote:
Hello,

I am wondering if anyone has looked into creating tree-based algorithms
for multicast of RMI messages for River.  Assuming a local cluster, such
patterns generally have log(p) cost for a cluster with p nodes.

For the curious, this is how many MPI messages are communicated under the
hood.

Thanks,
Bryan


