Re: Two joins in GraphX Pregel implementation

2015-07-28 Thread Ankur Dave
On 27 Jul 2015, at 16:42, Ulanov, Alexander  wrote:

> It seems that the mentioned two joins can be rewritten as one outer join


You're right. In fact, the outer join can be streamlined further using a
method from GraphOps:

g = g.joinVertices(messages)(vprog).cache()

Then, instead of passing newVerts as the active set for mapReduceTriplets,
we could pass `messages`.
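
Concretely, the inner loop of Pregel would then look roughly like this (a
hedged sketch against the GraphX Pregel of the time; `vprog`, `sendMsg`,
`mergeMsg`, and `activeDirection` are the names the existing implementation
uses):

// Run the vertex program on vertices that received messages;
// joinVertices leaves the other vertices unchanged.
g = g.joinVertices(messages)(vprog).cache()

val oldMessages = messages
// Only edges adjacent to vertices that received messages need to be
// scanned, so the message RDD itself serves as the active set.
messages = g.mapReduceTriplets(
  sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()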

If you're interested in proposing a PR for this, I've attached a patch with
these changes and updates to the comments.

On Tue, Jul 28, 2015 at 1:15 AM, Ulanov, Alexander wrote:

> I’ve found two PRs (almost identical) for replacing mapReduceTriplets with
> aggregateMessages

[...]
> Do you know the reason why this improvement is not pushed?


There isn't any performance benefit to switching Pregel to use
aggregateMessages while preserving its current interface, because the
interface uses Iterators and would require us to wrap and unwrap them
anyway. The semantics of aggregateMessagesWithActiveSet are otherwise the
same as mapReduceTriplets, so there isn't any functionality we are missing
out on. And this change seems too small to justify introducing a new
version of Pregel, though it would be worthwhile when combined with other
improvements.
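
To make the Iterator point concrete, here is a hedged sketch of the wrapper
that preserving the current interface would force (the function name is made
up; `aggregateMessages` and `EdgeContext.toEdgeTriplet` are real GraphX APIs):

import scala.reflect.ClassTag
import org.apache.spark.graphx._

def mapReduceTripletsViaAggregate[VD, ED, A: ClassTag](
    g: Graph[VD, ED],
    sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
    mergeMsg: (A, A) => A): VertexRDD[A] = {
  g.aggregateMessages[A](
    ctx => {
      // Rebuild a triplet, call the user's function, then unwrap the
      // Iterator it returns -- exactly the overhead aggregateMessages
      // was designed to avoid.
      sendMsg(ctx.toEdgeTriplet).foreach { case (id, msg) =>
        if (id == ctx.srcId) ctx.sendToSrc(msg) else ctx.sendToDst(msg)
      }
    },
    mergeMsg)
}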

Ankur 


pregel-simplify-join.patch
Description: Binary data


Re: GraphX: New graph operator

2015-06-01 Thread Ankur Dave
I think it would be good to have more basic operators like union or
difference, as long as they have an efficient distributed implementation
and are plausibly useful.

If they can be written in terms of the existing GraphX API, it would be
best to put them into GraphOps to keep the core GraphX implementation
small. The `mask` operation should actually be in GraphOps -- it's only in
GraphImpl for historical reasons. On the other hand, `subgraph` needs to be
in GraphImpl for performance: it accesses EdgeRDDImpl#filter(epred, vpred),
which can't be a public EdgeRDD method because its semantics rely on an
implementation detail (vertex replication).
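
For instance, a structural union could plausibly be written purely against
the public API, which would let it live in GraphOps. A hypothetical sketch
(union is not an existing GraphX method, and the duplicate-attribute policy
here is deliberately naive):

import scala.reflect.ClassTag
import org.apache.spark.SparkContext._
import org.apache.spark.graphx._

def union[VD: ClassTag, ED: ClassTag](
    a: Graph[VD, ED], b: Graph[VD, ED]): Graph[VD, ED] = {
  // If a vertex appears in both graphs, arbitrarily keep the attribute from `a`.
  val vertices = a.vertices.union(b.vertices).reduceByKey((x, _) => x)
  val edges = a.edges.union(b.edges).distinct()
  Graph(vertices, edges)
}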

Ankur 

On Mon, Jun 1, 2015 at 8:54 AM, Tarek Auel  wrote:

> Hello,
>
> Someone proposed in a JIRA issue implementing new graph operations. Sean
> Owen recommended checking with the mailing list first to see whether this
> is of interest.
>
> So I would like to know whether it would be interesting for GraphX to
> implement operators like:
> http://en.wikipedia.org/wiki/Graph_operations and/or
> http://techieme.in/complex-graph-operations/
>
> If yes, should they be integrated into GraphImpl (like mask, subgraph,
> etc.) or provided as an external library? My feeling is that they are
> similar to mask, so for consistency they should be part of the graph
> implementation itself.
>
> What do you guys think? I would really like to bring GraphX forward and
> help implement some of these.
>
> Looking forward to hearing your opinions,
> Tarek
>
>


Re: GraphX implementation of ALS?

2015-05-26 Thread Ankur Dave
This is the latest GraphX-based ALS implementation that I'm aware of:
https://github.com/ankurdave/spark/blob/GraphXALS/graphx/src/main/scala/org/apache/spark/graphx/lib/ALS.scala

When I benchmarked it last year, it was about twice as slow as MLlib's ALS,
and I think the latter has gotten faster since then. The performance gap is
because the MLlib version implements some ALS-specific optimizations that
are hard to do using GraphX, such as storing the edges twice (partitioned
by source and by destination) to reduce communication.

Ankur 

On Tue, May 26, 2015 at 3:36 PM, Ben Mabey  wrote:

> I've heard in a number of presentations that Spark's ALS implementation
> was going to be moved over to a GraphX version. For example, a
> presentation on GraphX at the Spark Summit (slide #23) mentioned a 40 LOC
> version using the Pregel API. Looking at the ALS source on master, it
> looks like the original implementation is still being used, with no use
> of GraphX to be seen. Other algorithms mentioned in the GraphX
> presentation are already in the repo, but I don't see ALS. Could someone
> link me to the GraphX version for comparison purposes? Also, could
> someone comment on why the newer version isn't in use yet (i.e. are there
> tradeoffs with the GraphX version that make it less desirable)?
>


Re: GraphX vertex partition/location strategy

2015-01-19 Thread Ankur Dave
No - the vertices are hash-partitioned onto workers independently of the
edges. It would be nice for each vertex to be on the worker with the most
adjacent edges, but we haven't done this yet, since avoiding load imbalance
would add a lot of complexity while reducing the overall communication by
only a small factor.

We refer to the number of partitions containing adjacent edges for a
particular vertex as the vertex's replication factor. I think the typical
replication factor for power-law graphs with 100-200 partitions is 10-15,
and placing the vertex at the ideal location would only reduce the
replication factor by 1.
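
If you want to measure this on your own graph, here is a rough sketch
(illustrative only, using just public RDD operations):

import org.apache.spark.SparkContext._
import org.apache.spark.graphx._

// Count how many edge partitions each vertex appears in, then average.
def avgReplicationFactor[VD, ED](g: Graph[VD, ED]): Double = {
  val vertexToPartition = g.edges
    .mapPartitionsWithIndex { (pid, iter) =>
      iter.flatMap(e => Iterator((e.srcId, pid), (e.dstId, pid)))
    }
    .distinct() // one record per (vertex, partition) pair
  vertexToPartition
    .mapValues(_ => 1L)
    .reduceByKey(_ + _) // replication factor per vertex
    .values
    .mean()
}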

Ankur 

On Mon, Jan 19, 2015 at 12:20 PM, Michael Malak <michaelma...@yahoo.com.invalid> wrote:

> Does GraphX make an effort to co-locate vertices onto the same workers as
> the majority (or even some) of its edges?
>


Re: [VOTE] Designating maintainers for some Spark components

2014-11-06 Thread Ankur Dave
+1 (binding)

Ankur 

On Wed, Nov 5, 2014 at 5:31 PM, Matei Zaharia wrote:

> I'd like to formally call a [VOTE] on this model, to last 72 hours. The
> [VOTE] will end on Nov 8, 2014 at 6 PM PST.
>


Re: GraphX: some vertex with specific edge

2014-09-16 Thread Ankur Dave
At 2014-09-16 00:07:34 -0700, sochi  wrote:
> So, the above example is like: a ---(e1)---> b ---(e1)---> c ---(e1)---> d
>
> In this case, can I find b, c and d when I have just the source vertex a
> and the edge e1?

First, to clarify: the three edges in your example are all distinct, since they 
have different source and destination vertices. Therefore I assume you're using 
e1 to refer to the edge property that they have in common.

In that case, this problem is equivalent to finding the connected component 
containing vertex a in the subgraph where edges have property e1. Here is how 
to do that in the Spark shell: 
https://gist.github.com/ankurdave/25732a493bc8c8541c97
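
In outline, the approach looks like this (a sketch along the lines of the
gist, assuming String edge properties and that vertex a has ID 1L):

import org.apache.spark.graphx._

// Keep only edges whose property is e1, then find a's connected component.
val e1Graph = graph.subgraph(epred = t => t.attr == "e1")
val cc = e1Graph.connectedComponents()
val aComponent = cc.vertices.lookup(1L).head
// b, c and d are the remaining vertices in a's component.
val reachable = cc.vertices
  .filter { case (id, comp) => comp == aComponent && id != 1L }
  .keys
  .collect()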

Ankur




Re: PARSING_ERROR from kryo

2014-09-15 Thread Ankur Dave
At 2014-09-15 08:59:48 -0700, Andrew Ash  wrote:
> I'm seeing the same exception now on the Spark 1.1.0 release.  Did you ever
> get this figured out?
>
> [...]
>
> On Thu, Aug 21, 2014 at 2:14 PM, npanj  wrote:
>> I am getting PARSING_ERROR while running my job on the code checked out up
>> to commit# db56f2df1b8027171da1b8d2571d1f2ef1e103b6.

The error is because I merged a GraphX PR that introduced a nondeterministic 
bug [1]. I reverted the faulty PR, but it was too late for the 1.1.0 release. 
The problem should go away if you use branch-1.1 or master. Sorry about that...

Ankur

[1] https://issues.apache.org/jira/browse/SPARK-3400




Re: Graphx seems to be broken while Creating a large graph(6B nodes in my case)

2014-08-25 Thread Ankur Dave
I posted the fix on the JIRA ticket 
(https://issues.apache.org/jira/browse/SPARK-3190). To update the user list, 
this is indeed an integer overflow problem when summing up the partition sizes. 
The fix is to use Longs for the sum: https://github.com/apache/spark/pull/2106.
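
For the curious, the failure mode is easy to reproduce in a few lines
(illustrative numbers only):

// Summing ~6 billion as Int wraps around mod 2^32.
val partitionSizes = Seq.fill(4)(1500000000)         // 4 x 1.5B = 6B vertices
val overflowed: Int = partitionSizes.sum             // 1705032704 -- silently wrong
val correct: Long = partitionSizes.map(_.toLong).sum // 6000000000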

Ankur





Re: VertexPartition and ShippableVertexPartition

2014-07-28 Thread Ankur Dave
On Mon, Jul 28, 2014 at 4:29 AM, Larry Xiao wrote:

> On 7/28/14, 3:41 PM, shijiaxin wrote:
>
>> There is a VertexPartition in the EdgePartition, which is created by
>> EdgePartitionBuilder.toEdgePartition.
>>
>> And there is also a ShippableVertexPartition in the VertexRDD.
>>
>> These two Partitions have a lot of common things like index, data and
>> bitset. Why is this necessary?
>
> Is the VertexPartition in the EdgePartition the Mirror Cache part?


Yes, exactly. The primary copy of each vertex is stored in the VertexRDD
using the index, values, and mask data structures, which together form a
hash map. In addition, each partition of the VertexRDD stores the
corresponding partition of the routing table to facilitate joining with the
edges. The ShippableVertexPartition class encapsulates the vertex hash map
along with a RoutingTablePartition.

After joining the vertices with the edges, the edge partitions cache their
adjacent vertices in the mirror cache. They use the VertexPartition for
this, which provides only the hash map functionality and not the routing
table.
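
As a conceptual sketch (not the actual GraphX class definitions), the
relationship looks like this:

type VertexId = Long
type PartitionID = Int

// The hash map shared by both partition types.
case class VertexMap[VD](
  index: Map[VertexId, Int], // vertex id -> slot in `values`
  values: Array[VD],         // vertex attributes
  mask: Set[Int])            // slots holding active vertices

// Lives in the VertexRDD: the hash map plus the routing table.
case class ShippableVertexPartitionSketch[VD](
  vertexMap: VertexMap[VD],
  routingTable: Map[PartitionID, Array[VertexId]]) // vertices each edge partition needs

// Lives in each edge partition's mirror cache: the hash map alone.
case class VertexPartitionSketch[VD](vertexMap: VertexMap[VD])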

Ankur 


Re: GraphX graph partitioning strategy

2014-07-25 Thread Ankur Dave
Oops, the code should be:

val unpartitionedGraph: Graph[Int, Int] = ...
val numPartitions: Int = 128
def getTripletPartition(e: EdgeTriplet[Int, Int]): PartitionID = ...

// Get the triplets using GraphX, then use Spark to repartition them
val partitionedEdges = unpartitionedGraph.triplets
  .map(e => (getTripletPartition(e), e))
  .partitionBy(new HashPartitioner(numPartitions))
  .map(pair => Edge(pair._2.srcId, pair._2.dstId, pair._2.attr)) // the missing line
val partitionedGraph = Graph(unpartitionedGraph.vertices, partitionedEdges)


Ankur 


Re: GraphX graph partitioning strategy

2014-07-25 Thread Ankur Dave
Hi Larry,

GraphX's graph constructor leaves the edges in their original partitions by
default. To support arbitrary multipass graph partitioning, one idea is to
take advantage of this: partition the graph externally to GraphX (possibly
using information from GraphX, such as the degrees), then pass the
partitioned edges to GraphX.

For example, if you had an edge partitioning function that needed the full
triplet to assign a partition, you could do this as follows:

val unpartitionedGraph: Graph[Int, Int] = ...
val numPartitions: Int = 128
def getTripletPartition(e: EdgeTriplet[Int, Int]): PartitionID = ...

// Get the triplets using GraphX, then use Spark to repartition them
val partitionedEdges = unpartitionedGraph.triplets
  .map(e => (getTripletPartition(e), e))
  .partitionBy(new HashPartitioner(numPartitions))
val partitionedGraph = Graph(unpartitionedGraph.vertices, partitionedEdges)


A multipass partitioning algorithm could store its results in the edge
attribute, and then you could use the code above to do the partitioning.
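
As a hedged illustration of that idea (everything beyond the GraphX API is
made up), a single degree-based pass might look like:

// Pass 1: annotate vertices with degrees, then store a partition
// assignment in each edge's attribute.
val degrees = unpartitionedGraph.degrees
val withDegrees = unpartitionedGraph.outerJoinVertices(degrees) {
  (_, _, deg) => deg.getOrElse(0)
}
val assigned: Graph[Int, PartitionID] = withDegrees.mapTriplets { t =>
  // For example, hash the higher-degree endpoint of each edge.
  val key = if (t.srcAttr >= t.dstAttr) t.srcId else t.dstId
  (math.abs(key) % numPartitions).toInt
}

// Pass 2: the partition function from the snippet above just reads it back.
def getTripletPartition(e: EdgeTriplet[Int, PartitionID]): PartitionID = e.attr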

Ankur 


On Wed, Jul 23, 2014 at 11:59 PM, Larry Xiao  wrote:

> Hi all,
>
> I'm implementing graph partitioning strategy for GraphX, learning from
> researches on graph computing.
>
> I have two questions:
>
> - a specific implement question:
> In the current design, only the vertex IDs of src and dst are provided
> (PartitionStrategy.scala).
> Some strategies require knowledge about the graph (like degrees) and can
> consist of more than one pass to finally produce the partition ID.
> So I'm changing the PartitionStrategy.getPartition API to provide more
> info, but I don't want to make it complex. (The current one looks very
> clean.)
>
> - an open question:
> What advice would you give on partitioning, considering the approach
> Spark takes to graph processing?
>
> Any advice is much appreciated.
>
> Best Regards,
> Larry Xiao
>
> Reference
>
> Bipartite-oriented Distributed Graph Partitioning for Big Learning.
> PowerLyra: Differentiated Graph Computation and Partitioning on Skewed
> Graphs
>


Re: GraphX can not unpersist edges of old graph?

2014-06-12 Thread Ankur Dave
We didn't provide an unpersist API for Graph because the internal
dependency structure of a graph can make it hard to unpersist correctly in
a way that avoids recomputation. However, you can directly unpersist a
graph's vertices and edges RDDs using graph.vertices.unpersist() and
graph.edges.unpersist().
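
For example, in an iterative job you might unpersist the previous graph by
hand (a sketch; `initialGraph` and `numIterations` are placeholders, and the
new graph is materialized first so the unpersist doesn't force
recomputation):

var g: Graph[Int, Int] = initialGraph.cache()
for (_ <- 1 to numIterations) {
  val next = g.mapVertices((_, attr) => attr + 1).cache()
  // Materialize the new graph before dropping the old one.
  next.vertices.count()
  next.edges.count()
  g.vertices.unpersist(blocking = false)
  g.edges.unpersist(blocking = false)
  g = next
}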

By the way, the memory leak bug with Pregel (SPARK-2025) is fixed in master.

Ankur 


Re: Suggestion: rdd.compute()

2014-06-10 Thread Ankur Dave
You can achieve an equivalent effect by calling rdd.foreach(x => {}), which
is the lightest possible action that forces materialization of the whole
RDD.
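
For example (with `expensiveStep` standing in for real work):

val rdd = sc.parallelize(1 to 1000000).map(expensiveStep).cache()
rdd.foreach(x => {}) // computes (and caches) every partition; ships nothing back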

Ankur 


Re: Removing spark-debugger.md file from master?

2014-06-03 Thread Ankur Dave
I agree, let's go ahead and remove it.

Ankur 


Re: [VOTE] Release Apache Spark 1.0.0 (RC11)

2014-05-27 Thread Ankur Dave
0

OK, I withdraw my downvote.

Ankur 


Re: Spark 1.0: outerJoinVertices seems to return null for vertex attributes when input was partitioned and vertex attribute type is changed

2014-05-26 Thread Ankur Dave
This is probably due to SPARK-1931, which I just fixed in PR #885. Is the
problem resolved if you use the current Spark master?

Ankur