Re: creating a distributed index

2015-07-15 Thread Ankur Dave
The latest version of IndexedRDD supports any key type with a defined
serializer
https://github.com/amplab/spark-indexedrdd/blob/master/src/main/scala/edu/berkeley/cs/amplab/spark/indexedrdd/KeySerializer.scala,
including Strings. It's not released yet, but you can use it from the
master branch if you're interested.
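
For example, a rough sketch of the usage (the IndexedRDD(...), get, and put
calls below follow the released 0.x API and are an assumption about the
master branch, so check them against the branch you build):

import edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD
import edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD._  // implicit KeySerializers

// Build an IndexedRDD keyed by String and do point lookups/updates.
val pairs = sc.parallelize(Seq(("alice", 1), ("bob", 2)))
val indexed = IndexedRDD(pairs).cache()
indexed.get("alice")                  // Some(1)
val updated = indexed.put("carol", 3)
updated.get("carol")                  // Some(3)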

Ankur http://www.ankurdave.com/

On Wed, Jul 15, 2015 at 12:43 AM, Jem Tucker jem.tuc...@gmail.com wrote:

 With regards to indexed structures in Spark, are there any alternatives to
 IndexedRDD for more generic keys, including Strings?

 Thanks

 Jem



Re: Effecient way to fetch all records on a particular node/partition in GraphX

2015-05-17 Thread Ankur Dave
If you know the partition IDs, you can launch a job that runs tasks on only
those partitions by calling sc.runJob
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L1686.
For example, we do this in IndexedRDD
https://github.com/amplab/spark-indexedrdd/blob/f0c42dcad1f49ce36140f0c1f7d2c3ed61ed373e/src/main/scala/edu/berkeley/cs/amplab/spark/indexedrdd/IndexedRDDLike.scala#L100
to get particular keys without launching a task on every partition.
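
For example (a minimal sketch; the exact runJob overloads vary slightly
across Spark versions, and newer releases drop the allowLocal flag):

val rdd = sc.parallelize(1 to 100, 10)
// Run a job on partitions 3 and 5 only, summing the elements in each.
val sums: Array[Int] =
  sc.runJob(rdd, (iter: Iterator[Int]) => iter.sum, Seq(3, 5), allowLocal = false)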

Ankur http://www.ankurdave.com/

On Sun, May 17, 2015 at 8:32 AM, mas mas.ha...@gmail.com wrote:

 I have distributed my RDD into say 10 nodes. I want to fetch the data that
 resides on a particular node, say node 5. How can I achieve this?
 I have tried the mapPartitionsWithIndex function to filter the data of that
 corresponding node, however it is pretty expensive.



Re: AMP Lab Indexed RDD - question for Data Bricks AMP Labs

2015-04-16 Thread Ankur Dave
I'm the primary author of IndexedRDD. To answer your questions:

1. Operations on an IndexedRDD partition can only be performed from a task
operating on that partition, since doing otherwise would require
decentralized coordination between workers, which is difficult in Spark. If
you want to perform cross-partition lookups, you'll have to do all the
lookups in a batch step as follows:

val a = IndexedRDD(...)
val b = sc.parallelize(...)
// Perform an operation on b that produces some keys to look up in a
val lookups: RDD[Long] = b.map(...)
// Repartition the desired keys to their appropriate partitions in a and do
local lookups, returning the corresponding values
val results = a.innerJoin(lookups.map(k => (k, ()))) { (id, v, unit) => v }

2. IndexedRDD originated from GraphX but can be used for general operations
as long as they fit within Spark's batch-oriented programming model.

By the way, a new version of IndexedRDD is about to be released. If you
decide to use IndexedRDD I'd suggest trying that out, since it provides a
cleaner interface, more predictable performance, and support for arbitrary
key types: https://github.com/amplab/spark-indexedrdd/pull/4

Ankur http://www.ankurdave.com/

On Thu, Apr 16, 2015 at 2:34 PM, Evo Eftimov evo.efti...@isecc.com wrote:

 Thanks, but we need a firm statement, preferably from somebody from the
 Spark vendor Data Bricks, including an answer to the specific question posed by
 me and an assessment/confirmation of whether this is a production-ready/quality
 library which can be used for general-purpose RDDs, not just inside the
 context of GraphX



 *From:* Koert Kuipers [mailto:ko...@tresata.com]
 *Sent:* Thursday, April 16, 2015 10:31 PM
 *To:* Evo Eftimov
 *Cc:* user@spark.apache.org
 *Subject:* Re: AMP Lab Indexed RDD - question for Data Bricks AMP Labs



 I believe it is a generalization of some classes inside GraphX, where
 there was/is a need to keep stuff indexed for random access within each RDD
 partition



 On Thu, Apr 16, 2015 at 5:00 PM, Evo Eftimov evo.efti...@isecc.com
 wrote:

 Can somebody from Data Bricks shed more light on this IndexedRDD library

 https://github.com/amplab/spark-indexedrdd

 It seems to come from AMP Labs and most of the Data Bricks guys are from
 there

 What is especially interesting is whether the Point Lookup (and the other
 primitives) can work from within a function (e.g. map) running on executors
 on worker nodes








Re: [GraphX] aggregateMessages with active set

2015-04-09 Thread Ankur Dave
Actually, GraphX doesn't need to scan all the edges, because it
maintains a clustered index on the source vertex id (that is, it sorts
the edges by source vertex id and stores the offsets in a hash table).
If the activeDirection is appropriately set, it can then jump only to
the clusters with active source vertices.

See the EdgePartition#index field [1], which stores the offsets, and
the logic in GraphImpl#aggregateMessagesWithActiveSet [2], which
decides whether to do a full scan or use the index.

[1] 
https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala#L60
[2] 
https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala#L237-266

Ankur


On Thu, Apr 9, 2015 at 3:21 AM, James alcaid1...@gmail.com wrote:
 In aggregateMessagesWithActiveSet, Spark still has to read all edges. It
 means that a fixed cost which scales with graph size is unavoidable in a
 Pregel-like iteration.

 But what if I have to run nearly 100 iterations, and in the last 50
 iterations only 0.1% of nodes need to be updated? The fixed cost makes the
 program finish in an unacceptable amount of time.




Re: [GraphX] aggregateMessages with active set

2015-04-07 Thread Ankur Dave
We thought it would be better to simplify the interface, since the
active set is a performance optimization but the result is identical
to calling subgraph before aggregateMessages.

The active set option is still there in the package-private method
aggregateMessagesWithActiveSet. You can actually access it publicly
via GraphImpl, though the API isn't guaranteed to be stable:
graph.asInstanceOf[GraphImpl[VD,ED]].aggregateMessagesWithActiveSet(...)
Ankur


On Tue, Apr 7, 2015 at 2:56 AM, James alcaid1...@gmail.com wrote:
 Hello,

 The old GraphX API mapReduceTriplets has an optional parameter
 activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] that limits the input of sendMessage.

 However, in the new aggregateMessages API I could not find this option;
 why is it not offered any more?

 Alcaid




Re: Graphx gets slower as the iteration number increases

2015-03-24 Thread Ankur Dave
This might be because partitions are getting dropped from memory and
needing to be recomputed. How much memory is in the cluster, and how large
are the partitions? This information should be in the Executors and Storage
pages in the web UI.

Ankur http://www.ankurdave.com/

On Tue, Mar 24, 2015 at 7:12 PM, orangepri...@foxmail.com 
orangepri...@foxmail.com wrote:

 I'm working with GraphX to calculate the PageRanks of an extremely large
 social network with a billion vertices.
 As the iteration number increases, the speed of each iteration becomes slower
 and unacceptable. Is there any reason for it?



Re: Learning GraphX Questions

2015-02-13 Thread Ankur Dave
At 2015-02-13 12:19:46 -0800, Matthew Bucci mrbucci...@gmail.com wrote:
 1) How do you actually run programs in GraphX? At the moment I've been doing
 everything live through the shell, but I'd obviously like to be able to work
 on it by writing and running scripts.

You can create your own projects that build against Spark and GraphX through a 
Maven dependency [1], then run those applications using the bin/spark-submit 
script included with Spark [2].

These guides assume you already know how to do this using your preferred build 
tool (SBT or Maven). In short, here's how to do it with SBT:

1. Install SBT locally (`brew install sbt` on OS X).

2. Inside your project directory, create a build.sbt file listing Spark and 
GraphX as a dependency, as in [3] (a minimal sketch also appears after the references below).

3. Run `sbt package` in a shell.

4. Pass the JAR in your_project_dir/target/scala-2.10/ to bin/spark-submit.

[1] 
http://spark.apache.org/docs/latest/programming-guide.html#linking-with-spark
[2] http://spark.apache.org/docs/latest/submitting-applications.html
[3] https://gist.github.com/ankurdave/1fb7234d8affb3a2e4f4
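
For reference, a minimal build.sbt along the lines of [3] (the Scala and Spark 
version numbers here are illustrative, not prescriptive):

// Minimal build.sbt sketch for a GraphX application
name := "my-graphx-app"
version := "0.1"
scalaVersion := "2.10.4"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"   % "1.2.1" % "provided",
  "org.apache.spark" %% "spark-graphx" % "1.2.1" % "provided"
)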

 2) Is there a way to check the status of the partitions of a graph? For
 example, I want to determine for starters if the number of partitions
 requested are always made, like if I ask for 8 partitions but only have 4
 cores what happens?

You can look at `graph.vertices` and `graph.edges`, which are both RDDs, so you 
can do for example: graph.vertices.partitions

 3) Would I be able to partition by vertex instead of edges, even if I had to
 write it myself? I know partitioning by edges is favored in a majority of
 the cases, but for the sake of research I'd like to be able to do both.

If you pass PartitionStrategy.EdgePartition1D, this will partition edges by 
their source vertices, so all edges with the same source will be 
co-partitioned, and the communication pattern will be similar to 
vertex-partitioned (edge-cut) systems like Giraph.
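
For example (a small sketch; `graph` is assumed to be an existing Graph[VD, ED]):

import org.apache.spark.graphx._

// Repartition edges by source vertex so all out-edges of a vertex are co-located.
val bySource = graph.partitionBy(PartitionStrategy.EdgePartition1D, numPartitions = 8)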

 4) Is there a better way to time processes outside of using built-in unix
 timing through the logs or something?

I think the options are Unix timing, log file timestamp parsing, looking at the 
web UI, or writing timing code within your program (System.currentTimeMillis 
and System.nanoTime).

Ankur




Re: GraphX: ShortestPaths does not terminate on a grid graph

2015-01-29 Thread Ankur Dave
Thanks for the reminder. I just created a PR:
https://github.com/apache/spark/pull/4273
Ankur


On Thu, Jan 29, 2015 at 7:25 AM, Jay Hutfles jayhutf...@gmail.com wrote:
 Just curious, is this set to be merged at some point?




Re: graph.inDegrees including zero values

2015-01-25 Thread Ankur Dave
You can do this using leftJoin, as collectNeighbors [1] does:

graph.vertices.leftJoin(graph.inDegrees) {
  (vid, attr, inDegOpt) => inDegOpt.getOrElse(0)
}

[1] 
https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala#L145

Ankur


On Sun, Jan 25, 2015 at 5:52 AM, scharissis stefano.charis...@gmail.com wrote:
 If a vertex has no in-degree then Spark's GraphOp 'inDegree' does not return
 it at all. Instead, it would be very useful to me to be able to have that
 vertex returned with an in-degree of zero.
 What's the best way to achieve this using the GraphX API?




Re: GraphX: ShortestPaths does not terminate on a grid graph

2015-01-22 Thread Ankur Dave
At 2015-01-22 02:06:37 -0800, NicolasC nicolas.ch...@inria.fr wrote:
 I try to execute a simple program that runs the ShortestPaths algorithm
 (org.apache.spark.graphx.lib.ShortestPaths) on a small grid graph.
 I use Spark 1.2.0 downloaded from spark.apache.org.

 This program runs for more than 2 hours when the grid size is 70x70 as above,
 and is then killed by the resource manager of the cluster (Torque). After 5-6
 minutes of execution, the Spark master UI does not even respond.

 For a grid size of 30x30, the program terminates in about 20 seconds, and for 
 a grid size
 of 50x50 it finishes in about 80 seconds. The problem appears for a grid size 
 of 70x70 and
 above.

Unfortunately this problem is due to a Spark bug. In later iterations of 
iterative algorithms, the lineage maintained for fault tolerance grows long and 
causes Spark to consume increasing amounts of resources for scheduling and task 
serialization.

The workaround is to checkpoint the graph periodically, which writes it to 
stable storage and interrupts the lineage chain before it grows too long.

If you're able to recompile Spark, you can do this by applying the patch to 
GraphX at the end of this mail, and before running graph algorithms, calling

sc.setCheckpointDir("/tmp")

to set the checkpoint directory as desired.

Ankur

=== patch begins here ===

diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
index 5e55620..1fbbb87 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -126,6 +126,8 @@ object Pregel extends Logging {
 // Loop
 var prevG: Graph[VD, ED] = null
 var i = 0
+val checkpoint = g.vertices.context.getCheckpointDir.nonEmpty
+val checkpointFrequency = 25
 while (activeMessages > 0 && i < maxIterations) {
   // Receive the messages. Vertices that didn't get any messages do not 
appear in newVerts.
   val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
@@ -139,6 +141,14 @@ object Pregel extends Logging {
   // get to send messages. We must cache messages so it can be 
materialized on the next line,
   // allowing us to uncache the previous iteration.
   messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, 
activeDirection))).cache()
+
+  if (checkpoint && i % checkpointFrequency == checkpointFrequency - 1) {
+    logInfo("Checkpointing in iteration " + i)
+    g.vertices.checkpoint()
+    g.edges.checkpoint()
+    messages.checkpoint()
+  }
+
   // The call to count() materializes `messages`, `newVerts`, and the 
vertices of `g`. This
   // hides oldMessages (depended on by newVerts), newVerts (depended on by 
messages), and the
   // vertices of prevG (depended on by newVerts, oldMessages, and the 
vertices of g).




Re: Using graphx to calculate average distance of a big graph

2015-01-06 Thread Ankur Dave
[-dev]

What size of graph are you hoping to run this on? For small graphs where
materializing the all-pairs shortest path is an option, you could simply
find the APSP using https://github.com/apache/spark/pull/3619 and then take
the average distance (apsp.map(_._2.toDouble).mean).

Ankur http://www.ankurdave.com/

On Sun, Jan 4, 2015 at 6:28 PM, James alcaid1...@gmail.com wrote:

 Recently we want to use spark to calculate the average shortest path
 distance between each reachable pair of nodes in a very big graph.

 Is there any one ever try this? We hope to discuss about the problem.



Re: representing RDF literals as vertex properties

2014-12-08 Thread Ankur Dave
At 2014-12-08 12:12:16 -0800, spr s...@yarcdata.com wrote:
 OK, I have waded into implementing this and have gotten pretty far, but am now
 hitting something I don't understand, a NoSuchMethodError. 
 [...]
 The (short) traceback looks like

 Exception in thread main java.lang.NoSuchMethodError:
 org.apache.spark.graphx.Graph$.apply$default$4()Lorg/apache/spark/storage/StorageLevel;
 [...]
 Is the method that's not found (.../StorageLevel) something I need to
 initialize?  Using this same code on a toy problem works fine.  

This is a binary compatibility error and shouldn't happen as long as you're 
compiling and running with the same Spark assembly jar. Is it possible there's 
a version mismatch between compiling and running?

Ankur




Re: [Graphx] which way is better to access faraway neighbors?

2014-12-05 Thread Ankur Dave
At 2014-12-05 02:26:52 -0800, Yifan LI iamyifa...@gmail.com wrote:
 I have a graph in where each vertex keep several messages to some faraway 
 neighbours(I mean, not to only immediate neighbours, at most k-hops far, e.g. 
 k = 5).

 now, I propose to distribute these messages to their corresponding 
 destinations (say, "faraway neighbours"):

 - by using the Pregel API, one superstep is enough 
 - by using Spark basic operations (groupByKey, leftJoin, etc.) on the vertices RDD 
 and its intermediate results.

 w.r.t the communication among machines, and the high cost of 
 groupByKey/leftJoin, I guess that 1st option is better?

If messages will only travel along edges (even if they travel over multiple 
edges), then the Pregel API should be faster. You'll have to run k supersteps 
for messages to propagate k hops away from their origins.

If messages can jump directly between two arbitrary vertices, then doing a 
single set of Spark basic operations may be faster than running multiple Pregel 
supersteps.

Ankur




Re: Determination of number of RDDs

2014-12-04 Thread Ankur Dave
At 2014-12-04 02:08:45 -0800, Deep Pradhan pradhandeep1...@gmail.com wrote:
 I have a graph and I want to create RDDs equal in number to the nodes in
 the graph. How can I do that?
 If I have 10 nodes then I want to create 10 rdds. Is that possible in
 GraphX?

This is possible: you can collect the elements to the driver, then create an 
RDD for each element.

If you have so many elements that collecting them to the driver is infeasible, 
there's probably an alternative solution that doesn't involve creating one RDD 
per element.

Ankur




Re: GraphX Pregel halting condition

2014-12-04 Thread Ankur Dave
There's no built-in support for doing this, so the best option is to copy and 
modify Pregel to check the accumulator at the end of each iteration. This is 
robust and shouldn't be too hard, since the Pregel code is short and only uses 
public GraphX APIs.

Ankur

At 2014-12-03 09:37:01 -0800, Jay Hutfles jayhutf...@gmail.com wrote:
 I'm trying to implement a graph algorithm that does a form of path
 searching.  Once a certain criteria is met on any path in the graph, I
 wanted to halt the rest of the iterations.  But I can't see how to do that
 with the Pregel API, since any vertex isn't able to know the state of other
 arbitrary vertex (if they're not adjacent).

 Is there a common pattern for doing something like this?  I was thinking of
 using a custom accumulator where the zero is true and the addInPlace is a
 boolean or.  Each vertex (as part of its vprog) could add to the
 accumulator, and once a path is found which meets the condition, the
 accumulator would then have a value of false.  But since workers can't read
 accumulators, I don't see how to use that when knowing whether to iterate
 again.  That is, unless I reimplement the Pregel class with the added check
 when iterating...

 Any suggestions?  Thanks in advance!




Re: representing RDF literals as vertex properties

2014-12-04 Thread Ankur Dave
At 2014-12-04 16:26:50 -0800, spr s...@yarcdata.com wrote:
 I'm also looking at how to represent literals as vertex properties. It seems
 one way to do this is via positional convention in an Array/Tuple/List that is
 the VD; i.e., to represent height, weight, and eyeColor, the VD could be a
 Tuple3(Double, Double, String).
 [...]
 Given that vertices can have many many properties, it seems memory consumption
 for the properties should be as parsimonious as possible. Will any of
 Array/Tuple/List support sparse usage? Is Option the way to get there?

Storing vertex properties positionally with Array[Option[Any]] or any of the 
other sequence types will provide a dense representation. For a sparse 
representation, the right data type is a Map[String, Any], which will let you 
access properties by name and will only store the nonempty properties.

Since the value type in the map has to be Any, or more precisely the least 
upper bound of the property types, this sacrifices type safety and you'll have 
to downcast when retrieving properties. If there are particular subsets of the 
properties that frequently go together, you could instead use a class 
hierarchy. For example, if the vertices are either people or products, you 
could use the following:

sealed trait VertexProperty extends Serializable
case class Person(name: String, weight: Int) extends VertexProperty
case class Product(name: String, price: Int) extends VertexProperty

Then you could pattern match against the hierarchy instead of downcasting:

 List(Person("Bob", 180), Product("chair", 800), Product("desk", 200)).flatMap {
   case Person(name, weight) => Array.empty[Int]
   case Product(name, price) => Array(price)
 }.sum

Ankur




Re: Filter using the Vertex Ids

2014-12-03 Thread Ankur Dave
At 2014-12-03 02:13:49 -0800, Deep Pradhan pradhandeep1...@gmail.com wrote:
 We cannot do sc.parallelize(List(VertexRDD)), can we?

There's no need to do this, because every VertexRDD is also a pair RDD:

class VertexRDD[VD] extends RDD[(VertexId, VD)]

You can simply use graph.vertices in place of `a` in my example.

Ankur




Re: Filter using the Vertex Ids

2014-12-03 Thread Ankur Dave
At 2014-12-02 22:01:20 -0800, Deep Pradhan pradhandeep1...@gmail.com wrote:
 I have a graph which returns the following on doing graph.vertices
 (1, 1.0)
 (2, 1.0)
 (3, 2.0)
 (4, 2.0)
 (5, 0.0)

 I want to group all the vertices with the same attribute together, like into
 one RDD or something. I want all the vertices with same attribute to be
 together.

You can do this by flipping the tuples so the values become the keys, then 
using one of the by-key functions in PairRDDFunctions:

val a: RDD[(Int, Double)] = sc.parallelize(List(
  (1, 1.0),
  (2, 1.0),
  (3, 2.0),
  (4, 2.0),
  (5, 0.0)))

val b: RDD[(Double, Int)] = a.map(kv => (kv._2, kv._1))

val c: RDD[(Double, Iterable[Int])] = b.groupByKey(numPartitions = 5)

c.collect.foreach(println)
// (0.0,CompactBuffer(5))
// (1.0,CompactBuffer(1, 2))
// (2.0,CompactBuffer(3, 4))

Ankur




Re: Filter using the Vertex Ids

2014-12-03 Thread Ankur Dave
To get that function in scope you have to import
org.apache.spark.SparkContext._

Ankur

On Wednesday, December 3, 2014, Deep Pradhan pradhandeep1...@gmail.com
wrote:

 But groupByKey() gives me the error saying that it is not a member of
 org.apache.spark.rdd.RDD[(Double, org.apache.spark.graphx.VertexId)]



-- 
Ankur http://www.ankurdave.com/


Re: how to force graphx to execute transfomtation

2014-11-26 Thread Ankur Dave
At 2014-11-26 05:25:10 -0800, Hlib Mykhailenko hlib.mykhaile...@inria.fr 
wrote:
 I work with GraphX. When I call graph.partitionBy(..) nothing happens because, 
 as I understand it, all transformations are lazy and partitionBy is 
 built using transformations. 
 Is there a way to force Spark to actually execute this transformation without 
 using any action? 

If you just want the transformation to run without returning anything, such as 
for benchmarking, you can use graph.partitionBy(...).foreachPartition(x => {}).

Ankur




Re: inconsistent edge counts in GraphX

2014-11-18 Thread Ankur Dave
At 2014-11-11 01:51:43 +, Buttler, David buttl...@llnl.gov wrote:
 I am building a graph from a large CSV file.  Each record contains a couple 
 of nodes and about 10 edges.  When I try to load a large portion of the 
 graph, using multiple partitions, I get inconsistent results in the number of 
 edges between different runs.  However, if I use a single partition, or a 
 small portion of the CSV file (say 1000 rows), then I get a consistent number 
 of edges.  Is there anything I should be aware of as to why this could be 
 happening in GraphX?

Is it possible there's some nondeterminism in the way you're reading the file? 
It would be helpful if you could post the code you're using to load the graph.

Ankur




Re: GraphX / PageRank with edge weights

2014-11-18 Thread Ankur Dave
At 2014-11-13 21:28:52 +, Ommen, Jurgen omme0...@stthomas.edu wrote:
 I'm using GraphX and playing around with its PageRank algorithm. However, I 
 can't see from the documentation how to use edge weights when running PageRank.
 Is it possible to consider edge weights, and how would I do it?

There's no built-in way to prefer certain edges over others; edge weights are 
just set to the inverse of the outdegree of the source vertex. But it's simple 
to modify the PageRank code [1] to use custom edge weights instead: (1) copy 
the PageRank.run method body to your own project, (2) change the type signature 
of the input graph from Graph[VD, ED] to Graph[VD, Double], and (3) remove the 
calls to outerJoinVertices and mapTriplets on line 86 and 88.
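
As an illustration of the kind of change involved (a hedged sketch, not the actual 
PageRank.run code), custom weights can be pre-normalized per source vertex so they 
play the role of the default 1/outdegree values:

import org.apache.spark.graphx._

// `weighted` is an assumed Graph[VD, Double] whose edge attributes are raw weights.
// Sum each vertex's outgoing weights, then divide every edge weight by that sum.
val weightSums: VertexRDD[Double] = weighted.mapReduceTriplets[Double](
  e => Iterator((e.srcId, e.attr)),
  _ + _)
val normalized: Graph[Double, Double] = weighted
  .outerJoinVertices(weightSums) { (id, attr, sumOpt) => sumOpt.getOrElse(0.0) }
  .mapTriplets(e => e.attr / e.srcAttr)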

Ankur

[1] 
https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala#L79




Re: Pagerank implementation

2014-11-18 Thread Ankur Dave
At 2014-11-15 18:01:22 -0700, tom85 tom.manha...@gmail.com wrote:
 This line: val newPR = oldPR + (1.0 - resetProb) * msgSum
 makes no sense to me. Should it not be:
 val newPR = resetProb/graph.vertices.count() + (1.0 - resetProb) * msgSum 
 ?

This is an unusual version of PageRank where the messages being passed around 
are deltas rather than full ranks. This occurs at line 156 in vertexProgram, 
which returns (newPR, newPR - oldPR). The second element of the tuple is the 
delta, which is subsequently used in sendMessage.

The benefit of this is that sendMessage can avoid sending when the delta drops 
below the convergence threshold `tol`, indicating that the source vertex has 
converged. But it means that to update the rank of each vertex, we have to add 
the incoming delta to its existing rank. That's why the oldPR term appears in 
the line you're looking at.

Ankur




Re: Landmarks in GraphX section of Spark API

2014-11-18 Thread Ankur Dave
At 2014-11-17 14:47:50 +0530, Deep Pradhan pradhandeep1...@gmail.com wrote:
 I was going through the graphx section in the Spark API in
 https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.lib.ShortestPaths$

 Here, I find the word landmark. Can anyone explain to me what landmark
 means? Is it a simple English word or does it mean something else in GraphX?

The landmarks in the context of the shortest-paths algorithm are just the 
vertices of interest. For each vertex in the graph, the algorithm will return 
the distance to each of the landmark vertices.
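
For example (a small sketch; `graph` and the landmark ids are assumptions):

import org.apache.spark.graphx.lib.ShortestPaths

// Compute, for every vertex, its distance to landmark vertices 1 and 4. Each
// resulting vertex attribute is a Map[VertexId, Int] of landmark -> distance.
val result = ShortestPaths.run(graph, Seq(1L, 4L))
result.vertices.collect.foreach(println)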

Ankur




Re: Running PageRank in GraphX

2014-11-18 Thread Ankur Dave
At 2014-11-18 12:02:52 +0530, Deep Pradhan pradhandeep1...@gmail.com wrote:
 I just ran the PageRank code in GraphX with some sample data. What I am
 seeing is that the total rank changes drastically if I change the number of
 iterations from 10 to 100. Why is that so?

As far as I understand, the total rank should asymptotically approach the 
number of vertices in the graph, assuming there are no vertices of zero 
outdegree. Does that seem to be the case for your graph?

Ankur




Re: Landmarks in GraphX section of Spark API

2014-11-18 Thread Ankur Dave
At 2014-11-18 14:59:20 +0530, Deep Pradhan pradhandeep1...@gmail.com wrote:
 So landmark can contain just one vertex right?

Right.

 Which algorithm has been used to compute the shortest path?

It's distributed Bellman-Ford.

Ankur




Re: New Codes in GraphX

2014-11-18 Thread Ankur Dave
At 2014-11-18 14:51:54 +0530, Deep Pradhan pradhandeep1...@gmail.com wrote:
 I am using Spark-1.0.0. There are two GraphX directories that I can see here

 1. spark-1.0.0/examples/src/main/scala/org/apache/spark/examples/graphx
 which contains LiveJournalPageRank.scala

 2. spark-1.0.0/graphx/src/main/scala/org/apache/spark/graphx/lib which
 contains Analytics.scala, ConnectedComponents.scala, etc.

 Now, if I want to add my own code to GraphX i.e., if I want to write a
 small application on GraphX, in which directory should I add my code, in 1
 or 2 ? And what is the difference?

If you want to add an algorithm which you can call from the Spark shell and 
submit as a pull request, you should add it to org.apache.spark.graphx.lib 
(#2). To run it from the command line, you'll also have to modify 
Analytics.scala.

If you want to write a separate application, the ideal way is to do it in a 
separate project that links in Spark as a dependency [1]. It will also work to 
put it in either #1 or #2, but this will be worse in the long term because each 
build cycle will require you to rebuild and restart all of Spark rather than 
just building your application and calling spark-submit on the new JAR.

Ankur

[1] http://spark.apache.org/docs/1.0.2/quick-start.html#standalone-applications




Re: Landmarks in GraphX section of Spark API

2014-11-18 Thread Ankur Dave
At 2014-11-18 15:29:08 +0530, Deep Pradhan pradhandeep1...@gmail.com wrote:
 Does Bellman-Ford give the best solution?

It gives the same solution as any other algorithm, since there's only one 
correct solution for shortest paths and it's guaranteed to find it eventually. 
There are probably faster distributed algorithms for single-source shortest 
paths, but I'm not familiar with them. As far as I can tell, distributed 
Bellman-Ford is the most widely-used one.

Ankur




Re: New Codes in GraphX

2014-11-18 Thread Ankur Dave
At 2014-11-18 15:35:13 +0530, Deep Pradhan pradhandeep1...@gmail.com wrote:
 Now, how do I run the LiveJournalPageRank.scala that is there in 1?

I think it should work to use

MASTER=local[*] $SPARK_HOME/bin/run-example graphx.LiveJournalPageRank \
  /edge-list-file.txt --numEPart=8 --numIter=10 --partStrategy=EdgePartition2D

Ankur




Re: Landmarks in GraphX section of Spark API

2014-11-18 Thread Ankur Dave
At 2014-11-18 15:44:31 +0530, Deep Pradhan pradhandeep1...@gmail.com wrote:
 I meant to ask whether it gives the solution faster than other algorithms.

No, it's just that it's much simpler and easier to implement than the others. 
Section 5.2 of the Pregel paper [1] justifies using it for a graph (a binary 
tree) with 1 billion vertices on 300 machines:

More advanced parallel algorithms exist, e.g., Thorup [44] or the ∆-stepping
method [37], and have been used as the basis for special-purpose parallel
shortest paths implementations [12, 32]. Such advanced algorithms can also
be expressed in the Pregel framework. The simplicity of the implementation
in Figure 5, however, together with the already acceptable performance (see
Section 6), may appeal to users who can't do extensive tuning or
customization.

 What do you mean by distributed algorithms? Can we not use any algorithm on
 a distributed environment?

Any algorithm can be split up and run in a distributed environment, but because 
inter-node coordination is expensive, that can be very inefficient. Distributed 
algorithms in this context are ones that reduce coordination.

Ankur

[1] http://db.cs.berkeley.edu/cs286/papers/pregel-sigmod2010.pdf




Re: New Codes in GraphX

2014-11-18 Thread Ankur Dave
At 2014-11-18 15:51:52 +0530, Deep Pradhan pradhandeep1...@gmail.com wrote:
 Yes, the above command works, but there is this problem. Most of the time,
 the total rank is NaN (not a number). Why is it so?

I've also seen this, but I'm not sure why it happens. If you could find out 
which vertices are getting the NaN rank, it might be helpful in tracking down 
the problem.

Ankur




Re: Fwd: Executor Lost Failure

2014-11-10 Thread Ankur Dave
At 2014-11-10 22:53:49 +0530, Ritesh Kumar Singh 
riteshoneinamill...@gmail.com wrote:
 Tasks are now getting submitted, but many tasks don't happen.
 Like, after opening the spark-shell, I load a text file from disk and try
 printing its contents as:

sc.textFile("/path/to/file").foreach(println)

 It does not give me any output.

That's because foreach launches tasks on the slaves. When each task tries to 
print its lines, they go to the stdout file on the slave rather than to your 
console at the driver. You should see the file's contents in each of the 
slaves' stdout files in the web UI.

This only happens when running on a cluster. In local mode, all the tasks are 
running locally and can output to the driver, so foreach(println) is more 
useful.
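
A common workaround (a small sketch) is to bring a sample of the data back to the 
driver before printing:

// Collect a small sample to the driver so the println output appears in your console.
sc.textFile("/path/to/file").take(10).foreach(println)
// Or, for files small enough to fit in driver memory:
sc.textFile("/path/to/file").collect().foreach(println)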

Ankur




Re: To find distances to reachable source vertices using GraphX

2014-11-03 Thread Ankur Dave
The NullPointerException seems to be because edge.dstAttr is null, which
might be due to SPARK-3936
https://issues.apache.org/jira/browse/SPARK-3936. Until that's fixed, I
edited the Gist with a workaround. Does that fix the problem?

Ankur http://www.ankurdave.com/

On Mon, Nov 3, 2014 at 12:23 AM, Madabhattula Rajesh Kumar 
mrajaf...@gmail.com wrote:

 Hi All,

 I'm trying to understand the example program at the link below. When I run this
 program, I'm getting a java.lang.NullPointerException at the highlighted line.

 https://gist.github.com/ankurdave/4a17596669b36be06100

 val updatedDists = edge.srcAttr.filter {
   case (source, dist) =>
     val existingDist = edge.dstAttr.getOrElse(source, Int.MaxValue)
     existingDist > dist + 1
 }.mapValues(_ + 1).map(identity)

 Could you please help me to resolve this issue.

 Regards,
 Rajesh



Re: How to correctly extimate the number of partition of a graph in GraphX

2014-11-02 Thread Ankur Dave
How large is your graph, and how much memory does your cluster have?

We don't have a good way to determine the *optimal* number of partitions
aside from trial and error, but to get the job to at least run to
completion, it might help to use the MEMORY_AND_DISK storage level and a
large number of partitions.

Ankur http://www.ankurdave.com/

On Sat, Nov 1, 2014 at 10:57 PM, James alcaid1...@gmail.com wrote:

 Hello,

 I am trying to run the Connected Components algorithm on a very big graph. In
 practice I found that a small number of partitions would lead to OOM,
 while a large number would cause various timeout exceptions. Thus I wonder
 how to estimate the number of partitions of a graph in GraphX?

 Alcaid



Re: How to set persistence level of graph in GraphX in spark 1.0.0

2014-10-28 Thread Ankur Dave
At 2014-10-25 08:56:34 +0530, Arpit Kumar arp8...@gmail.com wrote:
 GraphLoader1.scala:49: error: class EdgePartitionBuilder in package impl
 cannot be accessed in package org.apache.spark.graphx.impl
 [INFO]   val builder = new EdgePartitionBuilder[Int, Int]

Here's a workaround:

1. Copy and modify the GraphLoader source as you did, but keep it in the 
org.apache.spark.graphx.impl package to fix the package-private error.
2. In addition to changing the persistence level of the edges RDD in 
GraphLoader, construct the VertexRDD and EdgeRDD yourself.
3. Call GraphImpl.fromExistingRDDs to construct the graph. This function will 
respect the existing EdgeRDD storage level.
4. Use the graph as desired. Be sure to avoid Graph#partitionBy, the Pregel 
API, and all of the built-in algorithms, because they call Graph#cache() on 
intermediate graphs.

Here is a modified version of GraphLoader that does 1-3: 
https://gist.github.com/ankurdave/0394d47809297eea76ff

Ankur




Re: GraphX StackOverflowError

2014-10-28 Thread Ankur Dave
At 2014-10-28 16:27:20 +0300, Zuhair Khayyat zuhair.khay...@gmail.com wrote:
 I am using the connected components function of GraphX (on Spark 1.0.2) on some
 graph. However, for some reason it fails with a StackOverflowError. The graph
 is not too big; it contains 1 vertices and 50 edges.

 [...]
 14/10/28 16:08:50 INFO DAGScheduler: Submitting 1 missing tasks from Stage
 13 (VertexRDD.createRoutingTables - vid2pid (aggregation)
 MapPartitionsRDD[13] at mapPartitions at VertexRDD.scala:423)

This seems like a bug in Scala's implementation of quicksort, maybe SI-7837 
[1]. GraphX uses Scala's quicksort to sort the edges when loading them into 
memory.

If you're able to modify Spark, you could avoid using quicksort by changing 
EdgePartitionBuilder.scala:40 from

Sorting.quickSort(edgeArray)(Edge.lexicographicOrdering)

to

implicit val ordering: Ordering[Edge[ED]] = Edge.lexicographicOrdering
Sorting.stableSort(edgeArray)

Otherwise, the workaround will depend on the cause of the bug. If it's because 
you have thousands of duplicates of the same edge which are triggering SI-7837, 
you could use RDD#distinct to remove them before constructing the graph. If 
it's because of an unlikely worst-case data distribution that's causing 
quicksort to run in linear time, you could reorder the edges using RDD#sortBy.

Ankur

[1] https://issues.scala-lang.org/browse/SI-7837




Re: Workaround for SPARK-1931 not compiling

2014-10-24 Thread Ankur Dave
At 2014-10-23 09:48:55 +0530, Arpit Kumar arp8...@gmail.com wrote:
 error: value partitionBy is not a member of
 org.apache.spark.rdd.RDD[(org.apache.spark.graphx.PartitionID,
 org.apache.spark.graphx.Edge[ED])]

Since partitionBy is a member of PairRDDFunctions, it sounds like the implicit 
conversion from RDD to PairRDDFunctions is not getting applied. Does it help to 
import org.apache.spark.SparkContext._ before applying the workaround?
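
For reference, a minimal sketch of what that import enables in Spark 1.x:

import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._  // brings the RDD -> PairRDDFunctions implicit into scope

val pairs = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c")))
val repartitioned = pairs.partitionBy(new HashPartitioner(4))  // partitionBy now resolves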

Ankur




Re: graphx - mutable?

2014-10-14 Thread Ankur Dave
On Tue, Oct 14, 2014 at 12:36 PM, ll duy.huynh@gmail.com wrote:

 hi again.  just want to check in again to see if anyone could advise on how
 to implement a mutable, growing graph with graphx?

 we're building a graph that is growing over time.  it adds more vertices and
 edges every iteration of our algorithm.

 it doesn't look like there is an obvious way to add a new vertex and a set of
 edges to an existing graph.


Currently the only way to do this is to rebuild the graph in each iteration
by adding more vertices and edges to the source RDDs, then calling the
graph constructor.
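
A minimal sketch of that rebuild pattern (the attribute types and sample data here 
are illustrative):

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Grow a graph by unioning new vertices/edges into the source RDDs and
// calling the Graph constructor again each iteration.
var vertices: RDD[(VertexId, String)] = sc.parallelize(Seq((1L, "a"), (2L, "b")))
var edges: RDD[Edge[Int]] = sc.parallelize(Seq(Edge(1L, 2L, 1)))
var graph = Graph(vertices, edges)

// One growth step:
vertices = vertices.union(sc.parallelize(Seq((3L, "c")))).cache()
edges = edges.union(sc.parallelize(Seq(Edge(2L, 3L, 1)))).cache()
graph = Graph(vertices, edges)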

I'm working on a way to support this more efficiently (SPARK-2365
https://issues.apache.org/jira/browse/SPARK-2365), but GraphX doesn't
take advantage of this yet.

Ankur http://www.ankurdave.com/


Re: graphx - mutable?

2014-10-14 Thread Ankur Dave
On Tue, Oct 14, 2014 at 1:57 PM, Duy Huynh duy.huynh@gmail.com wrote:

 a related question, what is the best way to update the values of existing
 vertices and edges?


Many of the Graph methods deal with updating the existing values in bulk,
including mapVertices, mapEdges, mapTriplets, mapReduceTriplets, and
outerJoinVertices.

To update just a small number of existing values, IndexedRDD would be
ideal, but until it makes it into GraphX the best way is to use one of the
above methods. This will be slower since it's touching all of the vertices,
but it will achieve the same goal.

For example, if you had a graph and wanted to update the value of vertex 1
to "a", you could do the following:

val graph = ...
val updates = sc.parallelize(List((1L, "a")))
val newGraph = graph.outerJoinVertices(updates) { (id, a, b) => b }

Ankur http://www.ankurdave.com/


Re: How to construct graph in graphx

2014-10-13 Thread Ankur Dave
At 2014-10-13 18:22:44 -0400, Soumitra Siddharth Johri 
soumitra.siddha...@gmail.com wrote:
 I have a flat tab separated file like below:

 [...]

 where n1,n2,n3,n4 are the nodes of the graph and R1,P2,P3 are the
 properties which should form the edges between the nodes.

 How can I construct a graph from the above file in SPARK GraphX ? Example
 code would be very helpful.

Here's how to do this: https://gist.github.com/ankurdave/587eac4d08655d0eebf9

Ankur




Re: How to construct graph in graphx

2014-10-13 Thread Ankur Dave
At 2014-10-13 21:08:15 -0400, Soumitra Johri soumitra.siddha...@gmail.com 
wrote:
 There is no 'long' field in my file. So when I form the edge I get a type
 mismatch error. Is it mandatory for GraphX that every vertex should have a
 distinct id?

 in my case n1,n2,n3,n4 are all strings.

(+user so others can see the solution)

Yes, GraphX currently only supports integer vertex ids. If your data is stored 
with string ids, a possible workaround is to hash them to integers using 
String.hashCode. If you need access to the string representation later, you can 
store it in the vertex attribute.

I added a file to the Gist that does this: 
https://gist.github.com/ankurdave/587eac4d08655d0eebf9#file-load-edges-with-properties-and-string-id-scala
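
A minimal sketch of that approach (the "src <tab> property <tab> dst" layout and the 
file path are assumptions):

import org.apache.spark.graphx._

// Hash string node names to vertex ids (note: hashCode can collide on large data)
// and keep the original name as the vertex attribute.
def vid(name: String): VertexId = name.hashCode.toLong

val lines = sc.textFile("edges.tsv")
val edges = lines.map { line =>
  val Array(src, prop, dst) = line.split("\t")
  Edge(vid(src), vid(dst), prop)
}
val vertices = lines.flatMap { line =>
  val Array(src, _, dst) = line.split("\t")
  Seq((vid(src), src), (vid(dst), dst))
}.distinct()

val graph = Graph(vertices, edges)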

Ankur




Re: Pregel messages serialized in local machine?

2014-09-25 Thread Ankur Dave
At 2014-09-25 06:52:46 -0700, Cheuk Lam chl...@hotmail.com wrote:
 This is a question on using the Pregel function in GraphX.  Does a message
 get serialized and then de-serialized in the scenario where both the source
 and the destination vertices are in the same compute node/machine?

Yes, message passing currently uses partitionBy, which shuffles all messages, 
including ones to the same machine, by serializing them and writing them to 
disk.

Ankur




Re: how to group within the messages at a vertex?

2014-09-17 Thread Ankur Dave
At 2014-09-17 11:39:19 -0700, spr s...@yarcdata.com wrote:
 I'm trying to implement label propagation in GraphX.  The core step of that
 algorithm is

 - for each vertex, find the most frequent label among its neighbors and set
 its label to that.

 [...]

 It seems on the broken line above, I don't want to reduce all the values
 to a scalar, as this code does, but rather group them first and then reduce
 them.  Can I do that all within mapReduceTriples?  If not, how do I build
 something that I can then further reduce?

Label propagation is actually already implemented in GraphX [1]. The way it 
handles the most frequent label reduce operation is to aggregate a histogram, 
implemented as a map from label to frequency, and then take the most frequent 
element from the map at the end. Something to watch out for is that this can 
create large aggregation messages for high-degree vertices.
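
A hedged sketch of that histogram-style reduce (the implementation in [1] is the 
authoritative version):

import org.apache.spark.graphx.VertexId

// Merge two label histograms by summing counts, then pick the most frequent
// label once all messages for a vertex have been combined.
type Histogram = Map[VertexId, Long]

def mergeHistograms(a: Histogram, b: Histogram): Histogram =
  (a.keySet ++ b.keySet).map { label =>
    label -> (a.getOrElse(label, 0L) + b.getOrElse(label, 0L))
  }.toMap

def mostFrequentLabel(h: Histogram, default: VertexId): VertexId =
  if (h.isEmpty) default else h.maxBy(_._2)._1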

Ankur

[1] 
https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala




Re: vertex active/inactive feature in Pregel API ?

2014-09-16 Thread Ankur Dave
At 2014-09-16 10:55:37 +0200, Yifan LI iamyifa...@gmail.com wrote:
 - from [1], and my understanding, the existing inactive feature in graphx 
 pregel api is “if there is no in-edges, from active vertex, to this vertex, 
 then we will say this one is inactive”, right?

Well, that's true when messages are only sent forward along edges (from the 
source to the destination) and the activeDirection is EdgeDirection.Out. If 
both of these conditions are true, then a vertex without in-edges cannot 
receive a message, and therefore its vertex program will never run and a 
message will never be sent along its out-edges. PageRank is an application that 
satisfies both the conditions.

 For instance, there is a graph in which every vertex has at least one 
 in-edges, then we run static Pagerank on it for 10 iterations. During this 
 calculation, is there any vertex would be set inactive?

No: since every vertex always sends a message in static PageRank, if every 
vertex has an in-edge, it will always receive a message and will remain active.

In fact, this is why I recently rewrote static PageRank not to use Pregel [3]. 
Assuming that most vertices do have in-edges, it's unnecessary to track active 
vertices, which can provide a big savings.

 - for more “explicit active vertex tracking”, e.g. vote to halt, how to 
 achieve it in existing api?
 (I am not sure I got the point of [2], that “vote” function has already been 
 introduced in graphx pregel api? )

The current Pregel API effectively makes every vertex vote to halt in every 
superstep. Therefore only vertices that receive messages get awoken in the next 
superstep.

Instead, [2] proposes to make every vertex run by default in every superstep 
unless it votes to halt *and* receives no messages. This allows a vertex to 
have more control over whether or not it will run, rather than leaving that 
entirely up to its neighbors.

Ankur

 [1] 
 http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.Pregel$
 [2] https://github.com/apache/spark/pull/1217
[3] https://github.com/apache/spark/pull/2308




Re: vertex active/inactive feature in Pregel API ?

2014-09-16 Thread Ankur Dave
At 2014-09-16 12:23:10 +0200, Yifan LI iamyifa...@gmail.com wrote:
 but I am wondering if there is a message(none?) sent to the target vertex(the 
 rank change is less than tolerance) in below dynamic page rank implementation,

  def sendMessage(edge: EdgeTriplet[(Double, Double), Double]) = {
    if (edge.srcAttr._2 > tol) {
      Iterator((edge.dstId, edge.srcAttr._2 * edge.attr))
    } else {
      Iterator.empty
    }
  }

 so, in this case, is a message (even an empty one) still sent, or not?

No, in that case no message is sent, and if all in-edges of a particular vertex 
return Iterator.empty, then the vertex will become inactive in the next 
iteration.

Ankur




Re: vertex active/inactive feature in Pregel API ?

2014-09-15 Thread Ankur Dave
At 2014-09-15 16:25:04 +0200, Yifan LI iamyifa...@gmail.com wrote:
 I am wondering if the vertex active/inactive(corresponding the change of its 
 value between two supersteps) feature is introduced in Pregel API of GraphX?

Vertex activeness in Pregel is controlled by messages: if a vertex did not 
receive a message in the previous iteration, its vertex program will not run in 
the current iteration. Also, inactive vertices will not be able to send 
messages because by default the sendMsg function will only be run on edges 
where at least one of the adjacent vertices received a message. You can change 
this behavior -- see the documentation for the activeDirection parameter to 
Pregel.apply [1].
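
For example, a small sketch that floods the maximum vertex attribute forward along 
edges, passing activeDirection explicitly (the input is assumed to be a Graph[Long, Int]):

import org.apache.spark.graphx._

// With EdgeDirection.Out, sendMsg only runs on edges whose source received a message.
val maxProp = Pregel(graph, initialMsg = Long.MinValue,
                     activeDirection = EdgeDirection.Out)(
  vprog = (id, attr, msg) => math.max(attr, msg),
  sendMsg = t => if (t.srcAttr > t.dstAttr) Iterator((t.dstId, t.srcAttr)) else Iterator.empty,
  mergeMsg = (a, b) => math.max(a, b))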

There is also an open pull request to make active vertex tracking more explicit 
by allowing vertices to vote to halt directly [2].

Ankur

[1] 
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.Pregel$
[2] https://github.com/apache/spark/pull/1217




Re: [GraphX] how to set memory configurations to avoid OutOfMemoryError GC overhead limit exceeded

2014-09-09 Thread Ankur Dave
At 2014-09-05 12:13:18 +0200, Yifan LI iamyifa...@gmail.com wrote:
 But how to assign the storage level to a new vertices RDD that is mapped from
 an existing vertices RDD,
 e.g.
 val newVertexRDD = graph.collectNeighborIds(EdgeDirection.Out)
   .map { case (id: VertexId, a: Array[VertexId]) => (id, initialHashMap(a)) }

 the new one will be combined with that existing edges RDD(MEMORY_AND_DISK)
 to construct a new graph.
 e.g.
 val newGraph = Graph(newVertexRDD, graph.edges)

Sorry for the late reply. If you are constructing a graph from the derived 
VertexRDD, you can pass a desired storage level to the Graph constructor:

val newVertexRDD = graph.collectNeighborIds(EdgeDirection.Out).map {
  case (id: VertexId, a: Array[VertexId]) => (id, initialHashMap(a))
}
val newGraph = Graph(
  newVertexRDD,
  graph.edges,
  edgeStorageLevel = StorageLevel.MEMORY_AND_DISK,
  vertexStorageLevel = StorageLevel.MEMORY_AND_DISK)

For others reading, the reason why GraphX needs to be told the desired storage 
level is that it internally constructs temporary vertex or edge RDDs and uses 
them more than once, so it has to cache them to avoid recomputation.

 BTW, the return of newVertexRDD.getStorageLevel is StorageLevel(true, true,
 false, true, 1), what does it mean?

See the StorageLevel object [1]. This particular storage level corresponds to 
StorageLevel.MEMORY_AND_DISK.

Ankur

[1] 
https://github.com/apache/spark/blob/092e2f152fb674e7200cc8a2cb99a8fe0a9b2b33/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala#L147




Re: spark 1.1.0 requested array size exceed vm limits

2014-09-05 Thread Ankur Dave
At 2014-09-05 21:40:51 +0800, marylucy qaz163wsx_...@hotmail.com wrote:
 But when running GraphX edgeListFile, some tasks failed with the
 error: requested array size exceeds VM limit

Try passing a higher value for minEdgePartitions when calling 
GraphLoader.edgeListFile.
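
For example (a sketch; the path and partition count are illustrative):

import org.apache.spark.graphx.GraphLoader

// Load with more, smaller edge partitions to keep per-partition arrays small.
val graph = GraphLoader.edgeListFile(sc, "hdfs:///data/edges.txt", minEdgePartitions = 128)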

Ankur




Re: [GraphX] how to set memory configurations to avoid OutOfMemoryError GC overhead limit exceeded

2014-09-03 Thread Ankur Dave
At 2014-09-03 17:58:09 +0200, Yifan LI iamyifa...@gmail.com wrote:
 val graph = GraphLoader.edgeListFile(sc, edgesFile, minEdgePartitions = 
 numPartitions).partitionBy(PartitionStrategy.EdgePartition2D).persist(StorageLevel.MEMORY_AND_DISK)

 Error: java.lang.UnsupportedOperationException: Cannot change storage level
 of an RDD after it was already assigned a level

You have to pass the StorageLevel to GraphLoader.edgeListFile:

val graph = GraphLoader.edgeListFile(
  sc, edgesFile, minEdgePartitions = numPartitions,
  edgeStorageLevel = StorageLevel.MEMORY_AND_DISK,
  vertexStorageLevel = StorageLevel.MEMORY_AND_DISK)
  .partitionBy(PartitionStrategy.EdgePartition2D)

Ankur




Re: Spark - GraphX pregel like with global variables (accumulator / broadcast)

2014-08-26 Thread Ankur Dave
At 2014-08-26 01:20:09 -0700, BertrandR bertrand.rondepierre...@gmail.com 
wrote:
 I actually tried without unpersisting, but given the performance I tried to
 add these in order to free the memory. After your answer I tried to remove
 them again, but without any change in the execution time...

This is probably a related issue: in Spark you have to explicitly cache any 
dataset that you use more than once. Otherwise it will be recomputed each time 
it's used, which can cause an exponential slowdown for certain dependency 
structures.

To be safe, you could start by caching g, msg, and newVerts every time they are 
set.

Ankur




Re: Spark - GraphX pregel like with global variables (accumulator / broadcast)

2014-08-25 Thread Ankur Dave
At 2014-08-25 06:41:36 -0700, BertrandR bertrand.rondepierre...@gmail.com 
wrote:
 Unfortunately, this works well for extremely small graphs, but it becomes
 exponentially slow with the size of the graph and the number of iterations
 (doesn't finish 20 iterations with graphs having 48000 edges).
 [...]
 It seems to me that a lot of things are unnecessarily recomputed at each
 iterations whatever I try to do. I also did multiple changes to limit the
 number of dependency of each object, but it didn't change anything.
 [...]
   fusionBcst.unpersist(blocking = false)

The problem is almost certainly because of unpersisting. If you comment out all 
the unpersist lines, the program should run normally.

Unpersisting is very tricky because of the internal dependency structure of 
graphs: they maintain a vertex and an edge RDD, and each depends on both from 
the previous iteration.

A future update to GraphX will unify them so that a graph only has one RDD, and 
this will make it easier to unpersist correctly. Until then, unpersisting may 
not be worth the trouble.

Ankur




Re: GraphX usecases

2014-08-25 Thread Ankur Dave
At 2014-08-25 11:23:37 -0700, Sunita Arvind sunitarv...@gmail.com wrote:
 Does this ("We introduce GraphX, which combines the advantages of both
 data-parallel and graph-parallel systems by efficiently expressing graph
 computation within the Spark data-parallel framework. We leverage new ideas
 in distributed graph representation to efficiently distribute graphs as
 tabular data-structures. Similarly, we leverage advances in data-flow
 systems to exploit in-memory computation and fault-tolerance.") mean that
 GraphX makes the typical RDBMS operations possible even when the data is
 persisted in a GDBMS, and not vice versa?

This quote refers to the research idea that while previous graph-parallel 
systems (Pregel, GraphLab, etc.) were built as specialized systems for 
performance, it's actually possible to avoid the trouble of a separate system 
by embedding graph computation efficiently in a general data-parallel system. 
Here data-parallel refers generally to any system that can support the join 
optimizations, including Spark and, with some work on the optimizer, relational 
databases as well. So GraphX use data-parallel or relational operators to 
provide graph computation, not the other way around.

 From what I initially thought, it looked like GraphX could be applied to data
 stored in RDBMSs as Spark could translate the relational data into graphical
 representation. However, there seems to be no conversation and everything
 presented in GraphX implementations AFAIK, works on vertices and edges. So
 does it mean that GraphX is only relevant when the backend is a GDBMS?

GraphX, the library on top of Spark, can be applied indirectly to relational 
data as you described: you can use Spark to load vertex and edge tables from a 
relational database, then process them with GraphX. This isn't discussed in the 
GraphX documentation because it's a concern of Spark. GraphX is only relevant 
once you have the vertices and edges in RDD form.

GraphX, the research concept, can in theory be implemented directly in a 
relational database by augmenting the query optimizer to support the 
optimizations described in the paper and setting up the appropriate indexes on 
the vertex and edge tables.

Ankur




Re: The running time of spark

2014-08-23 Thread Ankur Dave
At 2014-08-23 08:33:48 -0700, Denis RP qq378789...@gmail.com wrote:
 Bottleneck seems to be I/O, the CPU usage ranges 10%~15% most time per VM.
 The caching is maintained by pregel, should be reliable. Storage level is
 MEMORY_AND_DISK_SER.

I'd suggest trying the DISK_ONLY storage level and possibly increasing the 
number of partitions. I did a local test with a 2G heap, 1M vertices, 126M 
edges, and 100 edge partitions, and MEMORY_AND_DISK_SER failed with 
OutOfMemoryErrors while DISK_ONLY succeeded.

Ankur




Re: Personalized Page rank in graphx

2014-08-20 Thread Ankur Dave
At 2014-08-20 10:57:57 -0700, Mohit Singh mohit1...@gmail.com wrote:
 I was wondering if Personalized Page Rank algorithm is implemented in graphx. 
 If the talks and presentation were to be believed 
 (https://amplab.cs.berkeley.edu/wp-content/uploads/2014/02/graphx@strata2014_final.pdf)
  it is.. but cant find the algo code 
 (https://github.com/amplab/graphx/tree/master/graphx/src/main/scala/org/apache/spark/graphx/lib)?

Those slides only mean Personalized PageRank can be expressed within the 
neighborhood-centric model. I don't think anyone has implemented it for GraphX 
yet. If you're interested in doing that, we'd be glad to add it to the library!

Ankur




Re: GraphX question about graph traversal

2014-08-20 Thread Ankur Dave
At 2014-08-20 10:34:50 -0700, Cesar Arevalo ce...@zephyrhealthinc.com wrote:
 I would like to get the type B vertices that are connected through type A
 vertices where the edges have a score greater than 5. So, from the example
 above I would like to get V1 and V4.

It sounds like you're trying to find paths in the graph that match a particular 
pattern. I wrote a prototype for doing that using the Pregel API [1, 2] in 
response to an earlier question on the mailing list [3]. That code won't solve 
your problem immediately, since it requires exact vertex and edge attribute 
matches rather than predicates like "greater than 5", but it should be easy to 
modify it appropriately.

Ankur

[1] 
https://github.com/ankurdave/spark/blob/PatternMatching/graphx/src/main/scala/org/apache/spark/graphx/lib/PatternMatching.scala
[2] 
https://github.com/ankurdave/spark/blob/PatternMatching/graphx/src/test/scala/org/apache/spark/graphx/lib/PatternMatchingSuite.scala
[3] 
http://apache-spark-user-list.1001560.n3.nabble.com/Graphx-traversal-and-merge-interesting-edges-td8788.html




Re: noob: how to extract different members of a VertexRDD

2014-08-19 Thread Ankur Dave
(+user)

On Tue, Aug 19, 2014 at 12:05 PM, spr s...@yarcdata.com wrote:

 I want to assign each vertex to a community with the name of the vertex.


As I understand it, you want to set the vertex attributes of a graph to the
corresponding vertex ids. You can do this using Graph#mapVertices [1] as
follows:

val g = ...
val newG = g.mapVertices((id, attr) => id)
// newG.vertices has type VertexRDD[VertexId], or RDD[(VertexId, VertexId)]

If you only want to do this for a VertexRDD without constructing a new
graph using the modified vertices, you can use VertexRDD#mapVertices [2] in
a similar fashion.

[1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.Graph@mapVertices[VD2]((VertexId,VD)⇒VD2)(ClassTag[VD2]):Graph[VD2,ED]
[2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.VertexRDD@mapValues[VD2]((VertexId,VD)⇒VD2)(ClassTag[VD2]):VertexRDD[VD2]

Ankur http://www.ankurdave.com/


Re: noob: how to extract different members of a VertexRDD

2014-08-19 Thread Ankur Dave
At 2014-08-19 12:47:16 -0700, spr s...@yarcdata.com wrote:
 One follow-up question.  If I just wanted to get those values into a vanilla
 variable (not a VertexRDD or Graph or ...) so I could easily look at them in
 the REPL, what would I do?  Are the aggregate data structures inside the
 VertexRDD/Graph/... Arrays or Lists or what, or do I even need to know/care?  

The vertex values are internally stored in hash tables within each partition 
(see VertexPartitionBase if you're curious) but to access them all from the 
REPL, you can just use RDD#collect as in your first mail. If you want just the 
vertex ids, you can use RDD#map first:

val verts: VertexRDD[Int] = ...
val pairs: Array[(VertexId, Int)] = verts.collect()
val ids: Array[VertexId] = verts.map(kv => kv._1).collect()

Ankur




Re: [GraphX] how to set memory configurations to avoid OutOfMemoryError GC overhead limit exceeded

2014-08-18 Thread Ankur Dave
On Mon, Aug 18, 2014 at 6:29 AM, Yifan LI iamyifa...@gmail.com wrote:

 I am testing our application(similar to personalised page rank using
 Pregel, and note that each vertex property will need pretty much more space
 to store after new iteration)

[...]

But when we ran it on larger graph(e.g. LiveJouranl), it always end at the
 error GC overhead limit exceeded, even the partitions number is increased
 to 48 from 8.


If the graph (including vertex properties) is too large to fit in memory,
you might try allowing it to spill to disk. When constructing the graph,
you can set vertexStorageLevel and edgeStorageLevel to
StorageLevel.MEMORY_AND_DISK. This should allow the algorithm to finish.
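
As a rough sketch, assuming a GraphX version whose GraphLoader.edgeListFile accepts 
the storage-level parameters mentioned above:

import org.apache.spark.graphx._
import org.apache.spark.storage.StorageLevel

// Allow both vertices and edges to spill to disk under memory pressure
val graph = GraphLoader.edgeListFile(sc, "/edge-list-file.txt",
  edgeStorageLevel = StorageLevel.MEMORY_AND_DISK,
  vertexStorageLevel = StorageLevel.MEMORY_AND_DISK)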

Ankur http://www.ankurdave.com/


Re: GraphX Pagerank application

2014-08-15 Thread Ankur Dave
On Wed, Aug 6, 2014 at 11:37 AM, AlexanderRiggers 
alexander.rigg...@gmail.com wrote:

 To perform the page rank I have to create a graph object, adding the edges
 by setting sourceID=id and distID=brand. In GraphLab there is function: g =
 SGraph().add_edges(data, src_field='id', dst_field='brand')

 Is there something similar in GraphX?


It sounds like you're trying to parse an edge list file into a graph, where
each line is a comma-separated pair of numeric vertex ids. There's a
built-in parser for tab-separated pairs (see GraphLoader) and it should be
easy to adapt that to comma-separated pairs. You can also drop the header
line using RDD#filter (and eventually using
https://github.com/apache/spark/pull/1839).
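
A minimal sketch of that adaptation, assuming lines of the form "id,brand" (both 
numeric) and a single header line; the file name is hypothetical:

import org.apache.spark.graphx._

val lines = sc.textFile("edges.csv")
val header = lines.first()
val edgeTuples = lines.filter(_ != header).map { line =>
  val Array(src, dst) = line.split(",")   // comma-separated pair of vertex ids
  (src.toLong, dst.toLong)
}
// Give every vertex a default attribute; replace 1 with whatever you need
val graph = Graph.fromEdgeTuples(edgeTuples, defaultValue = 1)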

Ankur http://www.ankurdave.com/


Re: [GraphX] How spark parameters relate to Pregel implementation

2014-08-04 Thread Ankur Dave
At 2014-08-04 20:52:26 +0800, Bin wubin_phi...@126.com wrote:
 I wonder how spark parameters, e.g., number of paralellism, affect Pregel 
 performance? Specifically, sendmessage, mergemessage, and vertexprogram?

 I have tried label propagation on a 300,000 edges graph, and I found that no 
 paralellism is much faster than 5 or 500 paralellism.

Increasing the level of parallelism will increase storage overhead (because 
each vertex will need to be replicated to more edge partitions to form the 
triplets) and will also increase communication. Unless there's something to be 
gained from higher parallelism, this will worsen performance. Additionally, 
going from no parallelism to some parallelism will incur the extra cost of task 
communication via shuffles.

Parallelism has two benefits: it allows edge scans and aggregations to proceed 
in parallel, and it enables the graph to be stored across many machines.

For small graphs, the slight performance gain due to parallelism is vastly 
outweighed by the cost of inter-process communication and shuffling to disk, 
and distributed storage is not necessary since the graph fits on a single 
machine. There are single-machine graph processing systems such as X-Stream [1] 
and GraphChi [2] that optimize performance for these kinds of graphs.

However, parallelism becomes necessary for larger graphs with hundreds of 
millions of edges or large amounts of associated vertex and edge data. GraphX 
is designed for this scale of data.

Ankur

[1] http://infoscience.epfl.ch/record/188535/files/paper.pdf
[2] http://graphlab.org/projects/graphchi.html




Re: GraphX

2014-08-02 Thread Ankur Dave
At 2014-08-02 21:29:33 +0530, Deep Pradhan pradhandeep1...@gmail.com wrote:
 How should I run graphx codes?

At the moment it's a little more complicated to run the GraphX algorithms than 
the Spark examples due to SPARK-1986 [1]. There is a driver program in 
org.apache.spark.graphx.lib.Analytics which you can invoke using spark-submit:

$SPARK_HOME/bin/spark-submit --master local[*] --class 
org.apache.spark.graphx.lib.Analytics \
$SPARK_HOME/assembly/target/scala-2.10/spark-assembly-*.jar \
pagerank /edge-list-file.txt --numEPart=8 --numIter=10 
--partStrategy=EdgePartition2D

This supports running PageRank, connected components, and triangle count. For 
the other algorithms, you can use the Spark shell:

import org.apache.spark.graphx._
val graph = (GraphLoader.edgeListFile(sc, "/edge-list-file.txt", 
  minEdgePartitions = 8)
  .partitionBy(PartitionStrategy.EdgePartition2D))
// Run algorithms on graph

Ankur

[1] https://issues.apache.org/jira/browse/SPARK-1986




Re: [GraphX] how to compute only a subset of vertices in the whole graph?

2014-08-02 Thread Ankur Dave
At 2014-08-02 19:04:22 +0200, Yifan LI iamyifa...@gmail.com wrote:
 But I am thinking of if I can compute only some selected vertexes(hubs), not 
 to do update on every vertex…

 is it possible to do this using Pregel API?

The Pregel API already only runs vprog on vertices that received messages in 
the previous iteration. Is there some other selection criteria that you'd like 
to express?

Depending on the criteria, the easiest thing might be to move off the Pregel 
API and write against the lower-level GraphX API. I did this for PageRank 
recently [1].

 or, more realistically, only hubs can receive messages and compute results in 
 the LAST iteration? since we don't need the final result of non-hub vertices.

For this it might be easiest to duplicate the Pregel API implementation and 
special-case the last iteration.

Ankur

[1] 
https://github.com/ankurdave/spark/blob/3231a8efadbb45227dea63011c1a8dd856595475/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala#L274




Re: [GraphX] The best way to construct a graph

2014-08-01 Thread Ankur Dave
At 2014-08-01 11:23:49 +0800, Bin wubin_phi...@126.com wrote:
 I am wondering what is the best way to construct a graph?

 Say I have some attributes for each user, and specific weight for each user 
 pair. The way I am currently doing is first read user information and edge 
 triple into two arrays, then use sc.parallelize to create vertexRDD and 
 edgeRDD, respectively. Then create the graph using Graph(vertices, edges).

 I wonder whether there is a better way to do this?

That's a perfectly fine way to construct a graph. Are you encountering a 
problem with it?

The only suggestion I would make is to load the data using sc.textFile rather 
than reading into an array and calling sc.parallelize. This will avoid loading 
it all into the driver's memory.

GraphLoader does have the slight advantage that it avoids allocating a pair per 
vertex, but this is unlikely to be a big cost, so it's fine to use 
Graph(vertices, edges) if GraphLoader isn't suitable.
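
For example, a sketch that builds the graph directly from text files; the 
tab-separated field layout here is an assumption:

import org.apache.spark.graphx._

// "id<tab>attribute" lines for users
val vertices = sc.textFile("users.txt").map { line =>
  val fields = line.split("\t")
  (fields(0).toLong, fields(1))
}
// "src<tab>dst<tab>weight" lines for user pairs
val edges = sc.textFile("weights.txt").map { line =>
  val fields = line.split("\t")
  Edge(fields(0).toLong, fields(1).toLong, fields(2).toDouble)
}
val graph = Graph(vertices, edges)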

Ankur


Compiling Spark master (284771ef) with sbt/sbt assembly fails on EC2

2014-08-01 Thread Ankur Dave
Attempting to build Spark from source on EC2 using sbt gives the error 
"sbt.ResolveException: unresolved dependency: org.scala-lang#scala-library;2.10.2: 
not found". This only seems to happen on 
EC2, not on my local machine.

To reproduce, launch a cluster using spark-ec2, clone the Spark repository, and 
run sbt/sbt assembly. A complete transcript is at 
https://gist.github.com/ankurdave/bb96ea237700f5cd670c. Here is an excerpt with 
the error:

[info] Resolving org.scala-lang#scala-library;2.10.2 ...
[warn]  module not found: org.scala-lang#scala-library;2.10.2
[...]
[warn]  ::
[warn]  ::  UNRESOLVED DEPENDENCIES ::
[warn]  ::
[warn]  :: org.scala-lang#scala-library;2.10.2: not found
[warn]  ::
sbt.ResolveException: unresolved dependency: 
org.scala-lang#scala-library;2.10.2: not found
at sbt.IvyActions$.sbt$IvyActions$$resolve(IvyActions.scala:217)
[...]
[error] (*:update) sbt.ResolveException: unresolved dependency: 
org.scala-lang#scala-library;2.10.2: not found
Project loading failed: (r)etry, (q)uit, (l)ast, or (i)gnore?

After a bisection, it seems the problem was introduced by the SBT-Maven change 
(628932b) [1, 2].

Ankur

[1] https://github.com/apache/spark/pull/772
[2] https://issues.apache.org/jira/browse/SPARK-1776


Re: creating a distributed index

2014-08-01 Thread Ankur Dave
At 2014-08-01 14:50:22 -0600, Philip Ogren philip.og...@oracle.com wrote:
 It seems that I could do this with mapPartition so that each element in a
 partition gets added to an index for that partition.
 [...]
 Would it then be possible to take a string and query each partition's index
 with it? Or better yet, take a batch of strings and query each string in the
 batch against each partition's index?

I proposed a key-value store based on RDDs called IndexedRDD that does exactly 
what you described. It uses mapPartitions to construct an index within each 
partition, then exposes get and multiget methods to allow looking up values 
associated with given keys.

It will hopefully make it into Spark 1.2.0. Until then you can try it out by 
merging in the pull request locally: https://github.com/apache/spark/pull/1297.

See JIRA for details and slides on how it works: 
https://issues.apache.org/jira/browse/SPARK-2365.

Ankur


Re: Graphx : Perfomance comparison over cluster

2014-07-30 Thread Ankur Dave
ShreyanshB shreyanshpbh...@gmail.com writes:
 The version with in-memory shuffle is here:
 https://github.com/amplab/graphx2/commits/vldb.

 It'd be great if you can tell me how to configure and invoke this spark
 version.

Sorry for the delay on this. Assuming you're planning to launch an EC2 cluster, 
here's how to use the version of GraphX with in-memory shuffle:

1. Check out the in-memory shuffle branch locally. It's important to do this 
before launching the cluster to make sure the cluster gets set up in a way 
that's compatible with this version of Spark (using the v2 branch of 
https://github.com/mesos/spark-ec2).

git clone https://github.com/amplab/graphx2 -b vldb
mv graphx2 spark

2. Launch a cluster.

cd spark
ec2/spark-ec2 -s 16 -w 500 -k ec2-key-name -i path/to/ec2-key.pem -t 
m2.4xlarge -z us-east-1e --spot-price=1 launch graphx-benchmarking

3. On the cluster, check out and build the in-memory shuffle branch.

cd /mnt
git clone https://github.com/amplab/graphx2 -b vldb
mv graphx2 spark
cd spark
mkdir -p conf
cp ~/spark/conf/* conf/
sbt/sbt assembly
rsync -r --delete . ~/spark
~/spark/sbin/stop-all.sh
~/spark-ec2/copy-dir --delete ~/spark
~/spark/sbin/start-all.sh

4. Load your input graph onto HDFS in edge list format.

~/ephemeral-hdfs/bin/hadoop fs -put edge-list.txt /

5. Run PageRank using the Analytics driver.

cd ~/spark
MASTER=spark://$(wget -q -O - http://169.254.169.254/latest/meta-data/public-hostname):7077
/usr/bin/time -f "TOTAL TIME: %e seconds" ~/spark/bin/spark-class \
  org.apache.spark.graphx.lib.Analytics $MASTER pagerank /edge-list.txt \
  --numEPart=128 --numIter=10

Ankur


Re: [GraphX] How to access a vertex via vertexId?

2014-07-29 Thread Ankur Dave
Yifan LI iamyifa...@gmail.com writes:
 Maybe you could get the vertex, for instance, which id is 80, by using:

 graph.vertices.filter { case (id, _) => id == 80 }.collect

 but I am not sure this is the exactly efficient way.(it will scan the whole 
 table? if it can not get benefit from index of VertexRDD table)

Until IndexedRDD is merged, a scan and collect is the best officially supported 
way. PairRDDFunctions.lookup does this under the hood as well.

However, it's possible to use the VertexRDD's hash index to do a much more 
efficient lookup. Note that these APIs may change, since VertexPartitionBase 
and its subclasses are private[graphx].

You can access the partitions of a VertexRDD using VertexRDD#partitionsRDD, and 
each partition has VertexPartitionBase#isDefined and VertexPartitionBase#apply. 
Putting it all together:

val verts: VertexRDD[_] = ...
val targetVid: VertexId = 80L
val result = verts.partitionsRDD.flatMap { part =>
  if (part.isDefined(targetVid)) Some(part(targetVid))
  else None
}.collect.head

Once IndexedRDD [1] is merged, it will provide this functionality using 
verts.get(targetVid). Its implementation of get also uses the hash partitioner 
to run only one task [2].

Ankur

[1] https://issues.apache.org/jira/browse/SPARK-2365
[2] 
https://github.com/ankurdave/spark/blob/IndexedRDD/core/src/main/scala/org/apache/spark/rdd/IndexedRDDLike.scala#L89


Re: the pregel operator of graphx throws NullPointerException

2014-07-29 Thread Ankur Dave
Denis RP qq378789...@gmail.com writes:
 [error] (run-main-0) org.apache.spark.SparkException: Job aborted due to 
 stage failure: Task 6.0:4 failed 4 times, most recent failure: Exception 
 failure in TID 598 on host worker6.local: java.lang.NullPointerException
 [error] scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 [error] scala.collection.Iterator$class.foreach(Iterator.scala:727)
 [error] scala.collection.AbstractIterator.foreach(Iterator.scala:1157)

It looks like Iterator.scala:328 [1] is Iterator#map, and it's likely failing 
because the map function is null.

I haven't seen this before, but I wonder if SPARK-2292 [2] is related. The 
stack trace is different there, but the problem of a function being null is the 
same. Based on the JIRA comments, it might be a problem with your build and 
launch process. How are you deploying your application?

Ankur

[1] 
https://github.com/scala/scala/blob/v2.10.4/src/library/scala/collection/Iterator.scala#L328
[2] https://issues.apache.org/jira/browse/SPARK-2292


Re: VertexPartition and ShippableVertexPartition

2014-07-28 Thread Ankur Dave
On Mon, Jul 28, 2014 at 4:29 AM, Larry Xiao xia...@sjtu.edu.cn wrote:

 On 7/28/14, 3:41 PM, shijiaxin wrote:

  There is a VertexPartition in the EdgePartition, which is created by
  EdgePartitionBuilder.toEdgePartition, and there is also a ShippableVertexPartition
  in the VertexRDD. These two Partitions have a lot of common things like index,
  data and Bitset. Why is this necessary?

 Is the VertexPartition in the EdgePartition the Mirror Cache part?


Yes, exactly. The primary copy of each vertex is stored in the VertexRDD
using the index, values, and mask data structures, which together form a
hash map. In addition, each partition of the VertexRDD stores the
corresponding partition of the routing table to facilitate joining with the
edges. The ShippableVertexPartition class encapsulates the vertex hash map
along with a RoutingTablePartition.

After joining the vertices with the edges, the edge partitions cache their
adjacent vertices in the mirror cache. They use the VertexPartition for
this, which provides only the hash map functionality and not the routing
table.

Ankur http://www.ankurdave.com/


Re: Spark streaming vs. spark usage

2014-07-28 Thread Ankur Dave
On Mon, Jul 28, 2014 at 12:53 PM, Nathan Kronenfeld 
nkronenf...@oculusinfo.com wrote:

 But when done processing, one would still have to pull out the wrapped
 object, knowing what it was, and I don't see how to do that.


It's pretty tricky to get the level of type safety you're looking for. I
know of two ways:

1. Leave RDD and DStream as they are, but define a typeclass
(http://danielwestheide.com/blog/2013/02/06/the-neophytes-guide-to-scala-part-12-type-classes.html)
that allows converting them to a common DistributedCollection type. Example:
https://gist.github.com/ankurdave/f5d4df4b521ac83b9c7d#file-distributed-collection-via-typeclass-scala

2. Make RDD and DStream inherit from a common DistributedCollection trait,
as in your example, but use F-bounded polymorphism
(https://twitter.github.io/scala_school/advanced-types.html#fbounded) to
express the concrete types. Example:
https://gist.github.com/ankurdave/f5d4df4b521ac83b9c7d#file-distributed-collection-via-fbounded-polymorphism-scala

Ankur http://www.ankurdave.com/


Re: GraphX Pragel implementation

2014-07-24 Thread Ankur Dave
On Thu, Jul 24, 2014 at 9:52 AM, Arun Kumar toga...@gmail.com wrote:

 While using pregel  API for Iterations how to figure out which super step
 the iteration currently in.


The Pregel API doesn't currently expose this, but it's very straightforward
to modify Pregel.scala
https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala#L112
to do so. Let me know if you'd like help doing this.
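
If you'd rather not modify Pregel.scala, one workaround (not the modification 
described above, just a sketch assuming a Graph[Double, Int] with a Double payload) 
is to carry a superstep counter inside the messages and vertex attributes:

// Attach a "last superstep seen" counter to every vertex value
val g = graph.mapVertices((id, attr) => (attr, 0))
val result = g.pregel((0.0, 0))(
  vprog = (id, attr, msg) => (attr._1 + msg._1, msg._2),                // record the superstep
  sendMsg = t => Iterator((t.dstId, (t.srcAttr._1, t.srcAttr._2 + 1))), // increment per superstep
  mergeMsg = (a, b) => (a._1 + b._1, math.max(a._2, b._2)))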

Ankur http://www.ankurdave.com/


Re: Where is the PowerGraph abstraction

2014-07-23 Thread Ankur Dave
We removed
https://github.com/apache/spark/commit/732333d78e46ee23025d81ca9fbe6d1e13e9f253
the PowerGraph abstraction layer when merging GraphX into Spark to reduce
the maintenance costs. You can still read the code
https://github.com/apache/spark/blob/feaa07802203b79f454454445c0a12a2784ccfeb/graphx/src/main/scala/org/apache/spark/graphx/GraphLab.scala
from the repository history, but it would be best to use the Pregel API if
possible.

Ankur http://www.ankurdave.com/


Re: the implications of some items in webUI

2014-07-22 Thread Ankur Dave
On Tue, Jul 22, 2014 at 7:08 AM, Yifan LI iamyifa...@gmail.com wrote:

 1) what is the difference between Duration(Stages - Completed Stages)
 and Task Time(Executors) ?


Stages are composed of tasks that run on executors. Tasks within a stage
may run concurrently, since there are multiple executors and each executor
may run more than one task at a time.

 An executor's task time is the sum of the durations of all of its tasks.
Because this is a simple sum, it does not take parallelism into account: if
an executor runs 8 tasks concurrently and each takes a minute, it has only
spent one minute of wallclock time, but the reported task time will be 8
minutes.

A stage's duration is how much wallclock time elapsed between when the
first task launched and when the last task finished. This does take
parallelism into account, so in the above example the stage duration would
be 1 minute.

2) what are the exact meanings of Shuffle Read/Shuffle Write?


Stages communicate using shuffles. Each task may start by reading shuffle
inputs across the network, and may end by writing shuffle outputs to disk
locally. See page 7 of the Spark NSDI paper
https://www.usenix.org/system/files/conference/nsdi12/nsdi12-final138.pdf
for details.

Shuffle read and shuffle write refer to the total amount of data that a
stage read across the network and wrote to disk.

Ankur http://www.ankurdave.com/


Re: Question about initial message in graphx

2014-07-21 Thread Ankur Dave
On Mon, Jul 21, 2014 at 8:05 PM, Bin WU bw...@connect.ust.hk wrote:

 I am not sure how to specify different initial values to each node in the
 graph. Moreover, I am wondering why initial message is necessary. I think
 we can instead initialize the graph and then pass it into Pregel interface?


Indeed it's not necessary, and a future update to Pregel will probably
remove it: https://github.com/apache/spark/pull/1217

But the initial message parameter doesn't prevent you from specifying
different initial *values* for each vertex. Pregel respects the initial
vertex values of the provided graph; the initial message just ensures that
it will run vprog at least once per vertex.

If you don't want an initial message, one option is to set aside a special
message value for it and check for this in vprog. For example, if the
message type is Int, you could change it to Option[Int] and use None as the
initial message.
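
A minimal sketch of that pattern, assuming a Graph[Int, Int]:

val result = graph.pregel(Option.empty[Int])(
  vprog = (id, attr, msg) => msg.map(_ + attr).getOrElse(attr), // None means "no real message yet"
  sendMsg = t => Iterator((t.dstId, Some(t.srcAttr))),
  mergeMsg = (a, b) => Some(a.getOrElse(0) + b.getOrElse(0)))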

Ankur http://www.ankurdave.com/


Re: Graphx : Perfomance comparison over cluster

2014-07-20 Thread Ankur Dave
On Fri, Jul 18, 2014 at 9:07 PM, ShreyanshB shreyanshpbh...@gmail.com
 wrote:

 Does the suggested version with in-memory shuffle affects performance too
 much?


We've observed a 2-3x speedup from it, at least on larger graphs like
twitter-2010 http://law.di.unimi.it/webdata/twitter-2010/ and uk-2007-05
http://law.di.unimi.it/webdata/uk-2007-05/.

(according to previously reported numbers, graphx did 10 iterations in 142
 seconds and in latest stats it does it in 68 seconds). Is it just the
 in-memory version which is changed?


If you're referring to previous results vs. the arXiv paper, there were
several improvements, but in-memory shuffle had the largest impact.

Ankur http://www.ankurdave.com/


Re: the default GraphX graph-partition strategy on multicore machine?

2014-07-18 Thread Ankur Dave
On Fri, Jul 18, 2014 at 9:13 AM, Yifan LI iamyifa...@gmail.com wrote:

 Yes, is possible to defining a custom partition strategy?


Yes, you just need to create a subclass of PartitionStrategy as follows:

import org.apache.spark.graphx._
object MyPartitionStrategy extends PartitionStrategy {
  override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
    // put your hash function here
  }
}

if I load one edges file(5 GB), without any cores/partitions setting, what
 is the default partition in graph construction? and how many cores will be
 used?
 Or, if the size of file is 50 GB(more than available memory, without
 partition setting)?


If you don't specify a number of partitions, it will default to 2 or the
number of blocks in the input file, whichever is greater (this is the same
behavior as sc.textFile
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L1243).
I'm not sure exactly how many this will be for your file, but you can find
out by running sc.textFile(...).partitions.length.

The default partitioning strategy is to leave the edges in the same
partitions as they were initially loaded from.

For the 50 GB file, everything is the same except the default number of
partitions will probably be larger. Hopefully each partition will
individually still fit in memory, which will allow GraphX to proceed.

the whole vertex table VA will be replicated to all these 3 edge
 partitions? since each of them refers to some vertexes in VA.


Yes, for that graph and partitioning, all vertices will be replicated to
all edge partitions. It won't be quite as bad for most graphs, and
EdgePartition2D
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.PartitionStrategy$$EdgePartition2D$
provides a bound on the replication factor, as described in the docs.

You mean that the core is stricter to access only its own partition in
 memory?
 how do they communicate when the required data(edges?) in another
 partition?


Right, there is one task per core, and each task only accesses its own
partition, never the partitions of other tasks.

Tasks communicate in two ways:
1. Vertex replication (multicast). When an edge needs to access its
neighboring vertices (to construct the triplets or perform the map step in
mapReduceTriplets), GraphX replicates vertices to all edge partitions where
they are referenced.
2. Vertex aggregation. When an edge needs to update the value of a
neighboring vertex (to perform the reduce step in mapReduceTriplets),
GraphX aggregates vertex updates from the edge partitions back to the
vertices.

Both of these communication steps happen using Spark shuffles, which work
by writing messages to disk and notifying other tasks to read them from
disk.

Note that edges never need to access other edges directly, and all
communication is done through the vertices.

Ankur http://www.ankurdave.com/


Re: the default GraphX graph-partition strategy on multicore machine?

2014-07-18 Thread Ankur Dave
Sorry, I didn't read your vertex replication example carefully, so my
previous answer is wrong. Here's the correct one:

On Fri, Jul 18, 2014 at 9:13 AM, Yifan LI iamyifa...@gmail.com wrote:

 I don't understand, for instance, we have 3 edge partition tables (EA: a -> b,
 a -> c; EB: a -> d, a -> e; EC: d -> c), 2 vertex partition tables (VA: a, b, c;
 VB: d, e), the whole vertex table VA will be replicated to all these 3 edge
 partitions? since each of them refers to some vertexes in VA.


Vertices can be replicated individually without requiring the entire vertex
partition to be replicated. In this case, here's what will get replicated
to each partition:

EA: a (from VA), b (from VA), c (from VA)
EB: a (from VA), d (from VB), e (from VB)
EC: c (from VA), d (from VB)

Ankur http://www.ankurdave.com/


Re: Graphx : Perfomance comparison over cluster

2014-07-18 Thread Ankur Dave
Thanks for your interest. I should point out that the numbers in the arXiv
paper are from GraphX running on top of a custom version of Spark with an
experimental in-memory shuffle prototype. As a result, if you benchmark
GraphX at the current master, it's expected that it will be 2-3x slower
than GraphLab.

The version with in-memory shuffle is here:
https://github.com/amplab/graphx2/commits/vldb. Unfortunately Spark has
changed a lot since then, and the way to configure and invoke Spark is
different. I can send you the correct configuration/invocation for this if
you're interested in benchmarking it.

On Fri, Jul 18, 2014 at 7:14 PM, ShreyanshB shreyanshpbh...@gmail.com
 wrote:

 Should I use the pagerank application already available in graphx for this 
 purpose
 or need to modify or need to write my own?


You should use the built-in PageRank. If your graph is available in edge
list format, you can run it using the Analytics driver as follows:

~/spark/bin/spark-submit --master spark://$MASTER_URL:7077 --class
org.apache.spark.graphx.lib.Analytics
~/spark/assembly/target/scala-2.10/spark-assembly-1.1.0-SNAPSHOT-hadoop1.0.4.jar
pagerank $EDGE_FILE --numEPart=$NUM_PARTITIONS --numIter=$NUM_ITERATIONS
[--partStrategy=$PARTITION_STRATEGY]

What should be the executor_memory, i.e. maximum or according to graph
 size?


As much memory as possible while leaving room for the operating system.

Is there any other configuration I should do to have the best performance?


I think the parameters to Analytics above should be sufficient:

- numEPart - should be equal to or a small integer multiple of the number
of cores. More partitions improve work balance but also increase memory
usage and communication, so in some cases it can even be faster with fewer
partitions than cores.
- partStrategy - If your edges are already sorted, you can skip this
option, because GraphX will leave them as-is by default and that may be
close to optimal. Otherwise, EdgePartition2D and RandomVertexCut are both
worth trying.

CC'ing Joey and Dan, who may have other suggestions.

Ankur http://www.ankurdave.com/


On Fri, Jul 18, 2014 at 7:14 PM, ShreyanshB shreyanshpbh...@gmail.com
wrote:

 Hi,

 I am trying to compare Graphx and other distributed graph processing
 systems
 (graphlab) on my cluster of 64 nodes, each node having 32 cores and
 connected with infinite band.

 I looked at http://arxiv.org/pdf/1402.2394.pdf and stats provided over
 there. I had few questions regarding configuration and achieving best
 performance.

 * Should I use the pagerank application already available in graphx for
 this
 purpose or need to modify or need to write my own?
- If I shouldn't use the inbuilt pagerank, can you share your pagerank
 application?

 * What should be the executor_memory, i.e. maximum or according to graph
 size?

 * Other than, number of cores, executor_memory and partition strategy, Is
 there any other configuration I should do to have the best performance?

 I am using following script,
 import org.apache.spark._
 import org.apache.spark.graphx._
 import org.apache.spark.rdd.RDD

 val startgraphloading = System.currentTimeMillis;
 val graph = GraphLoader.edgeListFile(sc, filepath,true,32)
 val endgraphloading = System.currentTimeMillis;


 Thanks in advance :)



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Graphx-Perfomance-comparison-over-cluster-tp10222.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: GraphX Pragel implementation

2014-07-17 Thread Ankur Dave
If your sendMsg function needs to know the incoming messages as well as the
vertex value, you could define VD to be a tuple of the vertex value and the
last received message. The vprog function would then store the incoming
messages into the tuple, allowing sendMsg to access them.

For example, if the vertex value was a String and the message type was an
Int, you could call Pregel as follows:

val graph: Graph[String, _] = ...

graph.mapVertices((id, attr) => (attr, 0)).pregel(0)(
  (id, attr: (String, Int), msg: Int) => (attr._1, msg),
  edge => Iterator(...), // can use edge.srcAttr._2 and edge.dstAttr._2 to access the messages
  (a: Int, b: Int) => a + b)


Ankur http://www.ankurdave.com/


Re: the default GraphX graph-partition strategy on multicore machine?

2014-07-15 Thread Ankur Dave
On Jul 15, 2014, at 12:06 PM, Yifan LI iamyifa...@gmail.com wrote:

 Btw, is there any possibility to customise the partition strategy as we
 expect?


I'm not sure I understand. Are you asking about defining a custom
http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-how-to-specify-partition-strategy-td9309.html#a9338
partition strategy?

On Tue, Jul 15, 2014 at 6:20 AM, Yifan LI iamyifa...@gmail.com wrote:

 when I load the file using sc.textFile (minPartitions = *16*,
 PartitionStrategy.RandomVertexCut)


The easiest way to load the edge file would actually be to use
GraphLoader.edgeListFile(sc, path, minEdgePartitions = 16).partitionBy(PartitionStrategy.RandomVertexCut).

1) how much data will be loaded into memory?


The exact size of the graph (vertices + edges) in memory depends on the
graph's structure, the partition function, and the average vertex degree,
because each vertex must be replicated to all partitions where it is
referenced.

It's easier to estimate the size of just the edges, which I did on the
mailing list
http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-partition-problem-tp6248p6363.html
a while ago. To summarize, during graph construction each edge takes 60
bytes, and once the graph is constructed it takes 20 bytes.

2) how many partitions will be stored in memory?


Once you call cache() on the graph, all 16 partitions will be stored in
memory. You can also tell GraphX to spill them to disk in case of memory
pressure by passing edgeStorageLevel=StorageLevel.MEMORY_AND_DISK to
GraphLoader.edgeListFile.

3) If the thread/task on each core will read only *one* edge from memory
 and then compute it at every time?


Yes, each task is single-threaded, so it will read the edges in its
partition sequentially.

3.1) which edge on memory was determined to read into cache?


In theory, each core should independently read the edges in its partition
into its own cache. Cache issues should be much less of a concern than in
most applications because different tasks (cores) operate on independent
partitions; there is no shared-memory parallelism. The tradeoff is heavier
reliance on shuffles.

3.2) how are those partitions being scheduled?


Spark handles the scheduling. There are details in Section 5.1 of the Spark
NSDI paper
https://www.usenix.org/system/files/conference/nsdi12/nsdi12-final138.pdf;
in short, tasks are scheduled to run on the same machine as their data
partition, and by default each machine can accept at most one task per core.

Ankur http://www.ankurdave.com/


Re: Graphx : optimal partitions for a graph and error in logs

2014-07-11 Thread Ankur Dave
On Fri, Jul 11, 2014 at 2:23 PM, ShreyanshB shreyanshpbh...@gmail.com
 wrote:

 -- Is it a correct way to load file to get best performance?


Yes, edgeListFile should be efficient at loading the edges.

-- What should be the partition size? =computing node or =cores?


In general it should be a multiple of the number of cores to exploit all
available parallelism, but because of shuffle overhead, it might help to
use fewer partitions -- in some cases even fewer than the number of cores.
You can measure the performance with different numbers of partitions to see
what is best.

-- I see following error so many times in my logs [...]
 NotSerializableException


This is a known bug, and there are two possible resolutions:

1. Switch from Java serialization to Kryo serialization, which is faster
and will also resolve the problem, by setting the following Spark
properties in conf/spark-defaults.conf:
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator org.apache.spark.graphx.GraphKryoRegistrator

2. Mark the affected classes as Serializable. I'll submit a patch with this
fix as well, but for now I'd suggest trying Kryo if possible.
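
If you'd rather set these programmatically than in spark-defaults.conf, a sketch:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("graphx-app")   // hypothetical application name
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
val sc = new SparkContext(conf)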

Ankur http://www.ankurdave.com/


Re: Graphx : optimal partitions for a graph and error in logs

2014-07-11 Thread Ankur Dave
I don't think it should affect performance very much, because GraphX
doesn't serialize ShippableVertexPartition in the fast path of
mapReduceTriplets execution (instead it calls
ShippableVertexPartition.shipVertexAttributes and serializes the result). I
think it should only get serialized for speculative execution, if you have
that enabled.

By the way, here's the fix: https://github.com/apache/spark/pull/1376

Ankur http://www.ankurdave.com/


Re: Graphx : optimal partitions for a graph and error in logs

2014-07-11 Thread Ankur Dave
Spark just opens up inter-slave TCP connections for message passing during
shuffles (I think the relevant code is in ConnectionManager). Since TCP
automatically determines the optimal sending rate
(http://en.wikipedia.org/wiki/TCP_congestion-avoidance_algorithm), Spark
doesn't need any configuration parameters for this.

Ankur http://www.ankurdave.com/


Re: GraphX: how to specify partition strategy?

2014-07-10 Thread Ankur Dave
On Thu, Jul 10, 2014 at 8:20 AM, Yifan LI iamyifa...@gmail.com wrote:

 - how to build the latest version of Spark from the master branch, which
 contains a fix?


Instead of downloading a prebuilt Spark release from
http://spark.apache.org/downloads.html, follow the instructions under
Development Version on that page. In short:

git clone git://github.com/apache/spark.git
cd spark
sbt/sbt assembly

Then you can run bin/spark-shell and bin/spark-submit as usual, and
Graph.partitionBy should work.

- how to specify other partition strategy, eg. CanonicalRandomVertexCut,
 EdgePartition1D, EdgePartition2D, RandomVertexCut
 (listed in Scala API document, but seems only EdgePartition2D is
 available? I am not sure for this! )


All of those partition strategies should be available -- for example, you
can call graph.partitionBy(PartitionStrategy.RandomVertexCut).

- Is it possible to add my own partition strategy(hash function, etc.) into
 GraphX?


Yes, you just need to create a subclass of PartitionStrategy as follows:

import org.apache.spark.graphx._

object MyPartitionStrategy extends PartitionStrategy {
  override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
    // put your hash function here
  }
}
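
Then apply it when repartitioning, for example:

val repartitioned = graph.partitionBy(MyPartitionStrategy)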

Ankur http://www.ankurdave.com/


Re: tiers of caching

2014-07-07 Thread Ankur Dave
I think tiers/priorities for caching are a very good idea and I'd be
interested to see what others think. In addition to letting libraries cache
RDDs liberally, it could also unify memory management across other parts of
Spark. For example, small shuffles benefit from explicitly keeping the
shuffle outputs in memory rather than writing it to disk, possibly due to
filesystem overhead. To prevent in-memory shuffle outputs from competing
with application RDDs, Spark could mark them as lower-priority and specify
that they should be dropped to disk when memory runs low.

Ankur http://www.ankurdave.com/


Re: graphx Joining two VertexPartitions with different indexes is slow.

2014-07-06 Thread Ankur Dave
Well, the alternative is to do a deep equality check on the index arrays,
which would be somewhat expensive since these are pretty large arrays (one
element per vertex in the graph). But, in case the reference equality check
fails, it actually might be a good idea to do the deep check before
resorting to the slow code path.

Ankur http://www.ankurdave.com/


Re: Graphx traversal and merge interesting edges

2014-07-05 Thread Ankur Dave
Interesting problem! My understanding is that you want to (1) find paths
matching a particular pattern, and (2) add edges between the start and end
vertices of the matched paths.

For (1), I implemented a pattern matcher for GraphX
https://github.com/ankurdave/spark/blob/PatternMatching/graphx/src/main/scala/org/apache/spark/graphx/lib/PatternMatching.scala
that iteratively accumulates partial pattern matches. I used your example
in the unit test
https://github.com/ankurdave/spark/blob/PatternMatching/graphx/src/test/scala/org/apache/spark/graphx/lib/PatternMatchingSuite.scala
.

For (2), you can take the output of the pattern matcher (the set of
matching paths organized by their terminal vertices) and construct a set of
new edges using the initial and terminal vertices of each path. Then you
can make a new graph consisting of the union of the original edge set and
the new edges. Let me know if you'd like help with this.
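
A hedged sketch of step (2), assuming the matcher's output has already been reduced
to (start, end) vertex pairs and that the graph's edge attributes are Strings (both
are assumptions; the actual shape of PatternMatching's result may differ):

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val matchedPaths: RDD[(VertexId, VertexId)] = ...   // one (initial, terminal) pair per matched path
val newEdges = matchedPaths.map { case (start, end) => Edge(start, end, "derived") }
// Union the original edge set with the derived edges and rebuild the graph
val augmented = Graph(graph.vertices, graph.edges.union(newEdges))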

Ankur http://www.ankurdave.com/


Re: graphx Joining two VertexPartitions with different indexes is slow.

2014-07-05 Thread Ankur Dave
When joining two VertexRDDs with identical indexes, GraphX can use a fast
code path (a zip join without any hash lookups). However, the check for
identical indexes is performed using reference equality.

Without caching, two copies of the index are created. Although the two
indexes are structurally identical, they fail reference equality, and so
GraphX mistakenly uses the slow path involving a hash lookup per joined
element.

I'm working on a patch https://github.com/apache/spark/pull/1297 that
attempts an optimistic zip join with per-element fallback to hash lookups,
which would improve this situation.

Ankur http://www.ankurdave.com/


Re: graphx Joining two VertexPartitions with different indexes is slow.

2014-07-03 Thread Ankur Dave
A common reason for the "Joining ... is slow" message is that you're
joining VertexRDDs without having cached them first. This will cause Spark
to recompute unnecessarily, and as a side effect, the same index will get
created twice and GraphX won't be able to do an efficient zip join.

For example, the following code will counterintuitively produce the
"Joining ... is slow" message:

val a = VertexRDD(sc.parallelize((1 to 100).map(x => (x.toLong, x))))
a.leftJoin(a) { (id, a, b) => a + b }

The remedy is to call a.cache() before a.leftJoin(a).
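
In other words, a sketch of the same example with the cache added (and the Option 
from leftJoin unwrapped explicitly):

val a = VertexRDD(sc.parallelize((1 to 100).map(x => (x.toLong, x))))
a.cache()                                            // materialize the index once
a.leftJoin(a) { (id, x, y) => x + y.getOrElse(0) }   // fast path: both sides share the index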

Ankur http://www.ankurdave.com/


Re: Graphx SubGraph

2014-06-24 Thread Ankur Dave
Yes, the subgraph operator takes a vertex predicate and keeps only the
edges where both vertices satisfy the predicate, so it will work as long as
you can express the sublist in terms of a vertex predicate.

If that's not possible, you can still obtain the same effect, but you'll
have to use lower-level operations similar to how subgraph is itself
implemented. I can help out if that's the case.

Ankur http://www.ankurdave.com/


Re: Query on Merge Message (Graph: pregel operator)

2014-06-19 Thread Ankur Dave
Many merge operations can be broken up to work incrementally. For example,
if the merge operation is to sum *n* rank updates, then you can set mergeMsg
= (a, b) => a + b and this function will be applied to all *n* updates in
arbitrary order to yield a final sum. Addition, multiplication, min, max,
and mean are operations that work in this manner (they are associative and
commutative).

If you absolutely must operate on all *n* messages at once, for example to
find the median, then a workaround is to emit Array(m) instead of m in the
sendMsg function, and then to set mergeMsg = (a, b) => a ++ b. This will
accumulate all inbound messages into an array which you can access in vprog.
However, it will be much slower for graphs with high-degree vertices,
because the accumulated arrays can grow very large.
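
Both shapes side by side, as a sketch with a Double message payload (an assumption):

import org.apache.spark.graphx._

// 1. Associative and commutative: safe to apply pairwise in any order
val sumMsgs: (Double, Double) => Double = _ + _

// 2. Collect-all workaround: send one-element arrays and concatenate them,
//    so vprog sees every inbound message (costly for high-degree vertices)
val sendOne: EdgeTriplet[Double, Int] => Iterator[(VertexId, Array[Double])] =
  t => Iterator((t.dstId, Array(t.srcAttr)))
val collectAll: (Array[Double], Array[Double]) => Array[Double] = _ ++ _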

Ankur http://www.ankurdave.com/


Re: BSP realization on Spark

2014-06-19 Thread Ankur Dave
Master/worker assignment and barrier synchronization are handled by Spark,
so Pregel will automatically coordinate the computation, communication, and
synchronization needed to run for the specified number of supersteps (or
until there are no messages sent in a particular superstep).

Ankur http://www.ankurdave.com/


Re: Bayes Net with Graphx?

2014-06-06 Thread Ankur Dave
Vertices can have arbitrary properties associated with them:
http://spark.apache.org/docs/latest/graphx-programming-guide.html#the-property-graph

Ankur http://www.ankurdave.com/


Re: GraphX partition problem

2014-05-28 Thread Ankur Dave
I've been trying to reproduce this but I haven't succeeded so far. For
example, on the web-Google graph (https://snap.stanford.edu/data/web-Google.html),
I get the expected results both on v0.9.1-handle-empty-partitions and on master:

// Load web-Google and run connected components
import org.apache.spark.graphx._
val g = GraphLoader.edgeListFile(sc, "/Users/ankurdave/Downloads/web-Google.txt",
  minEdgePartitions = 8)
g.vertices.count // => 875713
val cc = g.connectedComponents.vertices.map(_._2).cache()
cc.count // => 875713
val counts = cc.countByValue
counts.values.sum // => 875713
// There should not be any single-vertex components, because we loaded an edge list
counts.count(_._2 == 0) // => 0
counts.count(_._2 == 1) // => 0
counts.count(_._2 == 2) // => 783
counts.count(_._2 == 3) // => 503
// The 3 smallest and largest components in the graph (with nondeterministic tiebreaking)
counts.toArray.sortBy(_._2).take(3) // => Array((418467,2), (272504,2), (719750,2))
counts.toArray.sortBy(_._2).takeRight(3) // => Array((1363,384), (1734,404), (0,855802))


Ankur http://www.ankurdave.com/


Re: Persist and unpersist

2014-05-27 Thread Ankur Dave
I think what's desired here is for input to be unpersisted automatically as
soon as result is materialized. I don't think there's currently a way to do
this, but the usual workaround is to force result to be materialized
immediately and then unpersist input:

input.cache()
val count = input.count
val result = input.filter(...)
result.cache().foreach(x => {}) // materialize result
input.unpersist()               // safe because `result` is materialized
                                // and is the only RDD that depends on `input`
return result


Ankur http://www.ankurdave.com/


Re: counting degrees graphx

2014-05-26 Thread Ankur Dave
Oh, looks like the Scala Map isn't serializable. I switched the code to use
java.util.HashMap, which should work.

Ankur http://www.ankurdave.com/


On Mon, May 26, 2014 at 3:21 PM, daze5112 david.zeel...@ato.gov.au wrote:

 Excellent thanks Ankur, looks like what im looking for  Only one problem
 the
 line

 val dists = initDists.pregel[DistanceMap](Map())(vprog, sendMsg, mergeMsg)

 produces an error
  Job aborted: Task 268.0:5 had a not serializable result:
 java.io.NotSerializableException:
 scala.collection.immutable.MapLike$$anon$2

 any ideas?

 thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/counting-degrees-graphx-tp6370p6405.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: counting degrees graphx

2014-05-26 Thread Ankur Dave
On closer inspection it looks like Map normally is serializable, and it's
just a bug in mapValues, so I changed to using the .map(identity)
workaround described in https://issues.scala-lang.org/browse/SI-7005.

Ankur http://www.ankurdave.com/


Re: GraphX partition problem

2014-05-25 Thread Ankur Dave
Once the graph is built, edges are stored in parallel primitive arrays, so
each edge should only take 20 bytes to store (srcId: Long, dstId: Long,
attr: Int). Unfortunately, the current implementation in
EdgePartitionBuilder uses an array of Edge objects as an intermediate
representation for sorting, so each edge will additionally take something
like 40 bytes during graph construction (srcId (8) + dstId (8) + attr (4) +
uncompressed pointer (8) + object overhead (8) + padding (4)). So you'll
need about 1.2 TB of memory in total (60 bytes * 20 billion). Does your
cluster have this much memory?

If not, I've been meaning to write a custom sort routine that can operate
directly on the three parallel arrays. This will reduce the memory
requirements to about 400 GB.
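
A quick back-of-the-envelope check of those figures (the byte counts are the assumed
sizes stated above, not measurements):

val numEdges         = 20e9     // 20 billion edges
val bytesDuringBuild = 60.0     // 20 B final layout + ~40 B temporary Edge object
val bytesAfterBuild  = 20.0     // srcId (8) + dstId (8) + attr (4)
println(numEdges * bytesDuringBuild / 1e12)   // ~1.2 (TB) during construction
println(numEdges * bytesAfterBuild / 1e12)    // ~0.4 (TB) with an in-place sort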

Ankur http://www.ankurdave.com/


Re: counting degrees graphx

2014-05-25 Thread Ankur Dave
I'm not sure I understand what you're looking for. Could you provide some
more examples to clarify?

One interpretation is that you want to tag the source vertices in a graph
(those with zero indegree) and find for each vertex the set of sources that
lead to that vertex. For vertices 1-8 in the graph, this would give you
[{1}, {1}, {1}, {1}, {1, 6}, {7}, {7}].

Ankur http://www.ankurdave.com/


Re: GraphX vertices and connected edges

2014-05-02 Thread Ankur Dave
Do you mean you want to obtain a list of adjacent edges for every vertex? A
mapReduceTriplets followed by a join is the right way to do this. The join
will be cheap because the original and derived vertices will share indices.
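
A rough sketch of that approach, assuming a Graph[String, Int]; it mirrors what
GraphOps.collectEdges computes, plus the join back onto the vertices:

val adjacent: VertexRDD[Array[Edge[Int]]] = graph.mapReduceTriplets[Array[Edge[Int]]](
  t => Iterator(
    (t.srcId, Array(Edge(t.srcId, t.dstId, t.attr))),   // tell each endpoint about the edge
    (t.dstId, Array(Edge(t.srcId, t.dstId, t.attr)))),
  (a, b) => a ++ b)                                      // concatenate adjacent-edge lists
val verticesWithEdges = graph.vertices.leftJoin(adjacent) { (id, attr, edgesOpt) =>
  (attr, edgesOpt.getOrElse(Array.empty[Edge[Int]]))     // isolated vertices get an empty list
}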

There's a built-in function to do this for neighboring vertex properties called
GraphOps.collectNeighbors
(http://spark.apache.org/docs/latest/api/graphx/index.html#org.apache.spark.graphx.GraphOps@collectNeighbors(EdgeDirection):VertexRDD[Array[(VertexId,VD)]]).
In the latest version of GraphX there's also GraphOps.collectEdges
(https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala#L140),
but that doesn't do the join.

As the docs for these functions state, this will be memory-intensive for
high-degree vertices, so it would be better to break up the computation so
you don't need to collect all neighbors or edges if possible.

Ankur http://www.ankurdave.com/


On Fri, May 2, 2014 at 11:34 AM, Kyle Ellrott kellr...@soe.ucsc.edu wrote:

 What is the most efficient way to an RDD of GraphX vertices and their
 connected edges?


