Repository: spark
Updated Branches:
  refs/heads/master 894ecde04 -> b1feb6020


[SPARK-1991] Support custom storage levels for vertices and edges

This PR adds support for specifying custom storage levels for the vertices and 
edges of a graph. This enables GraphX to handle graphs larger than memory size 
by specifying MEMORY_AND_DISK and then repartitioning the graph to use many 
small partitions, each of which does fit in memory. Spark will then 
automatically load partitions from disk as needed.

The user specifies the desired vertex and edge storage levels when building the 
graph by passing them to the graph constructor. These are then stored in the 
`targetStorageLevel` attribute of the VertexRDD and EdgeRDD respectively. 
Whenever GraphX needs to cache a VertexRDD or EdgeRDD (because it plans to use 
it more than once, for example), it uses the specified target storage level. 
Also, when the user calls `Graph#cache()`, the vertices and edges are persisted 
using their target storage levels.

In order to facilitate propagating the target storage levels across VertexRDD 
and EdgeRDD operations, we remove raw calls to the constructors and instead 
introduce the `withPartitionsRDD` and `withTargetStorageLevel` methods.

I tested this change by running PageRank and triangle count on a severely 
memory-constrained cluster (1 executor with 300 MB of memory, and a 1 GB 
graph). Before this PR, these algorithms used to fail with OutOfMemoryErrors. 
With this PR, and using the DISK_ONLY storage level, they succeed.

Author: Ankur Dave <ankurd...@gmail.com>

Closes #946 from ankurdave/SPARK-1991 and squashes the following commits:

ce17d95 [Ankur Dave] Move pickStorageLevel to StorageLevel.fromString
ccaf06f [Ankur Dave] Shadow members in withXYZ() methods rather than using 
underscores
c34abc0 [Ankur Dave] Exclude all of GraphX from compatibility checks vs. 1.0.0
c5ca068 [Ankur Dave] Revert "Exclude all of GraphX from binary compatibility 
checks"
34bcefb [Ankur Dave] Exclude all of GraphX from binary compatibility checks
6fdd137 [Ankur Dave] [SPARK-1991] Support custom storage levels for vertices 
and edges


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b1feb602
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b1feb602
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b1feb602

Branch: refs/heads/master
Commit: b1feb60209174433262de2a26d39616ba00edcc8
Parents: 894ecde
Author: Ankur Dave <ankurd...@gmail.com>
Authored: Tue Jun 3 14:54:26 2014 -0700
Committer: Reynold Xin <r...@apache.org>
Committed: Tue Jun 3 14:54:26 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/storage/StorageLevel.scala | 21 +++++
 .../scala/org/apache/spark/graphx/EdgeRDD.scala | 67 +++++++++++++++-
 .../scala/org/apache/spark/graphx/Graph.scala   | 34 +++++---
 .../org/apache/spark/graphx/GraphLoader.scala   | 12 ++-
 .../org/apache/spark/graphx/VertexRDD.scala     | 49 +++++++++---
 .../apache/spark/graphx/impl/GraphImpl.scala    | 55 ++++++-------
 .../graphx/impl/ReplicatedVertexView.scala      |  6 +-
 .../org/apache/spark/graphx/lib/Analytics.scala | 82 ++++++++++----------
 8 files changed, 229 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b1feb602/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala 
b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
index 363de93..2d8ff11 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
@@ -149,6 +149,27 @@ object StorageLevel {
 
   /**
    * :: DeveloperApi ::
+   * Return the StorageLevel object with the specified name.
+   */
+  @DeveloperApi
+  def fromString(s: String): StorageLevel = s match {
+    case "NONE" => NONE
+    case "DISK_ONLY" => DISK_ONLY
+    case "DISK_ONLY_2" => DISK_ONLY_2
+    case "MEMORY_ONLY" => MEMORY_ONLY
+    case "MEMORY_ONLY_2" => MEMORY_ONLY_2
+    case "MEMORY_ONLY_SER" => MEMORY_ONLY_SER
+    case "MEMORY_ONLY_SER_2" => MEMORY_ONLY_SER_2
+    case "MEMORY_AND_DISK" => MEMORY_AND_DISK
+    case "MEMORY_AND_DISK_2" => MEMORY_AND_DISK_2
+    case "MEMORY_AND_DISK_SER" => MEMORY_AND_DISK_SER
+    case "MEMORY_AND_DISK_SER_2" => MEMORY_AND_DISK_SER_2
+    case "OFF_HEAP" => OFF_HEAP
+    case _ => throw new IllegalArgumentException("Invalid StorageLevel: " + s)
+  }
+
+  /**
+   * :: DeveloperApi ::
    * Create a new StorageLevel object without setting useOffHeap.
    */
   @DeveloperApi

http://git-wip-us.apache.org/repos/asf/spark/blob/b1feb602/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
index a8fc095..899a3cb 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -24,6 +24,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 
 import org.apache.spark.graphx.impl.EdgePartition
+import org.apache.spark.graphx.impl.EdgePartitionBuilder
 
 /**
  * `EdgeRDD[ED, VD]` extends `RDD[Edge[ED]]` by storing the edges in columnar 
format on each
@@ -32,7 +33,8 @@ import org.apache.spark.graphx.impl.EdgePartition
  * `impl.ReplicatedVertexView`.
  */
 class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
-    val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])])
+    val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])],
+    val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
   extends RDD[Edge[ED]](partitionsRDD.context, List(new 
OneToOneDependency(partitionsRDD))) {
 
   partitionsRDD.setName("EdgeRDD")
@@ -58,6 +60,10 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
 
   override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
 
+  /**
+   * Persists the edge partitions at the specified storage level, ignoring any 
existing target
+   * storage level.
+   */
   override def persist(newLevel: StorageLevel): this.type = {
     partitionsRDD.persist(newLevel)
     this
@@ -68,9 +74,15 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
     this
   }
 
+  /** Persists the edge partitions using `targetStorageLevel`, which 
defaults to MEMORY_ONLY. */
+  override def cache(): this.type = {
+    partitionsRDD.persist(targetStorageLevel)
+    this
+  }
+
   private[graphx] def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag](
       f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): 
EdgeRDD[ED2, VD2] = {
-    new EdgeRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter =>
+    this.withPartitionsRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter =>
       if (iter.hasNext) {
         val (pid, ep) = iter.next()
         Iterator(Tuple2(pid, f(pid, ep)))
@@ -118,11 +130,60 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
       (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD] = {
     val ed2Tag = classTag[ED2]
     val ed3Tag = classTag[ED3]
-    new EdgeRDD[ED3, VD](partitionsRDD.zipPartitions(other.partitionsRDD, 
true) {
+    this.withPartitionsRDD[ED3, 
VD](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
       (thisIter, otherIter) =>
         val (pid, thisEPart) = thisIter.next()
         val (_, otherEPart) = otherIter.next()
         Iterator(Tuple2(pid, thisEPart.innerJoin(otherEPart)(f)(ed2Tag, 
ed3Tag)))
     })
   }
+
+  /** Replaces the edge partitions while preserving all other properties of 
the EdgeRDD. */
+  private[graphx] def withPartitionsRDD[ED2: ClassTag, VD2: ClassTag](
+      partitionsRDD: RDD[(PartitionID, EdgePartition[ED2, VD2])]): 
EdgeRDD[ED2, VD2] = {
+    new EdgeRDD(partitionsRDD, this.targetStorageLevel)
+  }
+
+  /**
+   * Changes the target storage level while preserving all other properties of 
the
+   * EdgeRDD. Operations on the returned EdgeRDD will preserve this storage 
level.
+   *
+   * This does not actually trigger a cache; to do this, call
+   * [[org.apache.spark.graphx.EdgeRDD#cache]] on the returned EdgeRDD.
+   */
+  private[graphx] def withTargetStorageLevel(
+      targetStorageLevel: StorageLevel): EdgeRDD[ED, VD] = {
+    new EdgeRDD(this.partitionsRDD, targetStorageLevel)
+  }
+
+}
+
+object EdgeRDD {
+  /**
+   * Creates an EdgeRDD from a set of edges.
+   *
+   * @tparam ED the edge attribute type
+   * @tparam VD the type of the vertex attributes that may be joined with the 
returned EdgeRDD
+   */
+  def fromEdges[ED: ClassTag, VD: ClassTag](edges: RDD[Edge[ED]]): EdgeRDD[ED, 
VD] = {
+    val edgePartitions = edges.mapPartitionsWithIndex { (pid, iter) =>
+      val builder = new EdgePartitionBuilder[ED, VD]
+      iter.foreach { e =>
+        builder.add(e.srcId, e.dstId, e.attr)
+      }
+      Iterator((pid, builder.toEdgePartition))
+    }
+    EdgeRDD.fromEdgePartitions(edgePartitions)
+  }
+
+  /**
+   * Creates an EdgeRDD from already-constructed edge partitions.
+   *
+   * @tparam ED the edge attribute type
+   * @tparam VD the type of the vertex attributes that may be joined with the 
returned EdgeRDD
+   */
+  def fromEdgePartitions[ED: ClassTag, VD: ClassTag](
+      edgePartitions: RDD[(Int, EdgePartition[ED, VD])]): EdgeRDD[ED, VD] = {
+    new EdgeRDD(edgePartitions)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b1feb602/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index dc5dac4..c4f9d65 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -80,7 +80,8 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () 
extends Serializab
   @transient val triplets: RDD[EdgeTriplet[VD, ED]]
 
   /**
-   * Caches the vertices and edges associated with this graph at the specified 
storage level.
+   * Caches the vertices and edges associated with this graph at the specified 
storage level,
+   * ignoring any target storage levels previously set.
    *
    * @param newLevel the level at which to cache the graph.
    *
@@ -89,9 +90,9 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () 
extends Serializab
   def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
 
   /**
-   * Caches the vertices and edges associated with this graph. This is used to
-   * pin a graph in memory enabling multiple queries to reuse the same
-   * construction process.
+   * Caches the vertices and edges associated with this graph at the 
previously-specified target
+   * storage levels, which default to `MEMORY_ONLY`. This is used to pin a 
graph in memory enabling
+   * multiple queries to reuse the same construction process.
    */
   def cache(): Graph[VD, ED]
 
@@ -358,9 +359,12 @@ object Graph {
    * Construct a graph from a collection of edges encoded as vertex id pairs.
    *
    * @param rawEdges a collection of edges in (src, dst) form
+   * @param defaultValue the vertex attributes with which to create vertices 
referenced by the edges
    * @param uniqueEdges if multiple identical edges are found they are 
combined and the edge
    * attribute is set to the sum.  Otherwise duplicate edges are treated as 
separate. To enable
    * `uniqueEdges`, a [[PartitionStrategy]] must be provided.
+   * @param edgeStorageLevel the desired storage level at which to cache the 
edges if necessary
+   * @param vertexStorageLevel the desired storage level at which to cache the 
vertices if necessary
    *
    * @return a graph with edge attributes containing either the count of 
duplicate edges or 1
    * (if `uniqueEdges` is `None`) and vertex attributes containing the total 
degree of each vertex.
@@ -368,10 +372,12 @@ object Graph {
   def fromEdgeTuples[VD: ClassTag](
       rawEdges: RDD[(VertexId, VertexId)],
       defaultValue: VD,
-      uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int] =
+      uniqueEdges: Option[PartitionStrategy] = None,
+      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
+      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, 
Int] =
   {
     val edges = rawEdges.map(p => Edge(p._1, p._2, 1))
-    val graph = GraphImpl(edges, defaultValue)
+    val graph = GraphImpl(edges, defaultValue, edgeStorageLevel, 
vertexStorageLevel)
     uniqueEdges match {
       case Some(p) => graph.partitionBy(p).groupEdges((a, b) => a + b)
       case None => graph
@@ -383,14 +389,18 @@ object Graph {
    *
    * @param edges the RDD containing the set of edges in the graph
    * @param defaultValue the default vertex attribute to use for each vertex
+   * @param edgeStorageLevel the desired storage level at which to cache the 
edges if necessary
+   * @param vertexStorageLevel the desired storage level at which to cache the 
vertices if necessary
    *
    * @return a graph with edge attributes described by `edges` and vertices
    *         given by all vertices in `edges` with value `defaultValue`
    */
   def fromEdges[VD: ClassTag, ED: ClassTag](
       edges: RDD[Edge[ED]],
-      defaultValue: VD): Graph[VD, ED] = {
-    GraphImpl(edges, defaultValue)
+      defaultValue: VD,
+      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
+      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, 
ED] = {
+    GraphImpl(edges, defaultValue, edgeStorageLevel, vertexStorageLevel)
   }
 
   /**
@@ -405,12 +415,16 @@ object Graph {
    * @param edges the collection of edges in the graph
    * @param defaultVertexAttr the default vertex attribute to use for vertices 
that are
    *                          mentioned in edges but not in vertices
+   * @param edgeStorageLevel the desired storage level at which to cache the 
edges if necessary
+   * @param vertexStorageLevel the desired storage level at which to cache the 
vertices if necessary
    */
   def apply[VD: ClassTag, ED: ClassTag](
       vertices: RDD[(VertexId, VD)],
       edges: RDD[Edge[ED]],
-      defaultVertexAttr: VD = null.asInstanceOf[VD]): Graph[VD, ED] = {
-    GraphImpl(vertices, edges, defaultVertexAttr)
+      defaultVertexAttr: VD = null.asInstanceOf[VD],
+      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
+      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, 
ED] = {
+    GraphImpl(vertices, edges, defaultVertexAttr, edgeStorageLevel, 
vertexStorageLevel)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/b1feb602/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
index 389490c..2e814e3 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.graphx
 
+import org.apache.spark.storage.StorageLevel
 import org.apache.spark.{Logging, SparkContext}
 import org.apache.spark.graphx.impl.{EdgePartitionBuilder, GraphImpl}
 
@@ -48,12 +49,16 @@ object GraphLoader extends Logging {
    * @param canonicalOrientation whether to orient edges in the positive
    *        direction
    * @param minEdgePartitions the number of partitions for the edge RDD
+   * @param edgeStorageLevel the desired storage level for the edge partitions
+   * @param vertexStorageLevel the desired storage level for the vertex 
partitions
    */
   def edgeListFile(
       sc: SparkContext,
       path: String,
       canonicalOrientation: Boolean = false,
-      minEdgePartitions: Int = 1)
+      minEdgePartitions: Int = 1,
+      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
+      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
     : Graph[Int, Int] =
   {
     val startTime = System.currentTimeMillis
@@ -78,12 +83,13 @@ object GraphLoader extends Logging {
         }
       }
       Iterator((pid, builder.toEdgePartition))
-    }.cache().setName("GraphLoader.edgeListFile - edges (%s)".format(path))
+    }.persist(edgeStorageLevel).setName("GraphLoader.edgeListFile - edges 
(%s)".format(path))
     edges.count()
 
     logInfo("It took %d ms to load the edges".format(System.currentTimeMillis 
- startTime))
 
-    GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1)
+    GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1, 
edgeStorageLevel = edgeStorageLevel,
+      vertexStorageLevel = vertexStorageLevel)
   } // end of edgeListFile
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b1feb602/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index 8b910fb..f1b6df9 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -56,7 +56,8 @@ import org.apache.spark.graphx.impl.VertexRDDFunctions._
  * @tparam VD the vertex attribute associated with each vertex in the set.
  */
 class VertexRDD[@specialized VD: ClassTag](
-    val partitionsRDD: RDD[ShippableVertexPartition[VD]])
+    val partitionsRDD: RDD[ShippableVertexPartition[VD]],
+    val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
   extends RDD[(VertexId, VD)](partitionsRDD.context, List(new 
OneToOneDependency(partitionsRDD))) {
 
   require(partitionsRDD.partitioner.isDefined)
@@ -66,7 +67,7 @@ class VertexRDD[@specialized VD: ClassTag](
    * VertexRDD will be based on a different index and can no longer be quickly 
joined with this
    * RDD.
    */
-  def reindex(): VertexRDD[VD] = new VertexRDD(partitionsRDD.map(_.reindex()))
+  def reindex(): VertexRDD[VD] = 
this.withPartitionsRDD(partitionsRDD.map(_.reindex()))
 
   override val partitioner = partitionsRDD.partitioner
 
@@ -85,6 +86,10 @@ class VertexRDD[@specialized VD: ClassTag](
   }
   setName("VertexRDD")
 
+  /**
+   * Persists the vertex partitions at the specified storage level, ignoring 
any existing target
+   * storage level.
+   */
   override def persist(newLevel: StorageLevel): this.type = {
     partitionsRDD.persist(newLevel)
     this
@@ -95,6 +100,12 @@ class VertexRDD[@specialized VD: ClassTag](
     this
   }
 
+  /** Persists the vertex partitions at `targetStorageLevel`, which defaults 
to MEMORY_ONLY. */
+  override def cache(): this.type = {
+    partitionsRDD.persist(targetStorageLevel)
+    this
+  }
+
   /** The number of vertices in the RDD. */
   override def count(): Long = {
     partitionsRDD.map(_.size).reduce(_ + _)
@@ -114,7 +125,7 @@ class VertexRDD[@specialized VD: ClassTag](
       f: ShippableVertexPartition[VD] => ShippableVertexPartition[VD2])
     : VertexRDD[VD2] = {
     val newPartitionsRDD = partitionsRDD.mapPartitions(_.map(f), 
preservesPartitioning = true)
-    new VertexRDD(newPartitionsRDD)
+    this.withPartitionsRDD(newPartitionsRDD)
   }
 
 
@@ -165,7 +176,7 @@ class VertexRDD[@specialized VD: ClassTag](
       val otherPart = otherIter.next()
       Iterator(thisPart.diff(otherPart))
     }
-    new VertexRDD(newPartitionsRDD)
+    this.withPartitionsRDD(newPartitionsRDD)
   }
 
   /**
@@ -191,7 +202,7 @@ class VertexRDD[@specialized VD: ClassTag](
       val otherPart = otherIter.next()
       Iterator(thisPart.leftJoin(otherPart)(f))
     }
-    new VertexRDD(newPartitionsRDD)
+    this.withPartitionsRDD(newPartitionsRDD)
   }
 
   /**
@@ -220,7 +231,7 @@ class VertexRDD[@specialized VD: ClassTag](
       case other: VertexRDD[_] =>
         leftZipJoin(other)(f)
       case _ =>
-        new VertexRDD[VD3](
+        this.withPartitionsRDD[VD3](
           partitionsRDD.zipPartitions(
             other.copartitionWithVertices(this.partitioner.get), 
preservesPartitioning = true) {
             (partIter, msgs) => partIter.map(_.leftJoin(msgs)(f))
@@ -242,7 +253,7 @@ class VertexRDD[@specialized VD: ClassTag](
       val otherPart = otherIter.next()
       Iterator(thisPart.innerJoin(otherPart)(f))
     }
-    new VertexRDD(newPartitionsRDD)
+    this.withPartitionsRDD(newPartitionsRDD)
   }
 
   /**
@@ -264,7 +275,7 @@ class VertexRDD[@specialized VD: ClassTag](
       case other: VertexRDD[_] =>
         innerZipJoin(other)(f)
       case _ =>
-        new VertexRDD(
+        this.withPartitionsRDD(
           partitionsRDD.zipPartitions(
             other.copartitionWithVertices(this.partitioner.get), 
preservesPartitioning = true) {
             (partIter, msgs) => partIter.map(_.innerJoin(msgs)(f))
@@ -290,7 +301,7 @@ class VertexRDD[@specialized VD: ClassTag](
     val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, 
msgIter) =>
       thisIter.map(_.aggregateUsingIndex(msgIter, reduceFunc))
     }
-    new VertexRDD[VD2](parts)
+    this.withPartitionsRDD[VD2](parts)
   }
 
   /**
@@ -309,7 +320,25 @@ class VertexRDD[@specialized VD: ClassTag](
           if (routingTableIter.hasNext) routingTableIter.next() else 
RoutingTablePartition.empty
         partIter.map(_.withRoutingTable(routingTable))
     }
-    new VertexRDD(vertexPartitions)
+    this.withPartitionsRDD(vertexPartitions)
+  }
+
+  /** Replaces the vertex partitions while preserving all other properties of 
the VertexRDD. */
+  private[graphx] def withPartitionsRDD[VD2: ClassTag](
+      partitionsRDD: RDD[ShippableVertexPartition[VD2]]): VertexRDD[VD2] = {
+    new VertexRDD(partitionsRDD, this.targetStorageLevel)
+  }
+
+  /**
+   * Changes the target storage level while preserving all other properties of 
the
+   * VertexRDD. Operations on the returned VertexRDD will preserve this 
storage level.
+   *
+   * This does not actually trigger a cache; to do this, call
+   * [[org.apache.spark.graphx.VertexRDD#cache]] on the returned VertexRDD.
+   */
+  private[graphx] def withTargetStorageLevel(
+      targetStorageLevel: StorageLevel): VertexRDD[VD] = {
+    new VertexRDD(this.partitionsRDD, targetStorageLevel)
   }
 
   /** Generates an RDD of vertex attributes suitable for shipping to the edge 
partitions. */

http://git-wip-us.apache.org/repos/asf/spark/blob/b1feb602/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 1649b24..59d9a88 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -61,7 +61,11 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
     this
   }
 
-  override def cache(): Graph[VD, ED] = persist(StorageLevel.MEMORY_ONLY)
+  override def cache(): Graph[VD, ED] = {
+    vertices.cache()
+    replicatedVertexView.edges.cache()
+    this
+  }
 
   override def unpersistVertices(blocking: Boolean = true): Graph[VD, ED] = {
     vertices.unpersist(blocking)
@@ -70,10 +74,10 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
   }
 
   override def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, 
ED] = {
-    val numPartitions = replicatedVertexView.edges.partitions.size
+    val numPartitions = edges.partitions.size
     val edTag = classTag[ED]
     val vdTag = classTag[VD]
-    val newEdges = new EdgeRDD(replicatedVertexView.edges.map { e =>
+    val newEdges = edges.withPartitionsRDD(edges.map { e =>
       val part: PartitionID = partitionStrategy.getPartition(e.srcId, e.dstId, 
numPartitions)
 
       // Should we be using 3-tuple or an optimized class
@@ -256,24 +260,33 @@ object GraphImpl {
   /** Create a graph from edges, setting referenced vertices to 
`defaultVertexAttr`. */
   def apply[VD: ClassTag, ED: ClassTag](
       edges: RDD[Edge[ED]],
-      defaultVertexAttr: VD): GraphImpl[VD, ED] = {
-    fromEdgeRDD(createEdgeRDD(edges), defaultVertexAttr)
+      defaultVertexAttr: VD,
+      edgeStorageLevel: StorageLevel,
+      vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
+    fromEdgeRDD(EdgeRDD.fromEdges(edges), defaultVertexAttr, edgeStorageLevel, 
vertexStorageLevel)
   }
 
   /** Create a graph from EdgePartitions, setting referenced vertices to 
`defaultVertexAttr`. */
   def fromEdgePartitions[VD: ClassTag, ED: ClassTag](
       edgePartitions: RDD[(PartitionID, EdgePartition[ED, VD])],
-      defaultVertexAttr: VD): GraphImpl[VD, ED] = {
-    fromEdgeRDD(new EdgeRDD(edgePartitions), defaultVertexAttr)
+      defaultVertexAttr: VD,
+      edgeStorageLevel: StorageLevel,
+      vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
+    fromEdgeRDD(EdgeRDD.fromEdgePartitions(edgePartitions), defaultVertexAttr, 
edgeStorageLevel,
+      vertexStorageLevel)
   }
 
   /** Create a graph from vertices and edges, setting missing vertices to 
`defaultVertexAttr`. */
   def apply[VD: ClassTag, ED: ClassTag](
       vertices: RDD[(VertexId, VD)],
       edges: RDD[Edge[ED]],
-      defaultVertexAttr: VD): GraphImpl[VD, ED] = {
-    val edgeRDD = createEdgeRDD(edges)(classTag[ED], classTag[VD]).cache()
+      defaultVertexAttr: VD,
+      edgeStorageLevel: StorageLevel,
+      vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
+    val edgeRDD = EdgeRDD.fromEdges(edges)(classTag[ED], classTag[VD])
+      .withTargetStorageLevel(edgeStorageLevel).cache()
     val vertexRDD = VertexRDD(vertices, edgeRDD, defaultVertexAttr)
+      .withTargetStorageLevel(vertexStorageLevel).cache()
     GraphImpl(vertexRDD, edgeRDD)
   }
 
@@ -309,23 +322,13 @@ object GraphImpl {
    */
   private def fromEdgeRDD[VD: ClassTag, ED: ClassTag](
       edges: EdgeRDD[ED, VD],
-      defaultVertexAttr: VD): GraphImpl[VD, ED] = {
-    edges.cache()
-    val vertices = VertexRDD.fromEdges(edges, edges.partitions.size, 
defaultVertexAttr)
-    fromExistingRDDs(vertices, edges)
-  }
-
-  /** Create an EdgeRDD from a set of edges. */
-  private def createEdgeRDD[ED: ClassTag, VD: ClassTag](
-      edges: RDD[Edge[ED]]): EdgeRDD[ED, VD] = {
-    val edgePartitions = edges.mapPartitionsWithIndex { (pid, iter) =>
-      val builder = new EdgePartitionBuilder[ED, VD]
-      iter.foreach { e =>
-        builder.add(e.srcId, e.dstId, e.attr)
-      }
-      Iterator((pid, builder.toEdgePartition))
-    }
-    new EdgeRDD(edgePartitions)
+      defaultVertexAttr: VD,
+      edgeStorageLevel: StorageLevel,
+      vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
+    val edgesCached = edges.withTargetStorageLevel(edgeStorageLevel).cache()
+    val vertices = VertexRDD.fromEdges(edgesCached, 
edgesCached.partitions.size, defaultVertexAttr)
+      .withTargetStorageLevel(vertexStorageLevel)
+    fromExistingRDDs(vertices, edgesCached)
   }
 
 } // end of object GraphImpl

http://git-wip-us.apache.org/repos/asf/spark/blob/b1feb602/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala
----------------------------------------------------------------------
diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala
index 3a0bba1..86b366e 100644
--- 
a/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala
+++ 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala
@@ -69,7 +69,7 @@ class ReplicatedVertexView[VD: ClassTag, ED: ClassTag](
           .setName("ReplicatedVertexView.upgrade(%s, %s) - shippedVerts %s %s 
(broadcast)".format(
             includeSrc, includeDst, shipSrc, shipDst))
           .partitionBy(edges.partitioner.get)
-      val newEdges = new 
EdgeRDD(edges.partitionsRDD.zipPartitions(shippedVerts) {
+      val newEdges = 
edges.withPartitionsRDD(edges.partitionsRDD.zipPartitions(shippedVerts) {
         (ePartIter, shippedVertsIter) => ePartIter.map {
           case (pid, edgePartition) =>
             (pid, 
edgePartition.updateVertices(shippedVertsIter.flatMap(_._2.iterator)))
@@ -91,7 +91,7 @@ class ReplicatedVertexView[VD: ClassTag, ED: ClassTag](
       .setName("ReplicatedVertexView.withActiveSet - shippedActives 
(broadcast)")
       .partitionBy(edges.partitioner.get)
 
-    val newEdges = new 
EdgeRDD(edges.partitionsRDD.zipPartitions(shippedActives) {
+    val newEdges = 
edges.withPartitionsRDD(edges.partitionsRDD.zipPartitions(shippedActives) {
       (ePartIter, shippedActivesIter) => ePartIter.map {
         case (pid, edgePartition) =>
           (pid, 
edgePartition.withActiveSet(shippedActivesIter.flatMap(_._2.iterator)))
@@ -111,7 +111,7 @@ class ReplicatedVertexView[VD: ClassTag, ED: ClassTag](
         hasSrcId, hasDstId))
       .partitionBy(edges.partitioner.get)
 
-    val newEdges = new EdgeRDD(edges.partitionsRDD.zipPartitions(shippedVerts) 
{
+    val newEdges = 
edges.withPartitionsRDD(edges.partitionsRDD.zipPartitions(shippedVerts) {
       (ePartIter, shippedVertsIter) => ePartIter.map {
         case (pid, edgePartition) =>
           (pid, 
edgePartition.updateVertices(shippedVertsIter.flatMap(_._2.iterator)))

http://git-wip-us.apache.org/repos/asf/spark/blob/b1feb602/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
index 069e042..c1513a0 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
@@ -17,7 +17,9 @@
 
 package org.apache.spark.graphx.lib
 
+import scala.collection.mutable
 import org.apache.spark._
+import org.apache.spark.storage.StorageLevel
 import org.apache.spark.graphx._
 import org.apache.spark.graphx.PartitionStrategy._
 
@@ -28,18 +30,20 @@ object Analytics extends Logging {
 
   def main(args: Array[String]): Unit = {
     if (args.length < 2) {
-      System.err.println("Usage: Analytics <taskType> <file> [other options]")
+      System.err.println(
+        "Usage: Analytics <taskType> <file> --numEPart=<num_edge_partitions> 
[other options]")
       System.exit(1)
     }
 
     val taskType = args(0)
     val fname = args(1)
-    val options =  args.drop(2).map { arg =>
+    val optionsList = args.drop(2).map { arg =>
       arg.dropWhile(_ == '-').split('=') match {
         case Array(opt, v) => (opt -> v)
         case _ => throw new IllegalArgumentException("Invalid argument: " + 
arg)
       }
     }
+    val options = mutable.Map(optionsList: _*)
 
     def pickPartitioner(v: String): PartitionStrategy = {
       // TODO: Use reflection rather than listing all the partitioning 
strategies here.
@@ -57,20 +61,24 @@ object Analytics extends Logging {
       .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
       .set("spark.locality.wait", "100000")
 
+    val numEPart = options.remove("numEPart").map(_.toInt).getOrElse {
+      println("Set the number of edge partitions using --numEPart.")
+      sys.exit(1)
+    }
+    val partitionStrategy: Option[PartitionStrategy] = options.remove("partStrategy")
+      .map(pickPartitioner(_))
+    val edgeStorageLevel = options.remove("edgeStorageLevel")
+      .map(StorageLevel.fromString(_)).getOrElse(StorageLevel.MEMORY_ONLY)
+    val vertexStorageLevel = options.remove("vertexStorageLevel")
+      .map(StorageLevel.fromString(_)).getOrElse(StorageLevel.MEMORY_ONLY)
+
     taskType match {
       case "pagerank" =>
-        var tol: Float = 0.001F
-        var outFname = ""
-        var numEPart = 4
-        var partitionStrategy: Option[PartitionStrategy] = None
-        var numIterOpt: Option[Int] = None
-
-        options.foreach{
-          case ("tol", v) => tol = v.toFloat
-          case ("output", v) => outFname = v
-          case ("numEPart", v) => numEPart = v.toInt
-          case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v))
-          case ("numIter", v) => numIterOpt = Some(v.toInt)
+        val tol = options.remove("tol").map(_.toFloat).getOrElse(0.001F)
+        val outFname = options.remove("output").getOrElse("")
+        val numIterOpt = options.remove("numIter").map(_.toInt)
+
+        options.foreach {
           case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
         }
 
@@ -81,7 +89,9 @@ object Analytics extends Logging {
         val sc = new SparkContext(conf.setAppName("PageRank(" + fname + ")"))
 
         val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
-          minEdgePartitions = numEPart).cache()
+          minEdgePartitions = numEPart,
+          edgeStorageLevel = edgeStorageLevel,
+          vertexStorageLevel = vertexStorageLevel).cache()
         val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
 
         println("GRAPHX: Number of vertices " + graph.vertices.count)
@@ -102,32 +112,19 @@ object Analytics extends Logging {
         sc.stop()
 
       case "cc" =>
-        var numIter = Int.MaxValue
-        var numVPart = 4
-        var numEPart = 4
-        var isDynamic = false
-        var partitionStrategy: Option[PartitionStrategy] = None
-
-        options.foreach{
-          case ("numIter", v) => numIter = v.toInt
-          case ("dynamic", v) => isDynamic = v.toBoolean
-          case ("numEPart", v) => numEPart = v.toInt
-          case ("numVPart", v) => numVPart = v.toInt
-          case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v))
+        options.foreach {
           case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
         }
 
-        if (!isDynamic && numIter == Int.MaxValue) {
-          println("Set number of iterations!")
-          sys.exit(1)
-        }
         println("======================================")
         println("|      Connected Components          |")
         println("======================================")
 
         val sc = new SparkContext(conf.setAppName("ConnectedComponents(" + fname + ")"))
         val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
-          minEdgePartitions = numEPart).cache()
+          minEdgePartitions = numEPart,
+          edgeStorageLevel = edgeStorageLevel,
+          vertexStorageLevel = vertexStorageLevel).cache()
         val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
 
         val cc = ConnectedComponents.run(graph)
@@ -135,24 +132,25 @@ object Analytics extends Logging {
         sc.stop()
 
       case "triangles" =>
-        var numEPart = 4
-        // TriangleCount requires the graph to be partitioned
-        var partitionStrategy: PartitionStrategy = RandomVertexCut
-
-        options.foreach{
-          case ("numEPart", v) => numEPart = v.toInt
-          case ("partStrategy", v) => partitionStrategy = pickPartitioner(v)
+        options.foreach {
           case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
         }
+
         println("======================================")
         println("|      Triangle Count                |")
         println("======================================")
+
         val sc = new SparkContext(conf.setAppName("TriangleCount(" + fname + ")"))
-        val graph = GraphLoader.edgeListFile(sc, fname, canonicalOrientation = true,
-          minEdgePartitions = numEPart).partitionBy(partitionStrategy).cache()
+        val graph = GraphLoader.edgeListFile(sc, fname,
+          canonicalOrientation = true,
+          minEdgePartitions = numEPart,
+          edgeStorageLevel = edgeStorageLevel,
+          vertexStorageLevel = vertexStorageLevel)
+        // TriangleCount requires the graph to be partitioned
+          .partitionBy(partitionStrategy.getOrElse(RandomVertexCut)).cache()
         val triangles = TriangleCount.run(graph)
         println("Triangles: " + triangles.vertices.map {
-          case (vid,data) => data.toLong
+          case (vid, data) => data.toLong
         }.reduce(_ + _) / 3)
         sc.stop()
 

Reply via email to