Re: creating a distributed index
The latest version of IndexedRDD supports any key type with a defined serializer (https://github.com/amplab/spark-indexedrdd/blob/master/src/main/scala/edu/berkeley/cs/amplab/spark/indexedrdd/KeySerializer.scala), including Strings. It's not released yet, but you can use it from the master branch if you're interested. Ankur http://www.ankurdave.com/ On Wed, Jul 15, 2015 at 12:43 AM, Jem Tucker jem.tuc...@gmail.com wrote: With regards to indexed structures in Spark, are there any alternatives to IndexedRDD for more generic keys, including Strings? Thanks Jem
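For illustration, a rough sketch of using IndexedRDD with String keys, based on the project's README; the get/put methods and the implicit KeySerializer[String] on the master branch are assumptions here, not something confirmed in this thread:

import edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD
import edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD._

// Build an IndexedRDD keyed by String; this relies on an implicit
// KeySerializer[String] being in scope on the master branch.
val pairs = sc.parallelize(Seq(("alice", 1), ("bob", 2), ("carol", 3)))
val indexed = IndexedRDD(pairs).cache()

indexed.get("bob")                     // point lookup, expected Some(2)
val updated = indexed.put("dave", 4)   // functional point update returning a new IndexedRDD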
Re: Efficient way to fetch all records on a particular node/partition in GraphX
If you know the partition IDs, you can launch a job that runs tasks on only those partitions by calling sc.runJob https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L1686. For example, we do this in IndexedRDD https://github.com/amplab/spark-indexedrdd/blob/f0c42dcad1f49ce36140f0c1f7d2c3ed61ed373e/src/main/scala/edu/berkeley/cs/amplab/spark/indexedrdd/IndexedRDDLike.scala#L100 to get particular keys without launching a task on every partition. Ankur http://www.ankurdave.com/ On Sun, May 17, 2015 at 8:32 AM, mas mas.ha...@gmail.com wrote: I have distributed my RDD into say 10 nodes. I want to fetch the data that resides on a particular node, say node 5. How can I achieve this? I have tried the mapPartitionsWithIndex function to filter the data of that corresponding node, however it is pretty expensive.
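As a minimal sketch of the sc.runJob approach (the partition IDs and the RDD are placeholders; the exact runJob overload varies across Spark versions, and older releases also take an allowLocal flag):

val rdd = sc.textFile("data.txt", 10)
val wantedPartitions = Seq(3, 7)
// Run a task only on the requested partitions and return their contents to the driver.
val results: Array[Array[String]] = sc.runJob(rdd, (iter: Iterator[String]) => iter.toArray, wantedPartitions)
results.flatten.foreach(println)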
Re: AMP Lab Indexed RDD - question for Data Bricks AMP Labs
I'm the primary author of IndexedRDD. To answer your questions:

1. Operations on an IndexedRDD partition can only be performed from a task operating on that partition, since doing otherwise would require decentralized coordination between workers, which is difficult in Spark. If you want to perform cross-partition lookups, you'll have to do all the lookups in a batch step as follows:

val a = IndexedRDD(...)
val b = sc.parallelize(...)
// Perform an operation on b that produces some keys to look up in a
val lookups: RDD[Long] = b.map(...)
// Repartition the desired keys to their appropriate partitions in a and do
// local lookups, returning the corresponding values
val results = a.innerJoin(lookups.map(k => (k, ()))) { (id, v, unit) => v }

2. IndexedRDD originated from GraphX but can be used for general operations as long as they fit within Spark's batch-oriented programming model.

By the way, a new version of IndexedRDD is about to be released. If you decide to use IndexedRDD I'd suggest trying that out, since it provides a cleaner interface, more predictable performance, and support for arbitrary key types: https://github.com/amplab/spark-indexedrdd/pull/4

Ankur http://www.ankurdave.com/

On Thu, Apr 16, 2015 at 2:34 PM, Evo Eftimov evo.efti...@isecc.com wrote: Thanks, but we need a firm statement, preferably from somebody from the Spark vendor Data Bricks, including an answer to the specific question posed by me and an assessment/confirmation of whether this is a production-ready / quality library which can be used for general-purpose RDDs, not just inside the context of GraphX. From: Koert Kuipers [mailto:ko...@tresata.com] Sent: Thursday, April 16, 2015 10:31 PM To: Evo Eftimov Cc: user@spark.apache.org Subject: Re: AMP Lab Indexed RDD - question for Data Bricks AMP Labs I believe it is a generalization of some classes inside GraphX, where there was/is a need to keep stuff indexed for random access within each RDD partition. On Thu, Apr 16, 2015 at 5:00 PM, Evo Eftimov evo.efti...@isecc.com wrote: Can somebody from Data Bricks shed more light on this Indexed RDD library https://github.com/amplab/spark-indexedrdd? It seems to come from AMP Labs and most of the Data Bricks guys are from there. What is especially interesting is whether the Point Lookup (and the other primitives) can work from within a function (e.g. map) running on executors on worker nodes. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/AMP-Lab-Indexed-RDD-question-for-Data-Bricks-AMP-Labs-tp22532.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: [GraphX] aggregateMessages with active set
Actually, GraphX doesn't need to scan all the edges, because it maintains a clustered index on the source vertex id (that is, it sorts the edges by source vertex id and stores the offsets in a hash table). If the activeDirection is appropriately set, it can then jump only to the clusters with active source vertices. See the EdgePartition#index field [1], which stores the offsets, and the logic in GraphImpl#aggregateMessagesWithActiveSet [2], which decides whether to do a full scan or use the index. [1] https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala#L60 [2] https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala#L237-266 Ankur On Thu, Apr 9, 2015 at 3:21 AM, James alcaid1...@gmail.com wrote: In aggregateMessagesWithActiveSet, Spark still has to read all edges. It means that a fixed cost which scales with graph size is unavoidable in a Pregel-like iteration. But what if I have to run nearly 100 iterations and in the last 50 iterations only 0.1% of the nodes need to be updated? The fixed cost makes the program finish in an unacceptable amount of time. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: [GraphX] aggregateMessages with active set
We thought it would be better to simplify the interface, since the active set is a performance optimization but the result is identical to calling subgraph before aggregateMessages. The active set option is still there in the package-private method aggregateMessagesWithActiveSet. You can actually access it publicly via GraphImpl, though the API isn't guaranteed to be stable: graph.asInstanceOf[GraphImpl[VD, ED]].aggregateMessagesWithActiveSet(...) Ankur On Tue, Apr 7, 2015 at 2:56 AM, James alcaid1...@gmail.com wrote: Hello, The old GraphX API mapReduceTriplets has an optional parameter activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] that limits the input of sendMessage. However, in the new API aggregateMessages I could not find this option. Why is it not offered any more? Alcaid - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Graphx gets slower as the iteration number increases
This might be because partitions are getting dropped from memory and needing to be recomputed. How much memory is in the cluster, and how large are the partitions? This information should be in the Executors and Storage pages in the web UI. Ankur http://www.ankurdave.com/ On Tue, Mar 24, 2015 at 7:12 PM, orangepri...@foxmail.com orangepri...@foxmail.com wrote: I'm working with GraphX to calculate the pageranks of an extremely large social network with billions of vertices. As the iteration number increases, the speed of each iteration becomes slower and eventually unacceptable. Is there any reason for it?
Re: Learning GraphX Questions
At 2015-02-13 12:19:46 -0800, Matthew Bucci mrbucci...@gmail.com wrote: 1) How do you actually run programs in GraphX? At the moment I've been doing everything live through the shell, but I'd obviously like to be able to work on it by writing and running scripts. You can create your own projects that build against Spark and GraphX through a Maven dependency [1], then run those applications using the bin/spark-submit script included with Spark [2]. These guides assume you already know how to do this using your preferred build tool (SBT or Maven). In short, here's how to do it with SBT: 1. Install SBT locally (`brew install sbt` on OS X). 2. Inside your project directory, create a build.sbt file listing Spark and GraphX as a dependency, as in [3]. 3. Run `sbt package` in a shell. 4. Pass the JAR in your_project_dir/target/scala-2.10/ to bin/spark-submit. [1] http://spark.apache.org/docs/latest/programming-guide.html#linking-with-spark [2] http://spark.apache.org/docs/latest/submitting-applications.html [3] https://gist.github.com/ankurdave/1fb7234d8affb3a2e4f4 2) Is there a way to check the status of the partitions of a graph? For example, I want to determine for starters if the number of partitions requested are always made, like if I ask for 8 partitions but only have 4 cores what happens? You can look at `graph.vertices` and `graph.edges`, which are both RDDs, so you can do for example: graph.vertices.partitions 3) Would I be able to partition by vertex instead of edges, even if I had to write it myself? I know partitioning by edges is favored in a majority of the cases, but for the sake of research I'd like to be able to do both. If you pass PartitionStrategy.EdgePartition1D, this will partition edges by their source vertices, so all edges with the same source will be co-partitioned, and the communication pattern will be similar to vertex-partitioned (edge-cut) systems like Giraph. 4) Is there a better way to time processes outside of using built-in unix timing through the logs or something? I think the options are Unix timing, log file timestamp parsing, looking at the web UI, or writing timing code within your program (System.currentTimeMillis and System.nanoTime). Ankur - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
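For reference, a minimal build.sbt along the lines of step 2 might look like the following (a sketch assuming Scala 2.10 and Spark 1.2.x; adjust the versions to match your cluster, and note that the gist in [3] is the authoritative example):

name := "my-graphx-app"

version := "0.1"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.2.1" % "provided",
  "org.apache.spark" %% "spark-graphx" % "1.2.1" % "provided"
)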
Re: GraphX: ShortestPaths does not terminate on a grid graph
Thanks for the reminder. I just created a PR: https://github.com/apache/spark/pull/4273 Ankur On Thu, Jan 29, 2015 at 7:25 AM, Jay Hutfles jayhutf...@gmail.com wrote: Just curious, is this set to be merged at some point? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: graph.inDegrees including zero values
You can do this using leftJoin, as collectNeighbors [1] does:

graph.vertices.leftJoin(graph.inDegrees) {
  (vid, attr, inDegOpt) => inDegOpt.getOrElse(0)
}

[1] https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala#L145 Ankur On Sun, Jan 25, 2015 at 5:52 AM, scharissis stefano.charis...@gmail.com wrote: If a vertex has no in-degree then Spark's GraphOp 'inDegree' does not return it at all. Instead, it would be very useful to me to be able to have that vertex returned with an in-degree of zero. What's the best way to achieve this using the GraphX API? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: GraphX: ShortestPaths does not terminate on a grid graph
At 2015-01-22 02:06:37 -0800, NicolasC nicolas.ch...@inria.fr wrote: I try to execute a simple program that runs the ShortestPaths algorithm (org.apache.spark.graphx.lib.ShortestPaths) on a small grid graph. I use Spark 1.2.0 downloaded from spark.apache.org. This program runs for more than 2 hours when the grid size is 70x70 as above, and is then killed by the resource manager of the cluster (Torque). After 5-6 minutes of execution, the Spark master UI does not even respond. For a grid size of 30x30, the program terminates in about 20 seconds, and for a grid size of 50x50 it finishes in about 80 seconds. The problem appears for a grid size of 70x70 and above.

Unfortunately this problem is due to a Spark bug. In later iterations of iterative algorithms, the lineage maintained for fault tolerance grows long and causes Spark to consume increasing amounts of resources for scheduling and task serialization. The workaround is to checkpoint the graph periodically, which writes it to stable storage and interrupts the lineage chain before it grows too long. If you're able to recompile Spark, you can do this by applying the patch to GraphX at the end of this mail, and before running graph algorithms, calling sc.setCheckpointDir("/tmp") to set the checkpoint directory as desired.

Ankur

=== patch begins here ===
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
index 5e55620..1fbbb87 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -126,6 +126,8 @@ object Pregel extends Logging {
     // Loop
     var prevG: Graph[VD, ED] = null
     var i = 0
+    val checkpoint = g.vertices.context.getCheckpointDir.nonEmpty
+    val checkpointFrequency = 25
     while (activeMessages > 0 && i < maxIterations) {
       // Receive the messages. Vertices that didn't get any messages do not appear in newVerts.
       val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
@@ -139,6 +141,14 @@ object Pregel extends Logging {
       // get to send messages. We must cache messages so it can be materialized on the next line,
       // allowing us to uncache the previous iteration.
       messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDirection))).cache()
+
+      if (checkpoint && i % checkpointFrequency == checkpointFrequency - 1) {
+        logInfo("Checkpointing in iteration " + i)
+        g.vertices.checkpoint()
+        g.edges.checkpoint()
+        messages.checkpoint()
+      }
+
       // The call to count() materializes `messages`, `newVerts`, and the vertices of `g`. This
       // hides oldMessages (depended on by newVerts), newVerts (depended on by messages), and the
       // vertices of prevG (depended on by newVerts, oldMessages, and the vertices of g).
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Using graphx to calculate average distance of a big graph
[-dev] What size of graph are you hoping to run this on? For small graphs where materializing the all-pairs shortest paths is an option, you could simply find the APSP using https://github.com/apache/spark/pull/3619 and then take the average distance (apsp.map(_._2.toDouble).mean). Ankur http://www.ankurdave.com/ On Sun, Jan 4, 2015 at 6:28 PM, James alcaid1...@gmail.com wrote: Recently we wanted to use Spark to calculate the average shortest path distance between each reachable pair of nodes in a very big graph. Has anyone ever tried this? We hope to discuss the problem.
Re: representing RDF literals as vertex properties
At 2014-12-08 12:12:16 -0800, spr s...@yarcdata.com wrote: OK, have waded into implementing this and have gotten pretty far, but am now hitting something I don't understand, a NoSuchMethodError. [...] The (short) traceback looks like Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.graphx.Graph$.apply$default$4()Lorg/apache/spark/storage/StorageLevel; [...] Is the method that's not found (.../StorageLevel) something I need to initialize? Using this same code on a toy problem works fine. This is a binary compatibility error and shouldn't happen as long as you're compiling and running with the same Spark assembly jar. Is it possible there's a version mismatch between compiling and running? Ankur - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: [Graphx] which way is better to access faraway neighbors?
At 2014-12-05 02:26:52 -0800, Yifan LI iamyifa...@gmail.com wrote: I have a graph where each vertex keeps several messages for some faraway neighbours (I mean, not only immediate neighbours, but ones at most k hops away, e.g. k = 5). Now, I propose to distribute these messages to their corresponding destinations (say, faraway neighbours): - by using the Pregel API, one superstep is enough - by using Spark basic operations (groupByKey, leftJoin, etc.) on the vertices RDD and its intermediate results. W.r.t. the communication among machines, and the high cost of groupByKey/leftJoin, I guess the 1st option is better? If messages will only travel along edges (even if they travel over multiple edges), then the Pregel API should be faster. You'll have to run k supersteps for messages to propagate k hops away from their origins. If messages can jump directly between two arbitrary vertices, then doing a single set of Spark basic operations may be faster than running multiple Pregel supersteps. Ankur - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Determination of number of RDDs
At 2014-12-04 02:08:45 -0800, Deep Pradhan pradhandeep1...@gmail.com wrote: I have a graph and I want to create RDDs equal in number to the nodes in the graph. How can I do that? If I have 10 nodes then I want to create 10 RDDs. Is that possible in GraphX? This is possible: you can collect the elements to the driver, then create an RDD for each element. If you have so many elements that collecting them to the driver is infeasible, there's probably an alternative solution that doesn't involve creating one RDD per element. Ankur - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
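A minimal sketch of the collect-then-parallelize approach described above (the graph is a placeholder, and this is rarely a good idea for large vertex sets):

val vertexIds = graph.vertices.map(_._1).collect()
// One (tiny) RDD per vertex id.
val perVertexRDDs = vertexIds.map(id => sc.parallelize(Seq(id)))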
Re: GraphX Pregel halting condition
There's no built-in support for doing this, so the best option is to copy and modify Pregel to check the accumulator at the end of each iteration. This is robust and shouldn't be too hard, since the Pregel code is short and only uses public GraphX APIs. Ankur At 2014-12-03 09:37:01 -0800, Jay Hutfles jayhutf...@gmail.com wrote: I'm trying to implement a graph algorithm that does a form of path searching. Once a certain criteria is met on any path in the graph, I wanted to halt the rest of the iterations. But I can't see how to do that with the Pregel API, since any vertex isn't able to know the state of another arbitrary vertex (if they're not adjacent). Is there a common pattern for doing something like this? I was thinking of using a custom accumulator where the zero is true and the addInPlace is a boolean or. Each vertex (as part of its vprog) could add to the accumulator, and once a path is found which meets the condition, the accumulator would then have a value of false. But since workers can't read accumulators, I don't see how to use that when deciding whether to iterate again. That is, unless I reimplement the Pregel class with the added check when iterating... Any suggestions? Thanks in advance! - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
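As a rough sketch of the accumulator idea under the Spark 1.x API (names like FoundParam and found are illustrative, and the loop referred to is your copied-and-modified Pregel loop, not the library's):

import org.apache.spark.AccumulatorParam

// Boolean "or" accumulator: once any task reports a match, the value stays true.
object FoundParam extends AccumulatorParam[Boolean] {
  def zero(initial: Boolean): Boolean = false
  def addInPlace(a: Boolean, b: Boolean): Boolean = a || b
}

val found = sc.accumulator(false)(FoundParam)

// Inside the vertex program, when the halting criterion is met on some path:
//   found += true
// In the copied Pregel loop, after the action that materializes the messages
// (the activeMessages count), the driver can read the accumulator and stop:
//   if (found.value) { /* break out of the iteration loop */ }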
Re: representing RDF literals as vertex properties
At 2014-12-04 16:26:50 -0800, spr s...@yarcdata.com wrote: I'm also looking at how to represent literals as vertex properties. It seems one way to do this is via positional convention in an Array/Tuple/List that is the VD; i.e., to represent height, weight, and eyeColor, the VD could be a Tuple3(Double, Double, String). [...] Given that vertices can have many many properties, it seems memory consumption for the properties should be as parsimonious as possible. Will any of Array/Tuple/List support sparse usage? Is Option the way to get there?

Storing vertex properties positionally with Array[Option[Any]] or any of the other sequence types will provide a dense representation. For a sparse representation, the right data type is a Map[String, Any], which will let you access properties by name and will only store the nonempty properties. Since the value type in the map has to be Any, or more precisely the least upper bound of the property types, this sacrifices type safety and you'll have to downcast when retrieving properties.

If there are particular subsets of the properties that frequently go together, you could instead use a class hierarchy. For example, if the vertices are either people or products, you could use the following:

sealed trait VertexProperty extends Serializable
case class Person(name: String, weight: Int) extends VertexProperty
case class Product(name: String, price: Int) extends VertexProperty

Then you could pattern match against the hierarchy instead of downcasting:

List(Person("Bob", 180), Product("chair", 800), Product("desk", 200)).flatMap {
  case Person(name, weight) => Array.empty[Int]
  case Product(name, price) => Array(price)
}.sum

Ankur - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Filter using the Vertex Ids
At 2014-12-03 02:13:49 -0800, Deep Pradhan pradhandeep1...@gmail.com wrote: We cannot do sc.parallelize(List(VertexRDD)), can we? There's no need to do this, because every VertexRDD is also a pair RDD: class VertexRDD[VD] extends RDD[(VertexId, VD)] You can simply use graph.vertices in place of `a` in my example. Ankur - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Filter using the Vertex Ids
At 2014-12-02 22:01:20 -0800, Deep Pradhan pradhandeep1...@gmail.com wrote: I have a graph which returns the following on doing graph.vertices (1, 1.0) (2, 1.0) (3, 2.0) (4, 2.0) (5, 0.0) I want to group all the vertices with the same attribute together, like into one RDD or something. I want all the vertices with same attribute to be together.

You can do this by flipping the tuples so the values become the keys, then using one of the by-key functions in PairRDDFunctions:

val a: RDD[(Int, Double)] = sc.parallelize(List(
  (1, 1.0), (2, 1.0), (3, 2.0), (4, 2.0), (5, 0.0)))
val b: RDD[(Double, Int)] = a.map(kv => (kv._2, kv._1))
val c: RDD[(Double, Iterable[Int])] = b.groupByKey(numPartitions = 5)
c.collect.foreach(println)
// (0.0,CompactBuffer(5))
// (1.0,CompactBuffer(1, 2))
// (2.0,CompactBuffer(3, 4))

Ankur - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Filter using the Vertex Ids
To get that function in scope you have to import org.apache.spark.SparkContext._ Ankur On Wednesday, December 3, 2014, Deep Pradhan pradhandeep1...@gmail.com wrote: But groupByKey() gives me the error saying that it is not a member of org.apache.spark.rdd.RDD[(Double, org.apache.spark.graphx.VertexId)] -- Ankur http://www.ankurdave.com/
Re: how to force graphx to execute transformation
At 2014-11-26 05:25:10 -0800, Hlib Mykhailenko hlib.mykhaile...@inria.fr wrote: I work with GraphX. When I call graph.partitionBy(..) nothing happens, because, as I understand, all transformations are lazy and partitionBy is built using transformations. Is there a way to force Spark to actually execute this transformation without using any action? If you just want the transformation to run without returning anything, such as for benchmarking, you can use graph.partitionBy(...).foreachPartition(x => {}). Ankur - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: inconsistent edge counts in GraphX
At 2014-11-11 01:51:43 +, Buttler, David buttl...@llnl.gov wrote: I am building a graph from a large CSV file. Each record contains a couple of nodes and about 10 edges. When I try to load a large portion of the graph, using multiple partitions, I get inconsistent results in the number of edges between different runs. However, if I use a single partition, or a small portion of the CSV file (say 1000 rows), then I get a consistent number of edges. Is there anything I should be aware of as to why this could be happening in GraphX? Is it possible there's some nondeterminism in the way you're reading the file? It would be helpful if you could post the code you're using to load the graph. Ankur - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: GraphX / PageRank with edge weights
At 2014-11-13 21:28:52 +0000, Ommen, Jurgen omme0...@stthomas.edu wrote: I'm using GraphX and playing around with its PageRank algorithm. However, I can't see from the documentation how to use edge weights when running PageRank. Is it possible to consider edge weights, and how would I do it? There's no built-in way to prefer certain edges over others; edge weights are just set to the inverse of the outdegree of the source vertex. But it's simple to modify the PageRank code [1] to use custom edge weights instead: (1) copy the PageRank.run method body to your own project, (2) change the type signature of the input graph from Graph[VD, ED] to Graph[VD, Double], and (3) remove the calls to outerJoinVertices and mapTriplets on lines 86 and 88. Ankur [1] https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala#L79 - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Pagerank implementation
At 2014-11-15 18:01:22 -0700, tom85 tom.manha...@gmail.com wrote: This line: val newPR = oldPR + (1.0 - resetProb) * msgSum makes no sense to me. Should it not be: val newPR = resetProb/graph.vertices.count() + (1.0 - resetProb) * msgSum ? This is an unusual version of PageRank where the messages being passed around are deltas rather than full ranks. This occurs at line 156 in vertexProgram, which returns (newPR, newPR - oldPR). The second element of the tuple is the delta, which is subsequently used in sendMessage. The benefit of this is that sendMessage can avoid sending when the delta drops below the convergence threshold `tol`, indicating that the source vertex has converged. But it means that to update the rank of each vertex, we have to add the incoming delta to its existing rank. That's why the oldPR term appears in the line you're looking at. Ankur - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Landmarks in GraphX section of Spark API
At 2014-11-17 14:47:50 +0530, Deep Pradhan pradhandeep1...@gmail.com wrote: I was going through the GraphX section in the Spark API in https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.lib.ShortestPaths$ Here, I find the word landmark. Can anyone explain to me what landmark means? Is it a simple English word or does it mean something else in GraphX? The landmarks in the context of the shortest-paths algorithm are just the vertices of interest. For each vertex in the graph, the algorithm will return the distance to each of the landmark vertices. Ankur - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
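A small sketch of calling ShortestPaths with two landmark vertices (the graph and the vertex ids 1 and 4 are placeholders):

import org.apache.spark.graphx.lib.ShortestPaths

val landmarks = Seq(1L, 4L)
val result = ShortestPaths.run(graph, landmarks)
// Each vertex now carries a map from landmark id to the distance to that landmark.
result.vertices.collect.foreach { case (id, spMap) => println(s"$id -> $spMap") }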
Re: Running PageRank in GraphX
At 2014-11-18 12:02:52 +0530, Deep Pradhan pradhandeep1...@gmail.com wrote: I just ran the PageRank code in GraphX with some sample data. What I am seeing is that the total rank changes drastically if I change the number of iterations from 10 to 100. Why is that so? As far as I understand, the total rank should asymptotically approach the number of vertices in the graph, assuming there are no vertices of zero outdegree. Does that seem to be the case for your graph? Ankur - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Landmarks in GraphX section of Spark API
At 2014-11-18 14:59:20 +0530, Deep Pradhan pradhandeep1...@gmail.com wrote: So landmark can contain just one vertex right? Right. Which algorithm has been used to compute the shortest path? It's distributed Bellman-Ford. Ankur - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: New Codes in GraphX
At 2014-11-18 14:51:54 +0530, Deep Pradhan pradhandeep1...@gmail.com wrote: I am using Spark-1.0.0. There are two GraphX directories that I can see here 1. spark-1.0.0/examples/src/main/scala/org/apache/spark/examples/graphx which contains LiveJournalPageRank.scala 2. spark-1.0.0/graphx/src/main/scala/org/apache/spark/graphx/lib which contains Analytics.scala, ConnectedComponents.scala, etc. Now, if I want to add my own code to GraphX i.e., if I want to write a small application on GraphX, in which directory should I add my code, in 1 or 2? And what is the difference? If you want to add an algorithm which you can call from the Spark shell and submit as a pull request, you should add it to org.apache.spark.graphx.lib (#2). To run it from the command line, you'll also have to modify Analytics.scala. If you want to write a separate application, the ideal way is to do it in a separate project that links in Spark as a dependency [1]. It will also work to put it in either #1 or #2, but this will be worse in the long term because each build cycle will require you to rebuild and restart all of Spark rather than just building your application and calling spark-submit on the new JAR. Ankur [1] http://spark.apache.org/docs/1.0.2/quick-start.html#standalone-applications - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Landmarks in GraphX section of Spark API
At 2014-11-18 15:29:08 +0530, Deep Pradhan pradhandeep1...@gmail.com wrote: Does Bellman-Ford give the best solution? It gives the same solution as any other algorithm, since there's only one correct solution for shortest paths and it's guaranteed to find it eventually. There are probably faster distributed algorithms for single-source shortest paths, but I'm not familiar with them. As far as I can tell, distributed Bellman-Ford is the most widely-used one. Ankur - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: New Codes in GraphX
At 2014-11-18 15:35:13 +0530, Deep Pradhan pradhandeep1...@gmail.com wrote: Now, how do I run the LiveJournalPageRank.scala that is there in 1? I think it should work to use MASTER=local[*] $SPARK_HOME/bin/run-example graphx.LiveJournalPageRank /edge-list-file.txt --numEPart=8 --numIter=10 --partStrategy=EdgePartition2D Ankur - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Landmarks in GraphX section of Spark API
At 2014-11-18 15:44:31 +0530, Deep Pradhan pradhandeep1...@gmail.com wrote: I meant to ask whether it gives the solution faster than other algorithms. No, it's just that it's much simpler and easier to implement than the others. Section 5.2 of the Pregel paper [1] justifies using it for a graph (a binary tree) with 1 billion vertices on 300 machines: More advanced parallel algorithms exist, e.g., Thorup [44] or the ∆-stepping method [37], and have been used as the basis for special-purpose parallel shortest paths implementations [12, 32]. Such advanced algorithms can also be expressed in the Pregel framework. The simplicity of the implementation in Figure 5, however, together with the already acceptable performance (see Section 6), may appeal to users who can't do extensive tuning or customization. What do you mean by distributed algorithms? Can we not use any algorithm on a distributed environment? Any algorithm can be split up and run in a distributed environment, but because inter-node coordination is expensive, that can be very inefficient. Distributed algorithms in this context are ones that reduce coordination. Ankur [1] http://db.cs.berkeley.edu/cs286/papers/pregel-sigmod2010.pdf - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: New Codes in GraphX
At 2014-11-18 15:51:52 +0530, Deep Pradhan pradhandeep1...@gmail.com wrote: Yes, the above command works, but there is this problem. Most of the time, the total rank is NaN (Not a Number). Why is it so? I've also seen this, but I'm not sure why it happens. If you could find out which vertices are getting the NaN rank, it might be helpful in tracking down the problem. Ankur - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Fwd: Executor Lost Failure
At 2014-11-10 22:53:49 +0530, Ritesh Kumar Singh riteshoneinamill...@gmail.com wrote: Tasks are now getting submitted, but many tasks don't happen. Like, after opening the spark-shell, I load a text file from disk and try printing its contents as: sc.textFile("/path/to/file").foreach(println) It does not give me any output. That's because foreach launches tasks on the slaves. When each task tries to print its lines, they go to the stdout file on the slave rather than to your console at the driver. You should see the file's contents in each of the slaves' stdout files in the web UI. This only happens when running on a cluster. In local mode, all the tasks are running locally and can output to the driver, so foreach(println) is more useful. Ankur - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
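If the goal is to see the file's contents at the driver console, one option is to bring the data back to the driver first (take(n) is safer than collect() for large files):

sc.textFile("/path/to/file").take(20).foreach(println)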
Re: To find distances to reachable source vertices using GraphX
The NullPointerException seems to be because edge.dstAttr is null, which might be due to SPARK-3936 https://issues.apache.org/jira/browse/SPARK-3936. Until that's fixed, I edited the Gist with a workaround. Does that fix the problem? Ankur http://www.ankurdave.com/ On Mon, Nov 3, 2014 at 12:23 AM, Madabhattula Rajesh Kumar mrajaf...@gmail.com wrote: Hi All, I'm trying to understand the example program at the link below. When I run this program, I'm getting a java.lang.NullPointerException at the highlighted line.

https://gist.github.com/ankurdave/4a17596669b36be06100

val updatedDists = edge.srcAttr.filter { case (source, dist) =>
  val existingDist = edge.dstAttr.getOrElse(source, Int.MaxValue)
  existingDist > dist + 1
}.mapValues(_ + 1).map(identity)

Could you please help me to resolve this issue. Regards, Rajesh
Re: How to correctly extimate the number of partition of a graph in GraphX
How large is your graph, and how much memory does your cluster have? We don't have a good way to determine the *optimal* number of partitions aside from trial and error, but to get the job to at least run to completion, it might help to use the MEMORY_AND_DISK storage level and a large number of partitions. Ankur http://www.ankurdave.com/ On Sat, Nov 1, 2014 at 10:57 PM, James alcaid1...@gmail.com wrote: Hello, I am trying to run the Connected Components algorithm on a very big graph. In practice I found that a small number of partitions would lead to OOM, while a large number would cause various timeout exceptions. Thus I wonder how to estimate the number of partitions for a graph in GraphX? Alcaid
Re: How to set persistence level of graph in GraphX in spark 1.0.0
At 2014-10-25 08:56:34 +0530, Arpit Kumar arp8...@gmail.com wrote: GraphLoader1.scala:49: error: class EdgePartitionBuilder in package impl cannot be accessed in package org.apache.spark.graphx.impl [INFO] val builder = new EdgePartitionBuilder[Int, Int]

Here's a workaround:

1. Copy and modify the GraphLoader source as you did, but keep it in the org.apache.spark.graphx.impl package to fix the package-private error.
2. In addition to changing the persistence level of the edges RDD in GraphLoader, construct the VertexRDD and EdgeRDD yourself.
3. Call GraphImpl.fromExistingRDDs to construct the graph. This function will respect the existing EdgeRDD storage level.
4. Use the graph as desired. Be sure to avoid Graph#partitionBy, the Pregel API, and all of the built-in algorithms, because they call Graph#cache() on intermediate graphs.

Here is a modified version of GraphLoader that does 1-3: https://gist.github.com/ankurdave/0394d47809297eea76ff Ankur - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: GraphX StackOverflowError
At 2014-10-28 16:27:20 +0300, Zuhair Khayyat zuhair.khay...@gmail.com wrote: I am using the connected components function of GraphX (on Spark 1.0.2) on some graph. However, for some reason it fails with a StackOverflowError. The graph is not too big; it contains 1 vertices and 50 edges. [...] 14/10/28 16:08:50 INFO DAGScheduler: Submitting 1 missing tasks from Stage 13 (VertexRDD.createRoutingTables - vid2pid (aggregation) MapPartitionsRDD[13] at mapPartitions at VertexRDD.scala:423)

This seems like a bug in Scala's implementation of quicksort, maybe SI-7837 [1]. GraphX uses Scala's quicksort to sort the edges when loading them into memory. If you're able to modify Spark, you could avoid using quicksort by changing EdgePartitionBuilder.scala:40 from

Sorting.quickSort(edgeArray)(Edge.lexicographicOrdering)

to

implicit val ordering: Ordering[Edge[ED]] = Edge.lexicographicOrdering
Sorting.stableSort(edgeArray)

Otherwise, the workaround will depend on the cause of the bug. If it's because you have thousands of duplicates of the same edge which are triggering SI-7837, you could use RDD#distinct to remove them before constructing the graph. If it's because of an unlikely worst-case data distribution that's causing quicksort to recurse to linear depth, you could reorder the edges using RDD#sortBy. Ankur [1] https://issues.scala-lang.org/browse/SI-7837 - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Workaround for SPARK-1931 not compiling
At 2014-10-23 09:48:55 +0530, Arpit Kumar arp8...@gmail.com wrote: error: value partitionBy is not a member of org.apache.spark.rdd.RDD[(org.apache.spark.graphx.PartitionID, org.apache.spark.graphx.Edge[ED])] Since partitionBy is a member of PairRDDFunctions, it sounds like the implicit conversion from RDD to PairRDDFunctions is not getting applied. Does it help to import org.apache.spark.SparkContext._ before applying the workaround? Ankur - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: graphx - mutable?
On Tue, Oct 14, 2014 at 12:36 PM, ll duy.huynh@gmail.com wrote: hi again. just want to check in again to see if anyone could advise on how to implement a mutable, growing graph with graphx? we're building a graph that is growing over time. it adds more vertices and edges every iteration of our algorithm. it doesn't look like there is an obvious way to add a new vertex and a set of edges to an existing graph. Currently the only way to do this is to rebuild the graph in each iteration by adding more vertices and edges to the source RDDs, then calling the graph constructor. I'm working on a way to support this more efficiently (SPARK-2365 https://issues.apache.org/jira/browse/SPARK-2365), but GraphX doesn't take advantage of this yet. Ankur http://www.ankurdave.com/
Re: graphx - mutable?
On Tue, Oct 14, 2014 at 1:57 PM, Duy Huynh duy.huynh@gmail.com wrote: a related question, what is the best way to update the values of existing vertices and edges? Many of the Graph methods deal with updating the existing values in bulk, including mapVertices, mapEdges, mapTriplets, mapReduceTriplets, and outerJoinVertices. To update just a small number of existing values, IndexedRDD would be ideal, but until it makes it into GraphX the best way is to use one of the above methods. This will be slower since it's touching all of the vertices, but it will achieve the same goal. For example, if you had a graph and wanted to update the value of vertex 1 to "a", you could do the following:

val graph = ...
val updates = sc.parallelize(List((1L, "a")))
val newGraph = graph.outerJoinVertices(updates) {
  (id, oldValue, newValueOpt) => newValueOpt.getOrElse(oldValue)
}

Ankur http://www.ankurdave.com/
Re: How to construct graph in graphx
At 2014-10-13 18:22:44 -0400, Soumitra Siddharth Johri soumitra.siddha...@gmail.com wrote: I have a flat tab separated file like below: [...] where n1,n2,n3,n4 are the nodes of the graph and R1,P2,P3 are the properties which should form the edges between the nodes. How can I construct a graph from the above file in SPARK GraphX ? Example code would be very helpful. Here's how to do this: https://gist.github.com/ankurdave/587eac4d08655d0eebf9 Ankur - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to construct graph in graphx
At 2014-10-13 21:08:15 -0400, Soumitra Johri soumitra.siddha...@gmail.com wrote: There is no 'long' field in my file. So when I form the edge I get a type mismatch error. Is it mandatory for GraphX that every vertex should have a distinct id? In my case n1, n2, n3, n4 are all strings. (+user so others can see the solution) Yes, GraphX currently only supports integer vertex ids. If your data is stored with string ids, a possible workaround is to hash them to integers using String.hashCode. If you need access to the string representation later, you can store it in the vertex attribute. I added a file to the Gist that does this: https://gist.github.com/ankurdave/587eac4d08655d0eebf9#file-load-edges-with-properties-and-string-id-scala Ankur - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
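A rough sketch of the String.hashCode workaround (the file layout and field positions are assumptions; the Gist linked above is the authoritative version, and note that distinct strings can in rare cases collide to the same hash):

import org.apache.spark.graphx.{Edge, Graph}

val lines = sc.textFile("edges.tsv")
// Each line: srcName <tab> dstName <tab> property
val edges = lines.map { line =>
  val fields = line.split("\t")
  Edge(fields(0).hashCode.toLong, fields(1).hashCode.toLong, fields(2))
}
// Keep the original string as the vertex attribute so it can be recovered later.
val vertices = lines.flatMap { line =>
  val fields = line.split("\t")
  Seq((fields(0).hashCode.toLong, fields(0)), (fields(1).hashCode.toLong, fields(1)))
}
val graph = Graph(vertices, edges)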
Re: Pregel messages serialized in local machine?
At 2014-09-25 06:52:46 -0700, Cheuk Lam chl...@hotmail.com wrote: This is a question on using the Pregel function in GraphX. Does a message get serialized and then de-serialized in the scenario where both the source and the destination vertices are in the same compute node/machine? Yes, message passing currently uses partitionBy, which shuffles all messages, including ones to the same machine, by serializing them and writing them to disk. Ankur - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: how to group within the messages at a vertex?
At 2014-09-17 11:39:19 -0700, spr s...@yarcdata.com wrote: I'm trying to implement label propagation in GraphX. The core step of that algorithm is - for each vertex, find the most frequent label among its neighbors and set its label to that. [...] It seems on the broken line above, I don't want to reduce all the values to a scalar, as this code does, but rather group them first and then reduce them. Can I do that all within mapReduceTriplets? If not, how do I build something that I can then further reduce? Label propagation is actually already implemented in GraphX [1]. The way it handles the "most frequent label" reduce operation is to aggregate a histogram, implemented as a map from label to frequency, and then take the most frequent element from the map at the end. Something to watch out for is that this can create large aggregation messages for high-degree vertices. Ankur [1] https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
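A rough sketch of that histogram approach (mirroring the idea in the built-in LabelPropagation, not the exact library code; it assumes a Graph[VertexId, ED] whose vertex attribute is the current label):

import org.apache.spark.graphx._

// Merge two histograms by summing per-label counts.
def mergeHistograms(a: Map[VertexId, Long], b: Map[VertexId, Long]): Map[VertexId, Long] =
  (a.keySet ++ b.keySet).map(k => k -> (a.getOrElse(k, 0L) + b.getOrElse(k, 0L))).toMap

// Each neighbor contributes a one-entry histogram for its current label.
val histograms = graph.mapReduceTriplets[Map[VertexId, Long]](
  e => Iterator((e.dstId, Map(e.srcAttr -> 1L)), (e.srcId, Map(e.dstAttr -> 1L))),
  mergeHistograms)

// Each vertex adopts the most frequent label among its neighbors.
val relabeled = graph.joinVertices(histograms) { (id, oldLabel, hist) =>
  if (hist.isEmpty) oldLabel else hist.maxBy(_._2)._1
}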
Re: vertex active/inactive feature in Pregel API ?
At 2014-09-16 10:55:37 +0200, Yifan LI iamyifa...@gmail.com wrote: - from [1], and my understanding, the existing inactive feature in graphx pregel api is “if there is no in-edges, from active vertex, to this vertex, then we will say this one is inactive”, right? Well, that's true when messages are only sent forward along edges (from the source to the destination) and the activeDirection is EdgeDirection.Out. If both of these conditions are true, then a vertex without in-edges cannot receive a message, and therefore its vertex program will never run and a message will never be sent along its out-edges. PageRank is an application that satisfies both the conditions. For instance, there is a graph in which every vertex has at least one in-edges, then we run static Pagerank on it for 10 iterations. During this calculation, is there any vertex would be set inactive? No: since every vertex always sends a message in static PageRank, if every vertex has an in-edge, it will always receive a message and will remain active. In fact, this is why I recently rewrote static PageRank not to use Pregel [3]. Assuming that most vertices do have in-edges, it's unnecessary to track active vertices, which can provide a big savings. - for more “explicit active vertex tracking”, e.g. vote to halt, how to achieve it in existing api? (I am not sure I got the point of [2], that “vote” function has already been introduced in graphx pregel api? ) The current Pregel API effectively makes every vertex vote to halt in every superstep. Therefore only vertices that receive messages get awoken in the next superstep. Instead, [2] proposes to make every vertex run by default in every superstep unless it votes to halt *and* receives no messages. This allows a vertex to have more control over whether or not it will run, rather than leaving that entirely up to its neighbors. Ankur [1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.Pregel$ [2] https://github.com/apache/spark/pull/1217 [3] https://github.com/apache/spark/pull/2308 - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: vertex active/inactive feature in Pregel API ?
At 2014-09-16 12:23:10 +0200, Yifan LI iamyifa...@gmail.com wrote: but I am wondering if there is a message (none?) sent to the target vertex (when the rank change is less than the tolerance) in the dynamic PageRank implementation below,

def sendMessage(edge: EdgeTriplet[(Double, Double), Double]) = {
  if (edge.srcAttr._2 > tol) {
    Iterator((edge.dstId, edge.srcAttr._2 * edge.attr))
  } else {
    Iterator.empty
  }
}

so, in this case, is a message, even an empty one, still sent? or not? No, in that case no message is sent, and if all in-edges of a particular vertex return Iterator.empty, then the vertex will become inactive in the next iteration. Ankur - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: vertex active/inactive feature in Pregel API ?
At 2014-09-15 16:25:04 +0200, Yifan LI iamyifa...@gmail.com wrote: I am wondering if the vertex active/inactive(corresponding the change of its value between two supersteps) feature is introduced in Pregel API of GraphX? Vertex activeness in Pregel is controlled by messages: if a vertex did not receive a message in the previous iteration, its vertex program will not run in the current iteration. Also, inactive vertices will not be able to send messages because by default the sendMsg function will only be run on edges where at least one of the adjacent vertices received a message. You can change this behavior -- see the documentation for the activeDirection parameter to Pregel.apply [1]. There is also an open pull request to make active vertex tracking more explicit by allowing vertices to vote to halt directly [2]. Ankur [1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.Pregel$ [2] https://github.com/apache/spark/pull/1217 - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: [GraphX] how to set memory configurations to avoid OutOfMemoryError GC overhead limit exceeded
At 2014-09-05 12:13:18 +0200, Yifan LI iamyifa...@gmail.com wrote: But how to assign the storage level to a new vertices RDD that is mapped from an existing vertices RDD, e.g. val newVertexRDD = graph.collectNeighborIds(EdgeDirection.Out).map { case (id: VertexId, a: Array[VertexId]) => (id, initialHashMap(a)) } The new one will be combined with the existing edges RDD (MEMORY_AND_DISK) to construct a new graph, e.g. val newGraph = Graph(newVertexRDD, graph.edges) Sorry for the late reply. If you are constructing a graph from the derived VertexRDD, you can pass a desired storage level to the Graph constructor:

val newVertexRDD = graph.collectNeighborIds(EdgeDirection.Out).map {
  case (id: VertexId, a: Array[VertexId]) => (id, initialHashMap(a))
}
val newGraph = Graph(
  newVertexRDD,
  graph.edges,
  edgeStorageLevel = StorageLevel.MEMORY_AND_DISK,
  vertexStorageLevel = StorageLevel.MEMORY_AND_DISK)

For others reading, the reason why GraphX needs to be told the desired storage level is that it internally constructs temporary vertex or edge RDDs and uses them more than once, so it has to cache them to avoid recomputation. BTW, the return of newVertexRDD.getStorageLevel is StorageLevel(true, true, false, true, 1), what does it mean? See the StorageLevel object [1]. This particular storage level corresponds to StorageLevel.MEMORY_AND_DISK. Ankur [1] https://github.com/apache/spark/blob/092e2f152fb674e7200cc8a2cb99a8fe0a9b2b33/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala#L147 - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: spark 1.1.0 requested array size exceed vm limits
At 2014-09-05 21:40:51 +0800, marylucy qaz163wsx_...@hotmail.com wrote: But when running GraphX edgeListFile, some tasks failed with the error: requested array size exceeds VM limit Try passing a higher value for minEdgePartitions when calling GraphLoader.edgeListFile. Ankur - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
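A small sketch of that suggestion: raise minEdgePartitions so each partition's edge array stays smaller (the path and the partition count are placeholders to be tuned):

import org.apache.spark.graphx.GraphLoader

val graph = GraphLoader.edgeListFile(sc, "hdfs:///edges.txt", minEdgePartitions = 256)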
Re: [GraphX] how to set memory configurations to avoid OutOfMemoryError GC overhead limit exceeded
At 2014-09-03 17:58:09 +0200, Yifan LI iamyifa...@gmail.com wrote: val graph = GraphLoader.edgeListFile(sc, edgesFile, minEdgePartitions = numPartitions).partitionBy(PartitionStrategy.EdgePartition2D).persist(StorageLevel.MEMORY_AND_DISK) Error: java.lang.UnsupportedOperationException: Cannot change storage level of an RDD after it was already assigned a level You have to pass the StorageLevel to GraphLoader.edgeListFile: val graph = GraphLoader.edgeListFile( sc, edgesFile, minEdgePartitions = numPartitions, edgeStorageLevel = StorageLevel.MEMORY_AND_DISK, vertexStorageLevel = StorageLevel.MEMORY_AND_DISK) .partitionBy(PartitionStrategy.EdgePartition2D) Ankur - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark - GraphX pregel like with global variables (accumulator / broadcast)
At 2014-08-26 01:20:09 -0700, BertrandR bertrand.rondepierre...@gmail.com wrote: I actually tried without unpersisting, but given the performance I tried to add these in order to free the memory. After your answer I tried to remove them again, but without any change in the execution time... This is probably a related issue: in Spark you have to explicitly cache any dataset that you use more than once. Otherwise it will be recomputed each time it's used, which can cause an exponential slowdown for certain dependency structures. To be safe, you could start by caching g, msg, and newVerts every time they are set. Ankur - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark - GraphX pregel like with global variables (accumulator / broadcast)
At 2014-08-25 06:41:36 -0700, BertrandR bertrand.rondepierre...@gmail.com wrote: Unfortunately, this works well for extremely small graphs, but it becomes exponentially slow with the size of the graph and the number of iterations (doesn't finish 20 iterations with graphs having 48000 edges). [...] It seems to me that a lot of things are unnecessarily recomputed at each iteration whatever I try to do. I also made multiple changes to limit the number of dependencies of each object, but it didn't change anything. [...] fusionBcst.unpersist(blocking = false) The problem is almost certainly because of unpersisting. If you comment out all the unpersist lines, the program should run normally. Unpersisting is very tricky because of the internal dependency structure of graphs: they maintain a vertex and an edge RDD, and each depends on both from the previous iteration. A future update to GraphX will unify them so that a graph only has one RDD, and this will make it easier to unpersist correctly. Until then, unpersisting may not be worth the trouble. Ankur - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: GraphX usecases
At 2014-08-25 11:23:37 -0700, Sunita Arvind sunitarv...@gmail.com wrote: Does this "We introduce GraphX, which combines the advantages of both data-parallel and graph-parallel systems by efficiently expressing graph computation within the Spark data-parallel framework. We leverage new ideas in distributed graph representation to efficiently distribute graphs as tabular data-structures. Similarly, we leverage advances in data-flow systems to exploit in-memory computation and fault-tolerance." mean that GraphX makes the typical RDBMS operations possible even when the data is persisted in a GDBMS, and not vice versa? This quote refers to the research idea that while previous graph-parallel systems (Pregel, GraphLab, etc.) were built as specialized systems for performance, it's actually possible to avoid the trouble of a separate system by embedding graph computation efficiently in a general data-parallel system. Here data-parallel refers generally to any system that can support the join optimizations, including Spark and, with some work on the optimizer, relational databases as well. So GraphX uses data-parallel or relational operators to provide graph computation, not the other way around. From what I initially thought, it looked like GraphX could be applied to data stored in RDBMSs as Spark could translate the relational data into a graphical representation. However, there seems to be no conversion, and everything presented in GraphX implementations, AFAIK, works on vertices and edges. So does it mean that GraphX is only relevant when the backend is a GDBMS? GraphX, the library on top of Spark, can be applied indirectly to relational data as you described: you can use Spark to load vertex and edge tables from a relational database, then process them with GraphX. This isn't discussed in the GraphX documentation because it's a concern of Spark. GraphX is only relevant once you have the vertices and edges in RDD form. GraphX, the research concept, can in theory be implemented directly in a relational database by augmenting the query optimizer to support the optimizations described in the paper and setting up the appropriate indexes on the vertex and edge tables. Ankur - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: The running time of spark
At 2014-08-23 08:33:48 -0700, Denis RP qq378789...@gmail.com wrote: The bottleneck seems to be I/O; CPU usage ranges from 10% to 15% most of the time per VM. The caching is maintained by Pregel and should be reliable. The storage level is MEMORY_AND_DISK_SER. I'd suggest trying the DISK_ONLY storage level and possibly increasing the number of partitions. I did a local test with a 2G heap, 1M vertices, 126M edges, and 100 edge partitions, and MEMORY_AND_DISK_SER failed with OutOfMemoryErrors while DISK_ONLY succeeded. Ankur - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Personalized Page rank in graphx
At 2014-08-20 10:57:57 -0700, Mohit Singh mohit1...@gmail.com wrote: I was wondering if the Personalized PageRank algorithm is implemented in GraphX. If the talks and presentation were to be believed (https://amplab.cs.berkeley.edu/wp-content/uploads/2014/02/graphx@strata2014_final.pdf) it is... but I can't find the algorithm code (https://github.com/amplab/graphx/tree/master/graphx/src/main/scala/org/apache/spark/graphx/lib)? Those slides only mean that Personalized PageRank can be expressed within the neighborhood-centric model. I don't think anyone has implemented it for GraphX yet. If you're interested in doing that, we'd be glad to add it to the library! Ankur - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: GraphX question about graph traversal
At 2014-08-20 10:34:50 -0700, Cesar Arevalo ce...@zephyrhealthinc.com wrote: I would like to get the type B vertices that are connected through type A vertices where the edges have a score greater than 5. So, from the example above I would like to get V1 and V4. It sounds like you're trying to find paths in the graph that match a particular pattern. I wrote a prototype for doing that using the Pregel API [1, 2] in response to an earlier question on the mailing list [3]. That code won't solve your problem immediately, since it requires exact vertex and edge attribute matches rather than predicates like greater than 5, but it should be easy to modify it appropriately. Ankur [1] https://github.com/ankurdave/spark/blob/PatternMatching/graphx/src/main/scala/org/apache/spark/graphx/lib/PatternMatching.scala [2] https://github.com/ankurdave/spark/blob/PatternMatching/graphx/src/test/scala/org/apache/spark/graphx/lib/PatternMatchingSuite.scala [3] http://apache-spark-user-list.1001560.n3.nabble.com/Graphx-traversal-and-merge-interesting-edges-td8788.html - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: noob: how to extract different members of a VertexRDD
(+user) On Tue, Aug 19, 2014 at 12:05 PM, spr s...@yarcdata.com wrote: I want to assign each vertex to a community with the name of the vertex. As I understand it, you want to set the vertex attributes of a graph to the corresponding vertex ids. You can do this using Graph#mapVertices [1] as follows:

val g = ...
val newG = g.mapVertices((id, attr) => id)
// newG.vertices has type VertexRDD[VertexId], or RDD[(VertexId, VertexId)]

If you only want to do this for a VertexRDD without constructing a new graph using the modified vertices, you can use VertexRDD#mapValues [2] in a similar fashion. [1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.Graph@mapVertices[VD2]((VertexId,VD) ⇒VD2)(ClassTag[VD2]):Graph[VD2,ED] [2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.VertexRDD@mapValues[VD2]((VertexId,VD) ⇒VD2)(ClassTag[VD2]):VertexRDD[VD2] Ankur http://www.ankurdave.com/
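A small sketch of the VertexRDD-only variant mentioned above, assuming Int vertex attributes (the attribute type is an assumption):

val verts: VertexRDD[Int] = g.vertices
// Rewrite each vertex value to its id without rebuilding the graph.
val idsAsValues: VertexRDD[VertexId] = verts.mapValues((id, attr) => id)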
Re: noob: how to extract different members of a VertexRDD
At 2014-08-19 12:47:16 -0700, spr s...@yarcdata.com wrote: One follow-up question. If I just wanted to get those values into a vanilla variable (not a VertexRDD or Graph or ...) so I could easily look at them in the REPL, what would I do? Are the aggregate data structures inside the VertexRDD/Graph/... Arrays or Lists or what, or do I even need to know/care? The vertex values are internally stored in hash tables within each partition (see VertexPartitionBase if you're curious), but to access them all from the REPL, you can just use RDD#collect as in your first mail. If you want just the vertex ids, you can use RDD#map first:

val verts: VertexRDD[Int] = ...
val pairs: Array[(VertexId, Int)] = verts.collect()
val ids: Array[VertexId] = verts.map(kv => kv._1).collect()

Ankur
Re: [GraphX] how to set memory configurations to avoid OutOfMemoryError GC overhead limit exceeded
On Mon, Aug 18, 2014 at 6:29 AM, Yifan LI iamyifa...@gmail.com wrote: I am testing our application (similar to personalised PageRank using Pregel, and note that each vertex property will need much more space to store after each new iteration) [...] But when we ran it on a larger graph (e.g. LiveJournal), it always ends with the error "GC overhead limit exceeded", even when the number of partitions is increased from 8 to 48. If the graph (including vertex properties) is too large to fit in memory, you might try allowing it to spill to disk. When constructing the graph, you can set vertexStorageLevel and edgeStorageLevel to StorageLevel.MEMORY_AND_DISK. This should allow the algorithm to finish. Ankur http://www.ankurdave.com/
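As a rough sketch of that suggestion, assuming the graph is built from vertex and edge RDDs (the attribute types here are placeholders):

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

val vertices: RDD[(VertexId, Double)] = ...
val edges: RDD[Edge[Int]] = ...

// Ask GraphX to spill both the vertex and edge partitions to disk when
// memory runs low instead of failing with OutOfMemoryError.
val graph = Graph(vertices, edges,
  defaultVertexAttr = 0.0,
  edgeStorageLevel = StorageLevel.MEMORY_AND_DISK,
  vertexStorageLevel = StorageLevel.MEMORY_AND_DISK)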
Re: GraphX Pagerank application
On Wed, Aug 6, 2014 at 11:37 AM, AlexanderRiggers alexander.rigg...@gmail.com wrote: To perform the PageRank I have to create a graph object, adding the edges by setting sourceID=id and distID=brand. In GraphLab there is a function: g = SGraph().add_edges(data, src_field='id', dst_field='brand') Is there something similar in GraphX? It sounds like you're trying to parse an edge list file into a graph, where each line is a comma-separated pair of numeric vertex ids. There's a built-in parser for tab-separated pairs (see GraphLoader), and it should be easy to adapt it to comma-separated pairs. You can also drop the header line using RDD#filter (and eventually using https://github.com/apache/spark/pull/1839). Ankur http://www.ankurdave.com/
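For illustration, a rough sketch of that adaptation; the file path and the assumption that both columns are numeric ids are placeholders:

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val lines = sc.textFile("/data/purchases.csv")
val header = lines.first()

// Drop the header line, parse each "id,brand" pair, and build the graph.
val edges: RDD[Edge[Int]] = lines
  .filter(_ != header)
  .map { line =>
    val Array(id, brand) = line.split(",")
    Edge(id.trim.toLong, brand.trim.toLong, 1)
  }
val graph = Graph.fromEdges(edges, defaultValue = 0)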
Re: [GraphX] How spark parameters relate to Pregel implementation
At 2014-08-04 20:52:26 +0800, Bin wubin_phi...@126.com wrote: I wonder how Spark parameters, e.g., the level of parallelism, affect Pregel performance? Specifically, sendMessage, mergeMessage, and vertexProgram? I have tried label propagation on a 300,000-edge graph, and I found that no parallelism is much faster than a parallelism of 5 or 500. Increasing the level of parallelism will increase storage overhead (because each vertex will need to be replicated to more edge partitions to form the triplets) and will also increase communication. Unless there's something to be gained from higher parallelism, this will worsen performance. Additionally, going from no parallelism to some parallelism will incur the extra cost of task communication via shuffles. Parallelism has two benefits: it allows edge scans and aggregations to proceed in parallel, and it enables the graph to be stored across many machines. For small graphs, the slight performance gain due to parallelism is vastly outweighed by the cost of inter-process communication and shuffling to disk, and distributed storage is not necessary since the graph fits on a single machine. There are single-machine graph processing systems such as X-Stream [1] and GraphChi [2] that optimize performance for these kinds of graphs. However, parallelism becomes necessary for larger graphs with hundreds of millions of edges or large amounts of associated vertex and edge data. GraphX is designed for this scale of data. Ankur [1] http://infoscience.epfl.ch/record/188535/files/paper.pdf [2] http://graphlab.org/projects/graphchi.html
Re: GraphX
At 2014-08-02 21:29:33 +0530, Deep Pradhan pradhandeep1...@gmail.com wrote: How should I run GraphX code? At the moment it's a little more complicated to run the GraphX algorithms than the Spark examples due to SPARK-1986 [1]. There is a driver program in org.apache.spark.graphx.lib.Analytics which you can invoke using spark-submit:

$SPARK_HOME/bin/spark-submit --master local[*] --class org.apache.spark.graphx.lib.Analytics \
  $SPARK_HOME/assembly/target/scala-2.10/spark-assembly-*.jar \
  pagerank /edge-list-file.txt --numEPart=8 --numIter=10 --partStrategy=EdgePartition2D

This supports running PageRank, connected components, and triangle count. For the other algorithms, you can use the Spark shell:

import org.apache.spark.graphx._
val graph = (GraphLoader.edgeListFile(sc, "/edge-list-file.txt", minEdgePartitions = 8)
  .partitionBy(PartitionStrategy.EdgePartition2D))
// Run algorithms on graph

Ankur [1] https://issues.apache.org/jira/browse/SPARK-1986
Re: [GraphX] how to compute only a subset of vertices in the whole graph?
At 2014-08-02 19:04:22 +0200, Yifan LI iamyifa...@gmail.com wrote: But I am thinking of whether I can compute only some selected vertices (hubs), not update every vertex… is it possible to do this using the Pregel API? The Pregel API already only runs vprog on vertices that received messages in the previous iteration. Is there some other selection criterion that you'd like to express? Depending on the criterion, the easiest thing might be to move off the Pregel API and write against the lower-level GraphX API. I did this for PageRank recently [1]. Or, more realistically, only hubs can receive messages and compute results in the LAST iteration? since we don't need the final result of non-hub vertices. For this it might be easiest to duplicate the Pregel API implementation and special-case the last iteration. Ankur [1] https://github.com/ankurdave/spark/blob/3231a8efadbb45227dea63011c1a8dd856595475/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala#L274
Re: [GraphX] The best way to construct a graph
At 2014-08-01 11:23:49 +0800, Bin wubin_phi...@126.com wrote: I am wondering what is the best way to construct a graph? Say I have some attributes for each user, and a specific weight for each user pair. The way I am currently doing it is to first read the user information and edge triples into two arrays, then use sc.parallelize to create the vertexRDD and edgeRDD, respectively, and then create the graph using Graph(vertices, edges). I wonder whether there is a better way to do this? That's a perfectly fine way to construct a graph. Are you encountering a problem with it? The only suggestion I would make is to load the data using sc.textFile rather than reading it into an array and calling sc.parallelize. This will avoid loading it all into the driver's memory. GraphLoader does have the slight advantage that it avoids allocating a pair per vertex, but this is unlikely to be a big cost, so it's fine to use Graph(vertices, edges) if GraphLoader isn't suitable. Ankur
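A short sketch of the sc.textFile approach, under assumed file paths and formats (tab-separated "id attribute" and "src dst weight" lines):

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Load vertices and edges directly as RDDs so the data never has to fit in
// the driver's memory.
val vertices: RDD[(VertexId, String)] = sc.textFile("/data/users.txt").map { line =>
  val fields = line.split("\t")
  (fields(0).toLong, fields(1))
}
val edges: RDD[Edge[Double]] = sc.textFile("/data/weights.txt").map { line =>
  val fields = line.split("\t")
  Edge(fields(0).toLong, fields(1).toLong, fields(2).toDouble)
}
val graph = Graph(vertices, edges)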
Compiling Spark master (284771ef) with sbt/sbt assembly fails on EC2
Attempting to build Spark from source on EC2 using sbt gives the error sbt.ResolveException: unresolved dependency: org.scala-lang#scala-library;2.10.2: not found. This only seems to happen on EC2, not on my local machine. To reproduce, launch a cluster using spark-ec2, clone the Spark repository, and run sbt/sbt assembly. A complete transcript is at https://gist.github.com/ankurdave/bb96ea237700f5cd670c. Here is an excerpt with the error:

[info] Resolving org.scala-lang#scala-library;2.10.2 ...
[warn] module not found: org.scala-lang#scala-library;2.10.2
[...]
[warn] ::
[warn] :: UNRESOLVED DEPENDENCIES ::
[warn] ::
[warn] :: org.scala-lang#scala-library;2.10.2: not found
[warn] ::
sbt.ResolveException: unresolved dependency: org.scala-lang#scala-library;2.10.2: not found
  at sbt.IvyActions$.sbt$IvyActions$$resolve(IvyActions.scala:217)
[...]
[error] (*:update) sbt.ResolveException: unresolved dependency: org.scala-lang#scala-library;2.10.2: not found
Project loading failed: (r)etry, (q)uit, (l)ast, or (i)gnore?

After a bisection, it seems the problem was introduced by the SBT-Maven change (628932b) [1, 2]. Ankur [1] https://github.com/apache/spark/pull/772 [2] https://issues.apache.org/jira/browse/SPARK-1776
Re: creating a distributed index
At 2014-08-01 14:50:22 -0600, Philip Ogren philip.og...@oracle.com wrote: It seems that I could do this with mapPartition so that each element in a partition gets added to an index for that partition. [...] Would it then be possible to take a string and query each partition's index with it? Or better yet, take a batch of strings and query each string in the batch against each partition's index? I proposed a key-value store based on RDDs called IndexedRDD that does exactly what you described. It uses mapPartitions to construct an index within each partition, then exposes get and multiget methods to allow looking up values associated with given keys. It will hopefully make it into Spark 1.2.0. Until then you can try it out by merging in the pull request locally: https://github.com/apache/spark/pull/1297. See JIRA for details and slides on how it works: https://issues.apache.org/jira/browse/SPARK-2365. Ankur
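As a rough, hand-rolled sketch of the same idea (this is not the IndexedRDD API itself; all names here are made up): build a hash index inside each partition with mapPartitions, then probe every partition's index with a broadcast batch of keys.

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

val sc: SparkContext = ...
val pairs: RDD[(String, Int)] = sc.parallelize(Seq("apache" -> 1, "spark" -> 2, "graphx" -> 3), 4)

// One hash map per partition, cached so queries don't rebuild it.
val indexed: RDD[Map[String, Int]] =
  pairs.mapPartitions(iter => Iterator(iter.toMap), preservesPartitioning = true).cache()

// Batch lookup: broadcast the query keys and probe each partition's index.
val queries = sc.broadcast(Seq("spark", "graphx"))
val results: Array[(String, Int)] = indexed.flatMap { index =>
  queries.value.flatMap(k => index.get(k).map(v => (k, v)))
}.collect()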
Re: Graphx : Perfomance comparison over cluster
ShreyanshB shreyanshpbh...@gmail.com writes: The version with in-memory shuffle is here: https://github.com/amplab/graphx2/commits/vldb. It'd be great if you can tell me how to configure and invoke this Spark version. Sorry for the delay on this. Assuming you're planning to launch an EC2 cluster, here's how to use the version of GraphX with in-memory shuffle:

1. Check out the in-memory shuffle branch locally. It's important to do this before launching the cluster to make sure the cluster gets set up in a way that's compatible with this version of Spark (using the v2 branch of https://github.com/mesos/spark-ec2).

git clone https://github.com/amplab/graphx2 -b vldb
mv graphx2 spark

2. Launch a cluster.

cd spark
ec2/spark-ec2 -s 16 -w 500 -k ec2-key-name -i path/to/ec2-key.pem -t m2.4xlarge -z us-east-1e --spot-price=1 launch graphx-benchmarking

3. On the cluster, check out and build the in-memory shuffle branch.

cd /mnt
git clone https://github.com/amplab/graphx2 -b vldb
mv graphx2 spark
cd spark
mkdir -p conf
cp ~/spark/conf/* conf/
sbt/sbt assembly
rsync -r --delete . ~/spark
~/spark/sbin/stop-all.sh
~/spark-ec2/copy-dir --delete ~/spark
~/spark/sbin/start-all.sh

4. Load your input graph onto HDFS in edge list format.

~/ephemeral-hdfs/bin/hadoop fs -put edge-list.txt /

5. Run PageRank using the Analytics driver.

cd ~/spark
MASTER=spark://$(wget -q -O - http://169.254.169.254/latest/meta-data/public-hostname):7077
/usr/bin/time -f "TOTAL TIME: %e seconds" ~/spark/bin/spark-class org.apache.spark.graphx.lib.Analytics $MASTER pagerank /edge-list.txt --numEPart=128 --numIter=10

Ankur
Re: [GraphX] How to access a vertex via vertexId?
Yifan LI iamyifa...@gmail.com writes: Maybe you could get the vertex, for instance, the one whose id is 80, by using: graph.vertices.filter { case (id, _) => id == 80 }.collect but I am not sure this is the most efficient way (will it scan the whole table, since it cannot benefit from the index of the VertexRDD?). Until IndexedRDD is merged, a scan and collect is the best officially supported way. PairRDDFunctions.lookup does this under the hood as well. However, it's possible to use the VertexRDD's hash index to do a much more efficient lookup. Note that these APIs may change, since VertexPartitionBase and its subclasses are private[graphx]. You can access the partitions of a VertexRDD using VertexRDD#partitionsRDD, and each partition has VertexPartitionBase#isDefined and VertexPartitionBase#apply. Putting it all together:

val verts: VertexRDD[_] = ...
val targetVid: VertexId = 80L
val result = verts.partitionsRDD.flatMap { part =>
  if (part.isDefined(targetVid)) Some(part(targetVid)) else None
}.collect.head

Once IndexedRDD [1] is merged, it will provide this functionality using verts.get(targetVid). Its implementation of get also uses the hash partitioner to run only one task [2]. Ankur [1] https://issues.apache.org/jira/browse/SPARK-2365 [2] https://github.com/ankurdave/spark/blob/IndexedRDD/core/src/main/scala/org/apache/spark/rdd/IndexedRDDLike.scala#L89
Re: the pregel operator of graphx throws NullPointerException
Denis RP qq378789...@gmail.com writes:

[error] (run-main-0) org.apache.spark.SparkException: Job aborted due to stage failure: Task 6.0:4 failed 4 times, most recent failure: Exception failure in TID 598 on host worker6.local: java.lang.NullPointerException
[error] scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
[error] scala.collection.Iterator$class.foreach(Iterator.scala:727)
[error] scala.collection.AbstractIterator.foreach(Iterator.scala:1157)

It looks like Iterator.scala:328 [1] is Iterator#map, and it's likely failing because the map function is null. I haven't seen this before, but I wonder if SPARK-2292 [2] is related. The stack trace is different there, but the problem of a function being null is the same. Based on the JIRA comments, it might be a problem with your build and launch process. How are you deploying your application? Ankur [1] https://github.com/scala/scala/blob/v2.10.4/src/library/scala/collection/Iterator.scala#L328 [2] https://issues.apache.org/jira/browse/SPARK-2292
Re: VertexPartition and ShippableVertexPartition
On Mon, Jul 28, 2014 at 4:29 AM, Larry Xiao xia...@sjtu.edu.cn wrote: On 7/28/14, 3:41 PM, shijiaxin wrote: There is a VertexPartition in the EdgePartition, which is created by EdgePartitionBuilder.toEdgePartition, and there is also a ShippableVertexPartition in the VertexRDD. These two partitions have a lot in common, like the index, data, and bitset; why is this necessary? Is the VertexPartition in the EdgePartition the mirror cache part? Yes, exactly. The primary copy of each vertex is stored in the VertexRDD using the index, values, and mask data structures, which together form a hash map. In addition, each partition of the VertexRDD stores the corresponding partition of the routing table to facilitate joining with the edges. The ShippableVertexPartition class encapsulates the vertex hash map along with a RoutingTablePartition. After joining the vertices with the edges, the edge partitions cache their adjacent vertices in the mirror cache. They use the VertexPartition for this, which provides only the hash map functionality and not the routing table. Ankur http://www.ankurdave.com/
Re: Spark streaming vs. spark usage
On Mon, Jul 28, 2014 at 12:53 PM, Nathan Kronenfeld nkronenf...@oculusinfo.com wrote: But when done processing, one would still have to pull out the wrapped object, knowing what it was, and I don't see how to do that. It's pretty tricky to get the level of type safety you're looking for. I know of two ways: 1. Leave RDD and DStream as they are, but define a typeclass http://danielwestheide.com/blog/2013/02/06/the-neophytes-guide-to-scala-part-12-type-classes.html that allows converting them to a common DistributedCollection type. Example https://gist.github.com/ankurdave/f5d4df4b521ac83b9c7d#file-distributed-collection-via-typeclass-scala . 2. Make RDD and DStream inherit from a common DistributedCollection trait, as in your example, but use F-bounded polymorphism https://twitter.github.io/scala_school/advanced-types.html#fbounded to express the concrete types. Example https://gist.github.com/ankurdave/f5d4df4b521ac83b9c7d#file-distributed-collection-via-fbounded-polymorphism-scala . Ankur http://www.ankurdave.com/
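For illustration, here is a rough, made-up sketch of option 1 (it is not the code from the linked gist): a typeclass that exposes a shared operation over both RDD and DStream, so generic code can be written once against the typeclass.

import scala.language.higherKinds
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

trait DistributedCollection[F[_]] {
  def mapCollection[A, B: ClassTag](fa: F[A])(f: A => B): F[B]
}

object DistributedCollection {
  // One instance per concrete collection type.
  implicit val rddInstance: DistributedCollection[RDD] = new DistributedCollection[RDD] {
    def mapCollection[A, B: ClassTag](fa: RDD[A])(f: A => B): RDD[B] = fa.map(f)
  }
  implicit val dstreamInstance: DistributedCollection[DStream] = new DistributedCollection[DStream] {
    def mapCollection[A, B: ClassTag](fa: DStream[A])(f: A => B): DStream[B] = fa.map(f)
  }
}

// Code written once against the typeclass works on either collection type.
def doubled[F[_]](xs: F[Int])(implicit dc: DistributedCollection[F]): F[Int] =
  dc.mapCollection(xs)(_ * 2)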
Re: GraphX Pragel implementation
On Thu, Jul 24, 2014 at 9:52 AM, Arun Kumar toga...@gmail.com wrote: While using pregel API for Iterations how to figure out which super step the iteration currently in. The Pregel API doesn't currently expose this, but it's very straightforward to modify Pregel.scala https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala#L112 to do so. Let me know if you'd like help doing this. Ankur http://www.ankurdave.com/
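Until Pregel exposes the superstep directly, one possible workaround (a sketch assuming Graph[Double, Double]; the update logic is arbitrary) is to carry a counter in the vertex attribute and increment it each time vprog runs:

import org.apache.spark.graphx._

val graph: Graph[Double, Double] = ...
val withStep: Graph[(Double, Int), Double] = graph.mapVertices((id, attr) => (attr, 0))
val result = withStep.pregel(0.0, maxIterations = 10)(
  (id, attr, msg) => {
    val (value, step) = attr
    // `step` is 0 the first time vprog runs, then counts message rounds.
    (value + msg, step + 1)
  },
  triplet => Iterator((triplet.dstId, triplet.srcAttr._1 * triplet.attr)),
  (a, b) => a + b)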
Re: Where is the PowerGraph abstraction
We removed https://github.com/apache/spark/commit/732333d78e46ee23025d81ca9fbe6d1e13e9f253 the PowerGraph abstraction layer when merging GraphX into Spark to reduce the maintenance costs. You can still read the code https://github.com/apache/spark/blob/feaa07802203b79f454454445c0a12a2784ccfeb/graphx/src/main/scala/org/apache/spark/graphx/GraphLab.scala from the repository history, but it would be best to use the Pregel API if possible. Ankur http://www.ankurdave.com/
Re: the implications of some items in webUI
On Tue, Jul 22, 2014 at 7:08 AM, Yifan LI iamyifa...@gmail.com wrote: 1) what is the difference between Duration(Stages - Completed Stages) and Task Time(Executors) ? Stages are composed of tasks that run on executors. Tasks within a stage may run concurrently, since there are multiple executors and each executor may run more than one task at a time. An executor's task time is the sum of the durations of all of its tasks. Because this is a simple sum, it does not take parallelism into account: if an executor runs 8 tasks concurrently and each takes a minute, it has only spent one minute of wallclock time, but the reported task time will be 8 minutes. A stage's duration is how much wallclock time elapsed between when the first task launched and when the last task finished. This does take parallelism into account, so in the above example the stage duration would be 1 minute. 2) what are the exact meanings of Shuffle Read/Shuffle Write? Stages communicate using shuffles. Each task may start by reading shuffle inputs across the network, and may end by writing shuffle outputs to disk locally. See page 7 of the Spark NSDI paper https://www.usenix.org/system/files/conference/nsdi12/nsdi12-final138.pdf for details. Shuffle read and shuffle write refer to the total amount of data that a stage read across the network and wrote to disk. Ankur http://www.ankurdave.com/
Re: Question about initial message in graphx
On Mon, Jul 21, 2014 at 8:05 PM, Bin WU bw...@connect.ust.hk wrote: I am not sure how to specify different initial values to each node in the graph. Moreover, I am wondering why initial message is necessary. I think we can instead initialize the graph and then pass it into Pregel interface? Indeed it's not necessary, and a future update to Pregel will probably remove it: https://github.com/apache/spark/pull/1217 But the initial message parameter doesn't prevent you from specifying different initial *values* for each vertex. Pregel respects the initial vertex values of the provided graph; the initial message just ensures that it will run vprog at least once per vertex. If you don't want an initial message, one option is to set aside a special message value for it and check for this in vprog. For example, if the message type is Int, you could change it to Option[Int] and use None as the initial message. Ankur http://www.ankurdave.com/
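A minimal sketch of that suggestion, assuming Graph[Int, Int] and Int-valued messages (the attribute types and update logic are placeholders):

import org.apache.spark.graphx._

val graph: Graph[Int, Int] = ...
val result = graph.pregel[Option[Int]](None, maxIterations = 10)(
  (id, attr, msg) => msg match {
    case None      => attr       // the initial message: leave the vertex unchanged
    case Some(sum) => attr + sum // a real, merged message from the previous superstep
  },
  triplet => Iterator((triplet.dstId, Some(triplet.srcAttr))),
  (a, b) => Some(a.getOrElse(0) + b.getOrElse(0)))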
Re: Graphx : Perfomance comparison over cluster
On Fri, Jul 18, 2014 at 9:07 PM, ShreyanshB shreyanshpbh...@gmail.com wrote: Does the suggested version with in-memory shuffle affects performance too much? We've observed a 2-3x speedup from it, at least on larger graphs like twitter-2010 http://law.di.unimi.it/webdata/twitter-2010/ and uk-2007-05 http://law.di.unimi.it/webdata/uk-2007-05/. (according to previously reported numbers, graphx did 10 iterations in 142 seconds and in latest stats it does it in 68 seconds). Is it just the in-memory version which is changed? If you're referring to previous results vs. the arXiv paper, there were several improvements, but in-memory shuffle had the largest impact. Ankur http://www.ankurdave.com/
Re: the default GraphX graph-partition strategy on multicore machine?
On Fri, Jul 18, 2014 at 9:13 AM, Yifan LI iamyifa...@gmail.com wrote: Yes, is it possible to define a custom partition strategy? Yes, you just need to create a subclass of PartitionStrategy as follows:

import org.apache.spark.graphx._

object MyPartitionStrategy extends PartitionStrategy {
  override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
    // put your hash function here
  }
}

If I load one edge file (5 GB), without any cores/partitions setting, what is the default partitioning during graph construction, and how many cores will be used? Or, what if the size of the file is 50 GB (more than the available memory, without any partition setting)? If you don't specify a number of partitions, it will default to 2 or the number of blocks in the input file, whichever is greater (this is the same behavior as sc.textFile https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L1243). I'm not sure exactly how many this will be for your file, but you can find out by running sc.textFile(...).partitions.length. The default partitioning strategy is to leave the edges in the same partitions as they were initially loaded from. For the 50 GB file, everything is the same except the default number of partitions will probably be larger. Hopefully each partition will individually still fit in memory, which will allow GraphX to proceed. The whole vertex table VA will be replicated to all these 3 edge partitions? since each of them refers to some vertices in VA. Yes, for that graph and partitioning, all vertices will be replicated to all edge partitions. It won't be quite as bad for most graphs, and EdgePartition2D http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.PartitionStrategy$$EdgePartition2D$ provides a bound on the replication factor, as described in the docs. You mean that each core is restricted to accessing only its own partition in memory? How do they communicate when the required data (edges?) is in another partition? Right, there is one task per core, and each task only accesses its own partition, never the partitions of other tasks. Tasks communicate in two ways: 1. Vertex replication (multicast). When an edge needs to access its neighboring vertices (to construct the triplets or perform the map step in mapReduceTriplets), GraphX replicates vertices to all edge partitions where they are referenced. 2. Vertex aggregation. When an edge needs to update the value of a neighboring vertex (to perform the reduce step in mapReduceTriplets), GraphX aggregates vertex updates from the edge partitions back to the vertices. Both of these communication steps happen using Spark shuffles, which work by writing messages to disk and notifying other tasks to read them from disk. Note that edges never need to access other edges directly, and all communication is done through the vertices. Ankur http://www.ankurdave.com/
Re: the default GraphX graph-partition strategy on multicore machine?
Sorry, I didn't read your vertex replication example carefully, so my previous answer is wrong. Here's the correct one: On Fri, Jul 18, 2014 at 9:13 AM, Yifan LI iamyifa...@gmail.com wrote: I don't understand. For instance, we have 3 edge partition tables (EA: a -> b, a -> c; EB: a -> d, a -> e; EC: d -> c) and 2 vertex partition tables (VA: a, b, c; VB: d, e). The whole vertex table VA will be replicated to all these 3 edge partitions? since each of them refers to some vertices in VA. Vertices can be replicated individually without requiring the entire vertex partition to be replicated. In this case, here's what will get replicated to each partition:

EA: a (from VA), b (from VA), c (from VA)
EB: a (from VA), d (from VB), e (from VB)
EC: c (from VA), d (from VB)

Ankur http://www.ankurdave.com/
Re: Graphx : Perfomance comparison over cluster
Thanks for your interest. I should point out that the numbers in the arXiv paper are from GraphX running on top of a custom version of Spark with an experimental in-memory shuffle prototype. As a result, if you benchmark GraphX at the current master, it's expected that it will be 2-3x slower than GraphLab. The version with in-memory shuffle is here: https://github.com/amplab/graphx2/commits/vldb. Unfortunately Spark has changed a lot since then, and the way to configure and invoke Spark is different. I can send you the correct configuration/invocation for this if you're interested in benchmarking it. On Fri, Jul 18, 2014 at 7:14 PM, ShreyanshB shreyanshpbh...@gmail.com wrote: Should I use the pagerank application already available in graphx for this purpose or need to modify or need to write my own? You should use the built-in PageRank. If your graph is available in edge list format, you can run it using the Analytics driver as follows:

~/spark/bin/spark-submit --master spark://$MASTER_URL:7077 --class org.apache.spark.graphx.lib.Analytics ~/spark/assembly/target/scala-2.10/spark-assembly-1.1.0-SNAPSHOT-hadoop1.0.4.jar pagerank $EDGE_FILE --numEPart=$NUM_PARTITIONS --numIter=$NUM_ITERATIONS [--partStrategy=$PARTITION_STRATEGY]

What should be the executor_memory, i.e. maximum or according to graph size? As much memory as possible while leaving room for the operating system. Is there any other configuration I should do to have the best performance? I think the parameters to Analytics above should be sufficient: - numEPart - should be equal to or a small integer multiple of the number of cores. More partitions improve work balance but also increase memory usage and communication, so in some cases it can even be faster with fewer partitions than cores. - partStrategy - If your edges are already sorted, you can skip this option, because GraphX will leave them as-is by default and that may be close to optimal. Otherwise, EdgePartition2D and RandomVertexCut are both worth trying. CC'ing Joey and Dan, who may have other suggestions. Ankur http://www.ankurdave.com/ On Fri, Jul 18, 2014 at 7:14 PM, ShreyanshB shreyanshpbh...@gmail.com wrote: Hi, I am trying to compare GraphX and other distributed graph processing systems (GraphLab) on my cluster of 64 nodes, each node having 32 cores and connected with InfiniBand. I looked at http://arxiv.org/pdf/1402.2394.pdf and the stats provided over there. I had a few questions regarding configuration and achieving the best performance. * Should I use the pagerank application already available in graphx for this purpose or need to modify or need to write my own? - If I shouldn't use the inbuilt pagerank, can you share your pagerank application? * What should be the executor_memory, i.e. maximum or according to graph size? * Other than the number of cores, executor_memory and partition strategy, is there any other configuration I should do to have the best performance? I am using the following script:

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
val startgraphloading = System.currentTimeMillis;
val graph = GraphLoader.edgeListFile(sc, filepath, true, 32)
val endgraphloading = System.currentTimeMillis;

Thanks in advance :) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Graphx-Perfomance-comparison-over-cluster-tp10222.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: GraphX Pragel implementation
If your sendMsg function needs to know the incoming messages as well as the vertex value, you could define VD to be a tuple of the vertex value and the last received message. The vprog function would then store the incoming messages into the tuple, allowing sendMsg to access them. For example, if the vertex value was a String and the message type was an Int, you could call Pregel as follows:

val graph: Graph[String, _] = ...
graph.mapVertices((id, attr) => (attr, 0)).pregel(0)(
  (id, attr: (String, Int), msg: Int) => (attr._1, msg),
  edge => Iterator(...), // can use edge.srcAttr._2 and edge.dstAttr._2 to access the messages
  (a: Int, b: Int) => a + b)

Ankur http://www.ankurdave.com/
Re: the default GraphX graph-partition strategy on multicore machine?
On Jul 15, 2014, at 12:06 PM, Yifan LI iamyifa...@gmail.com wrote: Btw, is there any possibility to customise the partition strategy as we expect? I'm not sure I understand. Are you asking about defining a custom http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-how-to-specify-partition-strategy-td9309.html#a9338 partition strategy? On Tue, Jul 15, 2014 at 6:20 AM, Yifan LI iamyifa...@gmail.com wrote: when I load the file using sc.textFile (minPartitions = *16*, PartitionStrategy.RandomVertexCut) The easiest way to load the edge file would actually be to use GraphLoader.edgeListFile(sc, path, minEdgePartitions = 16).partitionBy(PartitionStrategy.RandomVertexCut) . 1) how much data will be loaded into memory? The exact size of the graph (vertices + edges) in memory depends on the graph's structure, the partition function, and the average vertex degree, because each vertex must be replicated to all partitions where it is referenced. It's easier to estimate the size of just the edges, which I did on the mailing list http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-partition-problem-tp6248p6363.html a while ago. To summarize, during graph construction each edge takes 60 bytes, and once the graph is constructed it takes 20 bytes. 2) how many partitions will be stored in memory? Once you call cache() on the graph, all 16 partitions will be stored in memory. You can also tell GraphX to spill them to disk in case of memory pressure by passing edgeStorageLevel=StorageLevel.MEMORY_AND_DISK to GraphLoader.edgeListFile. 3) If the thread/task on each core will read only *one* edge from memory and then compute it at every time? Yes, each task is single-threaded, so it will read the edges in its partition sequentially. 3.1) which edge on memory was determined to read into cache? In theory, each core should independently read the edges in its partition into its own cache. Cache issues should be much less of a concern than in most applications because different tasks (cores) operate on independent partitions; there is no shared-memory parallelism. The tradeoff is heavier reliance on shuffles. 3.2) how are those partitions being scheduled? Spark handles the scheduling. There are details in Section 5.1 of the Spark NSDI paper https://www.usenix.org/system/files/conference/nsdi12/nsdi12-final138.pdf; in short, tasks are scheduled to run on the same machine as their data partition, and by default each machine can accept at most one task per core. Ankur http://www.ankurdave.com/
Re: Graphx : optimal partitions for a graph and error in logs
On Fri, Jul 11, 2014 at 2:23 PM, ShreyanshB shreyanshpbh...@gmail.com wrote: -- Is it a correct way to load file to get best performance? Yes, edgeListFile should be efficient at loading the edges. -- What should be the partition size? =computing node or =cores? In general it should be a multiple of the number of cores to exploit all available parallelism, but because of shuffle overhead, it might help to use fewer partitions -- in some cases even fewer than the number of cores. You can measure the performance with different numbers of partitions to see what is best. -- I see following error so many times in my logs [...] NotSerializableException This is a known bug, and there are two possible resolutions: 1. Switch from Java serialization to Kryo serialization, which is faster and will also resolve the problem, by setting the following Spark properties in conf/spark-defaults.conf:

spark.serializer org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator org.apache.spark.graphx.GraphKryoRegistrator

2. Mark the affected classes as Serializable. I'll submit a patch with this fix as well, but for now I'd suggest trying Kryo if possible. Ankur http://www.ankurdave.com/
Re: Graphx : optimal partitions for a graph and error in logs
I don't think it should affect performance very much, because GraphX doesn't serialize ShippableVertexPartition in the fast path of mapReduceTriplets execution (instead it calls ShippableVertexPartition.shipVertexAttributes and serializes the result). I think it should only get serialized for speculative execution, if you have that enabled. By the way, here's the fix: https://github.com/apache/spark/pull/1376 Ankur http://www.ankurdave.com/
Re: Graphx : optimal partitions for a graph and error in logs
Spark just opens up inter-slave TCP connections for message passing during shuffles (I think the relevant code is in ConnectionManager). Since TCP automatically determines http://en.wikipedia.org/wiki/TCP_congestion-avoidance_algorithm the optimal sending rate, Spark doesn't need any configuration parameters for this. Ankur http://www.ankurdave.com/
Re: GraphX: how to specify partition strategy?
On Thu, Jul 10, 2014 at 8:20 AM, Yifan LI iamyifa...@gmail.com wrote: - how to build the latest version of Spark from the master branch, which contains a fix? Instead of downloading a prebuilt Spark release from http://spark.apache.org/downloads.html, follow the instructions under Development Version on that page. In short:

git clone git://github.com/apache/spark.git
cd spark
sbt/sbt assembly

Then you can run bin/spark-shell and bin/spark-submit as usual, and Graph.partitionBy should work. - how to specify other partition strategy, eg. CanonicalRandomVertexCut, EdgePartition1D, EdgePartition2D, RandomVertexCut (listed in Scala API document, but seems only EdgePartition2D is available? I am not sure for this!) All of those partition strategies should be available -- for example, you can call graph.partitionBy(PartitionStrategy.RandomVertexCut). - Is it possible to add my own partition strategy (hash function, etc.) into GraphX? Yes, you just need to create a subclass of PartitionStrategy as follows:

import org.apache.spark.graphx._

object MyPartitionStrategy extends PartitionStrategy {
  override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
    // put your hash function here
  }
}

Ankur http://www.ankurdave.com/
Re: tiers of caching
I think tiers/priorities for caching are a very good idea and I'd be interested to see what others think. In addition to letting libraries cache RDDs liberally, it could also unify memory management across other parts of Spark. For example, small shuffles benefit from explicitly keeping the shuffle outputs in memory rather than writing them to disk, possibly due to filesystem overhead. To prevent in-memory shuffle outputs from competing with application RDDs, Spark could mark them as lower-priority and specify that they should be dropped to disk when memory runs low. Ankur http://www.ankurdave.com/
Re: graphx Joining two VertexPartitions with different indexes is slow.
Well, the alternative is to do a deep equality check on the index arrays, which would be somewhat expensive since these are pretty large arrays (one element per vertex in the graph). But, in case the reference equality check fails, it actually might be a good idea to do the deep check before resorting to the slow code path. Ankur http://www.ankurdave.com/
Re: Graphx traversal and merge interesting edges
Interesting problem! My understanding is that you want to (1) find paths matching a particular pattern, and (2) add edges between the start and end vertices of the matched paths. For (1), I implemented a pattern matcher for GraphX https://github.com/ankurdave/spark/blob/PatternMatching/graphx/src/main/scala/org/apache/spark/graphx/lib/PatternMatching.scala that iteratively accumulates partial pattern matches. I used your example in the unit test https://github.com/ankurdave/spark/blob/PatternMatching/graphx/src/test/scala/org/apache/spark/graphx/lib/PatternMatchingSuite.scala . For (2), you can take the output of the pattern matcher (the set of matching paths organized by their terminal vertices) and construct a set of new edges using the initial and terminal vertices of each path. Then you can make a new graph consisting of the union of the original edge set and the new edges. Let me know if you'd like help with this. Ankur http://www.ankurdave.com/
Re: graphx Joining two VertexPartitions with different indexes is slow.
When joining two VertexRDDs with identical indexes, GraphX can use a fast code path (a zip join without any hash lookups). However, the check for identical indexes is performed using reference equality. Without caching, two copies of the index are created. Although the two indexes are structurally identical, they fail reference equality, and so GraphX mistakenly uses the slow path involving a hash lookup per joined element. I'm working on a patch https://github.com/apache/spark/pull/1297 that attempts an optimistic zip join with per-element fallback to hash lookups, which would improve this situation. Ankur http://www.ankurdave.com/
Re: graphx Joining two VertexPartitions with different indexes is slow.
A common reason for the "Joining ... is slow" message is that you're joining VertexRDDs without having cached them first. This will cause Spark to recompute unnecessarily, and as a side effect, the same index will get created twice and GraphX won't be able to do an efficient zip join. For example, the following code will counterintuitively produce the "Joining ... is slow" message:

val a = VertexRDD(sc.parallelize((1 to 100).map(x => (x.toLong, x))))
a.leftJoin(a) { (id, a, b) => a + b.getOrElse(0) }

The remedy is to call a.cache() before a.leftJoin(a). Ankur http://www.ankurdave.com/
Re: Graphx SubGraph
Yes, the subgraph operator takes a vertex predicate and keeps only the edges where both vertices satisfy the predicate, so it will work as long as you can express the sublist in terms of a vertex predicate. If that's not possible, you can still obtain the same effect, but you'll have to use lower-level operations similar to how subgraph is itself implemented. I can help out if that's the case. Ankur http://www.ankurdave.com/
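For illustration, a minimal sketch of the vertex-predicate case, assuming the sublist is a set of vertex ids small enough to capture in the closure (the attribute types and ids are placeholders):

import org.apache.spark.graphx._

val graph: Graph[Int, Int] = ...
val keep: Set[VertexId] = Set(1L, 2L, 5L)

// Keeps only the vertices in `keep` and the edges whose endpoints both survive.
val sub = graph.subgraph(vpred = (id, attr) => keep.contains(id))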
Re: Query on Merge Message (Graph: pregel operator)
Many merge operations can be broken up to work incrementally. For example, if the merge operation is to sum *n* rank updates, then you can set mergeMsg = (a, b) => a + b and this function will be applied to all *n* updates in arbitrary order to yield a final sum. Addition, multiplication, min, max, and mean are operations that work in this manner (they are associative and commutative). If you absolutely must operate on all *n* messages at once, for example to find the median, then a workaround is to emit Array(m) instead of m in the sendMsg function, and then to set mergeMsg = (a, b) => a ++ b. This will accumulate all inbound messages into an array which you can access in vprog. However, it will be much slower for graphs with high-degree vertices, because the accumulated arrays can grow very large. Ankur http://www.ankurdave.com/
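A small sketch of the array workaround, assuming Graph[Double, Double] and a median-taking vertex program (the graph and update logic are placeholders):

import org.apache.spark.graphx._

val graph: Graph[Double, Double] = ...
val medians = graph.pregel[Array[Double]](Array.empty[Double], maxIterations = 5)(
  (id, attr, msgs) =>
    if (msgs.isEmpty) attr                    // the initial (empty) message
    else msgs.sorted.apply(msgs.length / 2),  // median of all inbound messages
  triplet => Iterator((triplet.dstId, Array(triplet.srcAttr))),
  (a, b) => a ++ b)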
Re: BSP realization on Spark
Master/worker assignment and barrier synchronization are handled by Spark, so Pregel will automatically coordinate the computation, communication, and synchronization needed to run for the specified number of supersteps (or until there are no messages sent in a particular superstep). Ankur http://www.ankurdave.com/
Re: Bayes Net with Graphx?
Vertices can have arbitrary properties associated with them: http://spark.apache.org/docs/latest/graphx-programming-guide.html#the-property-graph Ankur http://www.ankurdave.com/
Re: GraphX partition problem
I've been trying to reproduce this but I haven't succeeded so far. For example, on the web-Google https://snap.stanford.edu/data/web-Google.html graph, I get the expected results both on v0.9.1-handle-empty-partitions and on master:

// Load web-Google and run connected components
import org.apache.spark.graphx._
val g = GraphLoader.edgeListFile(sc, "/Users/ankurdave/Downloads/web-Google.txt", minEdgePartitions = 8)
g.vertices.count // = 875713
val cc = g.connectedComponents.vertices.map(_._2).cache()
cc.count // = 875713
val counts = cc.countByValue
counts.values.sum // = 875713
// There should not be any single-vertex components, because we loaded an edge list
counts.count(_._2 == 0) // = 0
counts.count(_._2 == 1) // = 0
counts.count(_._2 == 2) // = 783
counts.count(_._2 == 3) // = 503
// The 3 smallest and largest components in the graph (with nondeterministic tiebreaking)
counts.toArray.sortBy(_._2).take(3) // = Array((418467,2), (272504,2), (719750,2))
counts.toArray.sortBy(_._2).takeRight(3) // = Array((1363,384), (1734,404), (0,855802))

Ankur http://www.ankurdave.com/
Re: Persist and unpersist
I think what's desired here is for input to be unpersisted automatically as soon as result is materialized. I don't think there's currently a way to do this, but the usual workaround is to force result to be materialized immediately and then unpersist input:

input.cache()
val count = input.count
val result = input.filter(...)
result.cache().foreach(x => {}) // materialize result
input.unpersist() // safe because `result` is materialized and is the only RDD that depends on `input`
return result

Ankur http://www.ankurdave.com/
Re: counting degrees graphx
Oh, looks like the Scala Map isn't serializable. I switched the code to use java.util.HashMap, which should work. Ankur http://www.ankurdave.com/ On Mon, May 26, 2014 at 3:21 PM, daze5112 david.zeel...@ato.gov.au wrote: Excellent thanks Ankur, looks like what im looking for Only one problem the line val dists = initDists.pregel[DistanceMap](Map())(vprog, sendMsg, mergeMsg) produces an error Job aborted: Task 268.0:5 had a not serializable result: java.io.NotSerializableException: scala.collection.immutable.MapLike$$anon$2 any ideas? thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/counting-degrees-graphx-tp6370p6405.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: counting degrees graphx
On closer inspection it looks like Map normally is serializable, and it's just a bug in mapValues, so I changed to using the .map(identity) workaround described in https://issues.scala-lang.org/browse/SI-7005. Ankur http://www.ankurdave.com/
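A tiny illustration of the SI-7005 workaround mentioned above: mapValues returns a non-serializable view, and .map(identity) forces it into a plain, serializable Map (the values here are made up).

val dists: Map[Long, Int] = Map(1L -> 0, 2L -> 3)
val serializableDists: Map[Long, Int] = dists.mapValues(_ + 1).map(identity)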
Re: GraphX partition problem
Once the graph is built, edges are stored in parallel primitive arrays, so each edge should only take 20 bytes to store (srcId: Long, dstId: Long, attr: Int). Unfortunately, the current implementation in EdgePartitionBuilder uses an array of Edge objects as an intermediate representation for sorting, so each edge will additionally take something like 40 bytes during graph construction (srcId (8) + dstId (8) + attr (4) + uncompressed pointer (8) + object overhead (8) + padding (4)). So you'll need about 1.2 TB of memory in total (60 bytes * 20 billion). Does your cluster have this much memory? If not, I've been meaning to write a custom sort routine that can operate directly on the three parallel arrays. This will reduce the memory requirements to about 400 GB. Ankur http://www.ankurdave.com/
Re: counting degrees graphx
I'm not sure I understand what you're looking for. Could you provide some more examples to clarify? One interpretation is that you want to tag the source vertices in a graph (those with zero indegree) and find for each vertex the set of sources that lead to that vertex. For vertices 1-8 in the graph, this would give you [{1}, {1}, {1}, {1}, {1, 6}, {7}, {7}]. Ankur http://www.ankurdave.com/
Re: GraphX vertices and connected edges
Do you mean you want to obtain a list of adjacent edges for every vertex? A mapReduceTriplets followed by a join is the right way to do this; see the sketch at the end of this message. The join will be cheap because the original and derived vertices will share indices. There's a built-in function to do this for neighboring vertex properties called GraphOps.collectNeighbors http://spark.apache.org/docs/latest/api/graphx/index.html#org.apache.spark.graphx.GraphOps@collectNeighbors(EdgeDirection):VertexRDD[Array[(VertexId,VD)]]. In the latest version of GraphX there's also GraphOps.collectEdges https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala#L140, but that doesn't do the join. As the docs for these functions state, this will be memory-intensive for high-degree vertices, so it would be better to break up the computation so you don't need to collect all neighbors or edges if possible. Ankur http://www.ankurdave.com/ On Fri, May 2, 2014 at 11:34 AM, Kyle Ellrott kellr...@soe.ucsc.edu wrote: What is the most efficient way to get an RDD of GraphX vertices and their connected edges?
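A minimal sketch of the collectNeighbors-plus-join approach referenced above, assuming Graph[Int, Int] (the attribute types are placeholders):

import org.apache.spark.graphx._

val graph: Graph[Int, Int] = ...
// Collect each vertex's neighboring (id, attribute) pairs...
val neighbors: VertexRDD[Array[(VertexId, Int)]] = graph.collectNeighbors(EdgeDirection.Either)
// ...and join them back onto the original vertex attributes. The join is cheap
// because both VertexRDDs share the same index.
val withNeighbors: VertexRDD[(Int, Array[(VertexId, Int)])] =
  graph.vertices.innerJoin(neighbors) { (id, attr, nbrs) => (attr, nbrs) }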