Repository: spark
Updated Branches:
  refs/heads/master c3afd3266 -> a5ef58113


[SPARK-3666] Extract interfaces for EdgeRDD and VertexRDD

This discourages users from calling the VertexRDD and EdgeRDD constructors directly
and makes it easier to preserve backward compatibility in future changes.
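
As a usage illustration (not part of this commit), a minimal sketch assuming a
running SparkContext `sc` and the existing companion-object factories: callers
obtain instances through VertexRDD(...) and EdgeRDD.fromEdges(...) and program
against the abstract types, while the concrete classes stay package-private in impl.

    import org.apache.spark.graphx.{Edge, EdgeRDD, VertexRDD}

    // The factories return the abstract VertexRDD/EdgeRDD types; the
    // VertexRDDImpl/EdgeRDDImpl constructors are no longer reachable.
    val vertices: VertexRDD[String] =
      VertexRDD(sc.parallelize(Seq((1L, "alice"), (2L, "bob"))))
    val edges: EdgeRDD[Int, String] =
      EdgeRDD.fromEdges[Int, String](sc.parallelize(Seq(Edge(1L, 2L, 7))))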

Author: Ankur Dave <ankurd...@gmail.com>

Closes #2530 from ankurdave/SPARK-3666 and squashes the following commits:

d681f45 [Ankur Dave] Define getPartitions and compute in abstract class for MIMA
1472390 [Ankur Dave] Merge remote-tracking branch 'apache-spark/master' into SPARK-3666
24201d4 [Ankur Dave] Merge remote-tracking branch 'apache-spark/master' into SPARK-3666
cbe15f2 [Ankur Dave] Remove specialized annotation from VertexRDD and EdgeRDD
931b587 [Ankur Dave] Use abstract class instead of trait for binary compatibility
9ba4ec4 [Ankur Dave] Mark (Vertex|Edge)RDDImpl constructors package-private
620e603 [Ankur Dave] Extract VertexRDD interface and move implementation to VertexRDDImpl
55b6398 [Ankur Dave] Extract EdgeRDD interface and move implementation to EdgeRDDImpl


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a5ef5811
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a5ef5811
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a5ef5811

Branch: refs/heads/master
Commit: a5ef58113667ff73562ce6db381cff96a0b354b0
Parents: c3afd32
Author: Ankur Dave <ankurd...@gmail.com>
Authored: Wed Nov 12 13:49:20 2014 -0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Wed Nov 12 13:49:20 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/graphx/EdgeRDD.scala | 111 ++--------
 .../org/apache/spark/graphx/VertexRDD.scala     | 190 ++++-------------
 .../apache/spark/graphx/impl/EdgeRDDImpl.scala  | 124 +++++++++++
 .../spark/graphx/impl/VertexRDDImpl.scala       | 205 +++++++++++++++++++
 4 files changed, 386 insertions(+), 244 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a5ef5811/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
index 5267560..869ef15 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -17,14 +17,18 @@
 
 package org.apache.spark.graphx
 
-import scala.reflect.{classTag, ClassTag}
+import scala.reflect.ClassTag
 
-import org.apache.spark.{OneToOneDependency, Partition, Partitioner, TaskContext}
+import org.apache.spark.Dependency
+import org.apache.spark.Partition
+import org.apache.spark.SparkContext
+import org.apache.spark.TaskContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 
 import org.apache.spark.graphx.impl.EdgePartition
 import org.apache.spark.graphx.impl.EdgePartitionBuilder
+import org.apache.spark.graphx.impl.EdgeRDDImpl
 
 /**
 * `EdgeRDD[ED, VD]` extends `RDD[Edge[ED]]` by storing the edges in columnar format on each
@@ -32,30 +36,13 @@ import org.apache.spark.graphx.impl.EdgePartitionBuilder
 * edge to provide the triplet view. Shipping of the vertex attributes is managed by
  * `impl.ReplicatedVertexView`.
  */
-class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
-    val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])],
-    val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
-  extends RDD[Edge[ED]](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
-
-  override def setName(_name: String): this.type = {
-    if (partitionsRDD.name != null) {
-      partitionsRDD.setName(partitionsRDD.name + ", " + _name)
-    } else {
-      partitionsRDD.setName(_name)
-    }
-    this
-  }
-  setName("EdgeRDD")
+abstract class EdgeRDD[ED, VD](
+    @transient sc: SparkContext,
+    @transient deps: Seq[Dependency[_]]) extends RDD[Edge[ED]](sc, deps) {
 
-  override protected def getPartitions: Array[Partition] = partitionsRDD.partitions
+  private[graphx] def partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])]
 
-  /**
-   * If `partitionsRDD` already has a partitioner, use it. Otherwise assume that the
-   * [[PartitionID]]s in `partitionsRDD` correspond to the actual partitions and create a new
-   * partitioner that allows co-partitioning with `partitionsRDD`.
-   */
-  override val partitioner =
-    partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
+  override protected def getPartitions: Array[Partition] = partitionsRDD.partitions
 
   override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
     val p = firstParent[(PartitionID, EdgePartition[ED, VD])].iterator(part, context)
@@ -66,45 +53,6 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
     }
   }
 
-  override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
-
-  /**
-   * Persists the edge partitions at the specified storage level, ignoring any existing target
-   * storage level.
-   */
-  override def persist(newLevel: StorageLevel): this.type = {
-    partitionsRDD.persist(newLevel)
-    this
-  }
-
-  override def unpersist(blocking: Boolean = true): this.type = {
-    partitionsRDD.unpersist(blocking)
-    this
-  }
-
-  /** Persists the edge partitions using `targetStorageLevel`, which defaults to MEMORY_ONLY. */
-  override def cache(): this.type = {
-    partitionsRDD.persist(targetStorageLevel)
-    this
-  }
-
-  /** The number of edges in the RDD. */
-  override def count(): Long = {
-    partitionsRDD.map(_._2.size.toLong).reduce(_ + _)
-  }
-
-  private[graphx] def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag](
-      f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDD[ED2, VD2] = {
-    this.withPartitionsRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter =>
-      if (iter.hasNext) {
-        val (pid, ep) = iter.next()
-        Iterator(Tuple2(pid, f(pid, ep)))
-      } else {
-        Iterator.empty
-      }
-    }, preservesPartitioning = true))
-  }
-
   /**
   * Map the values in an edge partitioning preserving the structure but changing the values.
    *
@@ -112,22 +60,19 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
    * @param f the function from an edge to a new edge value
    * @return a new EdgeRDD containing the new edge values
    */
-  def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2, VD] =
-    mapEdgePartitions((pid, part) => part.map(f))
+  def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2, VD]
 
   /**
    * Reverse all the edges in this RDD.
    *
    * @return a new EdgeRDD containing all the edges reversed
    */
-  def reverse: EdgeRDD[ED, VD] = mapEdgePartitions((pid, part) => part.reverse)
+  def reverse: EdgeRDD[ED, VD]
 
   /** Removes all edges but those matching `epred` and where both vertices match `vpred`. */
   def filter(
       epred: EdgeTriplet[VD, ED] => Boolean,
-      vpred: (VertexId, VD) => Boolean): EdgeRDD[ED, VD] = {
-    mapEdgePartitions((pid, part) => part.filter(epred, vpred))
-  }
+      vpred: (VertexId, VD) => Boolean): EdgeRDD[ED, VD]
 
   /**
   * Inner joins this EdgeRDD with another EdgeRDD, assuming both are partitioned using the same
@@ -140,22 +85,14 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
    */
   def innerJoin[ED2: ClassTag, ED3: ClassTag]
       (other: EdgeRDD[ED2, _])
-      (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD] = {
-    val ed2Tag = classTag[ED2]
-    val ed3Tag = classTag[ED3]
-    this.withPartitionsRDD[ED3, VD](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
-      (thisIter, otherIter) =>
-        val (pid, thisEPart) = thisIter.next()
-        val (_, otherEPart) = otherIter.next()
-        Iterator(Tuple2(pid, thisEPart.innerJoin(otherEPart)(f)(ed2Tag, ed3Tag)))
-    })
-  }
+      (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD]
+
+  private[graphx] def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag](
+      f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDD[ED2, VD2]
 
-  /** Replaces the vertex partitions while preserving all other properties of the VertexRDD. */
+  /** Replaces the edge partitions while preserving all other properties of the EdgeRDD. */
   private[graphx] def withPartitionsRDD[ED2: ClassTag, VD2: ClassTag](
-      partitionsRDD: RDD[(PartitionID, EdgePartition[ED2, VD2])]): EdgeRDD[ED2, VD2] = {
-    new EdgeRDD(partitionsRDD, this.targetStorageLevel)
-  }
+      partitionsRDD: RDD[(PartitionID, EdgePartition[ED2, VD2])]): EdgeRDD[ED2, VD2]
 
   /**
   * Changes the target storage level while preserving all other properties of the
@@ -164,11 +101,7 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
    * This does not actually trigger a cache; to do this, call
    * [[org.apache.spark.graphx.EdgeRDD#cache]] on the returned EdgeRDD.
    */
-  private[graphx] def withTargetStorageLevel(
-      targetStorageLevel: StorageLevel): EdgeRDD[ED, VD] = {
-    new EdgeRDD(this.partitionsRDD, targetStorageLevel)
-  }
-
+  private[graphx] def withTargetStorageLevel(targetStorageLevel: StorageLevel): EdgeRDD[ED, VD]
 }
 
 object EdgeRDD {
@@ -197,6 +130,6 @@ object EdgeRDD {
    */
   def fromEdgePartitions[ED: ClassTag, VD: ClassTag](
       edgePartitions: RDD[(Int, EdgePartition[ED, VD])]): EdgeRDD[ED, VD] = {
-    new EdgeRDD(edgePartitions)
+    new EdgeRDDImpl(edgePartitions)
   }
 }
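
The hunk above keeps getPartitions and compute concrete in the abstract class
(commit d681f45) so that the bytecode MIMA checks for binary compatibility stays
on the public EdgeRDD type even though the rest of the implementation moves to
EdgeRDDImpl. A hedged, GraphX-independent sketch of that pattern, with purely
hypothetical names:

    // Public surface: an abstract class (not a trait, for binary
    // compatibility) that keeps shared concrete members and defers the rest.
    abstract class PublicThing {
      protected def data: Seq[Int]          // implementation hook
      def size: Int = data.length           // concrete: stays on the public type
      def doubled: PublicThing              // abstract: provided by the impl
    }

    // Hidden implementation; callers go through the factory, not `new`.
    private class PublicThingImpl(override val data: Seq[Int]) extends PublicThing {
      override def doubled: PublicThing = new PublicThingImpl(data.map(_ * 2))
    }

    object PublicThing {
      def apply(xs: Int*): PublicThing = new PublicThingImpl(xs)
    }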

http://git-wip-us.apache.org/repos/asf/spark/blob/a5ef5811/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index 12216d9..f8be176 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -27,6 +27,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.graphx.impl.RoutingTablePartition
 import org.apache.spark.graphx.impl.ShippableVertexPartition
 import org.apache.spark.graphx.impl.VertexAttributeBlock
+import org.apache.spark.graphx.impl.VertexRDDImpl
 
 /**
 * Extends `RDD[(VertexId, VD)]` by ensuring that there is only one entry for each vertex and by
@@ -53,62 +54,16 @@ import org.apache.spark.graphx.impl.VertexAttributeBlock
  *
  * @tparam VD the vertex attribute associated with each vertex in the set.
  */
-class VertexRDD[@specialized VD: ClassTag](
-    val partitionsRDD: RDD[ShippableVertexPartition[VD]],
-    val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
-  extends RDD[(VertexId, VD)](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
+abstract class VertexRDD[VD](
+    @transient sc: SparkContext,
+    @transient deps: Seq[Dependency[_]]) extends RDD[(VertexId, VD)](sc, deps) {
 
-  require(partitionsRDD.partitioner.isDefined)
+  implicit protected def vdTag: ClassTag[VD]
 
-  /**
-   * Construct a new VertexRDD that is indexed by only the visible vertices. The resulting
-   * VertexRDD will be based on a different index and can no longer be quickly joined with this
-   * RDD.
-   */
-  def reindex(): VertexRDD[VD] = this.withPartitionsRDD(partitionsRDD.map(_.reindex()))
-
-  override val partitioner = partitionsRDD.partitioner
+  private[graphx] def partitionsRDD: RDD[ShippableVertexPartition[VD]]
 
   override protected def getPartitions: Array[Partition] = partitionsRDD.partitions
 
-  override protected def getPreferredLocations(s: Partition): Seq[String] =
-    partitionsRDD.preferredLocations(s)
-
-  override def setName(_name: String): this.type = {
-    if (partitionsRDD.name != null) {
-      partitionsRDD.setName(partitionsRDD.name + ", " + _name)
-    } else {
-      partitionsRDD.setName(_name)
-    }
-    this
-  }
-  setName("VertexRDD")
-
-  /**
-   * Persists the vertex partitions at the specified storage level, ignoring any existing target
-   * storage level.
-   */
-  override def persist(newLevel: StorageLevel): this.type = {
-    partitionsRDD.persist(newLevel)
-    this
-  }
-
-  override def unpersist(blocking: Boolean = true): this.type = {
-    partitionsRDD.unpersist(blocking)
-    this
-  }
-
-  /** Persists the vertex partitions at `targetStorageLevel`, which defaults to MEMORY_ONLY. */
-  override def cache(): this.type = {
-    partitionsRDD.persist(targetStorageLevel)
-    this
-  }
-
-  /** The number of vertices in the RDD. */
-  override def count(): Long = {
-    partitionsRDD.map(_.size.toLong).reduce(_ + _)
-  }
-
   /**
    * Provides the `RDD[(VertexId, VD)]` equivalent output.
    */
@@ -117,21 +72,27 @@ class VertexRDD[@specialized VD: ClassTag](
   }
 
   /**
+   * Construct a new VertexRDD that is indexed by only the visible vertices. The resulting
+   * VertexRDD will be based on a different index and can no longer be quickly joined with this
+   * RDD.
+   */
+  def reindex(): VertexRDD[VD]
+
+  /**
   * Applies a function to each `VertexPartition` of this RDD and returns a new VertexRDD.
    */
   private[graphx] def mapVertexPartitions[VD2: ClassTag](
       f: ShippableVertexPartition[VD] => ShippableVertexPartition[VD2])
-    : VertexRDD[VD2] = {
-    val newPartitionsRDD = partitionsRDD.mapPartitions(_.map(f), preservesPartitioning = true)
-    this.withPartitionsRDD(newPartitionsRDD)
-  }
-
+    : VertexRDD[VD2]
 
   /**
   * Restricts the vertex set to the set of vertices satisfying the given predicate. This operation
   * preserves the index for efficient joins with the original RDD, and it sets bits in the bitmask
   * rather than allocating new memory.
   *
+   * It is declared and defined here to allow refining the return type from `RDD[(VertexId, VD)]` to
+   * `VertexRDD[VD]`.
+   *
   * @param pred the user defined predicate, which takes a tuple to conform to the
    * `RDD[(VertexId, VD)]` interface
    */
@@ -147,8 +108,7 @@ class VertexRDD[@specialized VD: ClassTag](
   * @return a new VertexRDD with values obtained by applying `f` to each of the entries in the
    * original VertexRDD
    */
-  def mapValues[VD2: ClassTag](f: VD => VD2): VertexRDD[VD2] =
-    this.mapVertexPartitions(_.map((vid, attr) => f(attr)))
+  def mapValues[VD2: ClassTag](f: VD => VD2): VertexRDD[VD2]
 
   /**
    * Maps each vertex attribute, additionally supplying the vertex ID.
@@ -159,23 +119,13 @@ class VertexRDD[@specialized VD: ClassTag](
   * @return a new VertexRDD with values obtained by applying `f` to each of the entries in the
    * original VertexRDD.  The resulting VertexRDD retains the same index.
    */
-  def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2] =
-    this.mapVertexPartitions(_.map(f))
+  def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2]
 
   /**
   * Hides vertices that are the same between `this` and `other`; for vertices that are different,
    * keeps the values from `other`.
    */
-  def diff(other: VertexRDD[VD]): VertexRDD[VD] = {
-    val newPartitionsRDD = partitionsRDD.zipPartitions(
-      other.partitionsRDD, preservesPartitioning = true
-    ) { (thisIter, otherIter) =>
-      val thisPart = thisIter.next()
-      val otherPart = otherIter.next()
-      Iterator(thisPart.diff(otherPart))
-    }
-    this.withPartitionsRDD(newPartitionsRDD)
-  }
+  def diff(other: VertexRDD[VD]): VertexRDD[VD]
 
   /**
   * Left joins this RDD with another VertexRDD with the same index. This function will fail if
@@ -192,16 +142,7 @@ class VertexRDD[@specialized VD: ClassTag](
    * @return a VertexRDD containing the results of `f`
    */
   def leftZipJoin[VD2: ClassTag, VD3: ClassTag]
-      (other: VertexRDD[VD2])(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3] = {
-    val newPartitionsRDD = partitionsRDD.zipPartitions(
-      other.partitionsRDD, preservesPartitioning = true
-    ) { (thisIter, otherIter) =>
-      val thisPart = thisIter.next()
-      val otherPart = otherIter.next()
-      Iterator(thisPart.leftJoin(otherPart)(f))
-    }
-    this.withPartitionsRDD(newPartitionsRDD)
-  }
+      (other: VertexRDD[VD2])(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3]
 
   /**
   * Left joins this VertexRDD with an RDD containing vertex attribute pairs. If the other RDD is
@@ -222,37 +163,14 @@ class VertexRDD[@specialized VD: ClassTag](
   def leftJoin[VD2: ClassTag, VD3: ClassTag]
       (other: RDD[(VertexId, VD2)])
       (f: (VertexId, VD, Option[VD2]) => VD3)
-    : VertexRDD[VD3] = {
-    // Test if the other vertex is a VertexRDD to choose the optimal join strategy.
-    // If the other set is a VertexRDD then we use the much more efficient leftZipJoin
-    other match {
-      case other: VertexRDD[_] =>
-        leftZipJoin(other)(f)
-      case _ =>
-        this.withPartitionsRDD[VD3](
-          partitionsRDD.zipPartitions(
-            other.partitionBy(this.partitioner.get), preservesPartitioning = true) {
-            (partIter, msgs) => partIter.map(_.leftJoin(msgs)(f))
-          }
-        )
-    }
-  }
+    : VertexRDD[VD3]
 
   /**
   * Efficiently inner joins this VertexRDD with another VertexRDD sharing the same index. See
    * [[innerJoin]] for the behavior of the join.
    */
   def innerZipJoin[U: ClassTag, VD2: ClassTag](other: VertexRDD[U])
-      (f: (VertexId, VD, U) => VD2): VertexRDD[VD2] = {
-    val newPartitionsRDD = partitionsRDD.zipPartitions(
-      other.partitionsRDD, preservesPartitioning = true
-    ) { (thisIter, otherIter) =>
-      val thisPart = thisIter.next()
-      val otherPart = otherIter.next()
-      Iterator(thisPart.innerJoin(otherPart)(f))
-    }
-    this.withPartitionsRDD(newPartitionsRDD)
-  }
+      (f: (VertexId, VD, U) => VD2): VertexRDD[VD2]
 
   /**
   * Inner joins this VertexRDD with an RDD containing vertex attribute pairs. If the other RDD is
@@ -266,21 +184,7 @@ class VertexRDD[@specialized VD: ClassTag](
    *         `this` and `other`, with values supplied by `f`
    */
   def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])
-      (f: (VertexId, VD, U) => VD2): VertexRDD[VD2] = {
-    // Test if the other vertex is a VertexRDD to choose the optimal join strategy.
-    // If the other set is a VertexRDD then we use the much more efficient innerZipJoin
-    other match {
-      case other: VertexRDD[_] =>
-        innerZipJoin(other)(f)
-      case _ =>
-        this.withPartitionsRDD(
-          partitionsRDD.zipPartitions(
-            other.partitionBy(this.partitioner.get), preservesPartitioning = true) {
-            (partIter, msgs) => partIter.map(_.innerJoin(msgs)(f))
-          }
-        )
-    }
-  }
+      (f: (VertexId, VD, U) => VD2): VertexRDD[VD2]
 
   /**
   * Aggregates vertices in `messages` that have the same ids using `reduceFunc`, returning a
@@ -294,38 +198,20 @@ class VertexRDD[@specialized VD: ClassTag](
    * messages.
    */
   def aggregateUsingIndex[VD2: ClassTag](
-      messages: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] = {
-    val shuffled = messages.partitionBy(this.partitioner.get)
-    val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) =>
-      thisIter.map(_.aggregateUsingIndex(msgIter, reduceFunc))
-    }
-    this.withPartitionsRDD[VD2](parts)
-  }
+      messages: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]
 
   /**
   * Returns a new `VertexRDD` reflecting a reversal of all edge directions in the corresponding
    * [[EdgeRDD]].
    */
-  def reverseRoutingTables(): VertexRDD[VD] =
-    this.mapVertexPartitions(vPart => vPart.withRoutingTable(vPart.routingTable.reverse))
+  def reverseRoutingTables(): VertexRDD[VD]
 
   /** Prepares this VertexRDD for efficient joins with the given EdgeRDD. */
-  def withEdges(edges: EdgeRDD[_, _]): VertexRDD[VD] = {
-    val routingTables = VertexRDD.createRoutingTables(edges, this.partitioner.get)
-    val vertexPartitions = partitionsRDD.zipPartitions(routingTables, true) {
-      (partIter, routingTableIter) =>
-        val routingTable =
-          if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition.empty
-        partIter.map(_.withRoutingTable(routingTable))
-    }
-    this.withPartitionsRDD(vertexPartitions)
-  }
+  def withEdges(edges: EdgeRDD[_, _]): VertexRDD[VD]
 
   /** Replaces the vertex partitions while preserving all other properties of the VertexRDD. */
   private[graphx] def withPartitionsRDD[VD2: ClassTag](
-      partitionsRDD: RDD[ShippableVertexPartition[VD2]]): VertexRDD[VD2] = {
-    new VertexRDD(partitionsRDD, this.targetStorageLevel)
-  }
+      partitionsRDD: RDD[ShippableVertexPartition[VD2]]): VertexRDD[VD2]
 
   /**
   * Changes the target storage level while preserving all other properties of the
@@ -335,20 +221,14 @@ class VertexRDD[@specialized VD: ClassTag](
    * [[org.apache.spark.graphx.VertexRDD#cache]] on the returned VertexRDD.
    */
   private[graphx] def withTargetStorageLevel(
-      targetStorageLevel: StorageLevel): VertexRDD[VD] = {
-    new VertexRDD(this.partitionsRDD, targetStorageLevel)
-  }
+      targetStorageLevel: StorageLevel): VertexRDD[VD]
 
   /** Generates an RDD of vertex attributes suitable for shipping to the edge partitions. */
   private[graphx] def shipVertexAttributes(
-      shipSrc: Boolean, shipDst: Boolean): RDD[(PartitionID, VertexAttributeBlock[VD])] = {
-    partitionsRDD.mapPartitions(_.flatMap(_.shipVertexAttributes(shipSrc, shipDst)))
-  }
+      shipSrc: Boolean, shipDst: Boolean): RDD[(PartitionID, VertexAttributeBlock[VD])]
 
   /** Generates an RDD of vertex IDs suitable for shipping to the edge partitions. */
-  private[graphx] def shipVertexIds(): RDD[(PartitionID, Array[VertexId])] = {
-    partitionsRDD.mapPartitions(_.flatMap(_.shipVertexIds()))
-  }
+  private[graphx] def shipVertexIds(): RDD[(PartitionID, Array[VertexId])]
 
 } // end of VertexRDD
 
@@ -374,7 +254,7 @@ object VertexRDD {
     val vertexPartitions = vPartitioned.mapPartitions(
       iter => Iterator(ShippableVertexPartition(iter)),
       preservesPartitioning = true)
-    new VertexRDD(vertexPartitions)
+    new VertexRDDImpl(vertexPartitions)
   }
 
   /**
@@ -419,7 +299,7 @@ object VertexRDD {
           if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition.empty
         Iterator(ShippableVertexPartition(vertexIter, routingTable, defaultVal, mergeFunc))
     }
-    new VertexRDD(vertexPartitions)
+    new VertexRDDImpl(vertexPartitions)
   }
 
   /**
@@ -441,10 +321,10 @@ object VertexRDD {
         if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition.empty
       Iterator(ShippableVertexPartition(Iterator.empty, routingTable, defaultVal))
     }, preservesPartitioning = true)
-    new VertexRDD(vertexPartitions)
+    new VertexRDDImpl(vertexPartitions)
   }
 
-  private def createRoutingTables(
+  private[graphx] def createRoutingTables(
       edges: EdgeRDD[_, _], vertexPartitioner: Partitioner): RDD[RoutingTablePartition] = {
     // Determine which vertices each edge partition needs by creating a mapping from vid to pid.
     val vid2pid = edges.partitionsRDD.mapPartitions(_.flatMap(
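
As a usage sketch of the join methods declared above (hypothetical data, reusing
the `sc` assumption from the earlier sketch): leftJoin pattern-matches on the
argument's runtime type, taking the index-reusing leftZipJoin path when the other
side is also a VertexRDD, and otherwise shuffling the plain pair RDD by this
VertexRDD's partitioner.

    import org.apache.spark.graphx.VertexRDD

    val ranks = VertexRDD(sc.parallelize(Seq((1L, 0.15), (2L, 0.30))))
    val names = sc.parallelize(Seq((1L, "alice")))  // plain RDD[(VertexId, String)]

    // A plain RDD argument falls through to the shuffle-based path; passing
    // another VertexRDD would dispatch to leftZipJoin instead.
    val joined = ranks.leftJoin(names) { (vid, rank, nameOpt) =>
      (rank, nameOpt.getOrElse("unknown"))
    }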

http://git-wip-us.apache.org/repos/asf/spark/blob/a5ef5811/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
new file mode 100644
index 0000000..4100a85
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.impl
+
+import scala.reflect.{classTag, ClassTag}
+
+import org.apache.spark.{OneToOneDependency, Partition, Partitioner, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+
+import org.apache.spark.graphx._
+
+class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] (
+    override val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])],
+    val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
+  extends EdgeRDD[ED, VD](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
+
+  override def setName(_name: String): this.type = {
+    if (partitionsRDD.name != null) {
+      partitionsRDD.setName(partitionsRDD.name + ", " + _name)
+    } else {
+      partitionsRDD.setName(_name)
+    }
+    this
+  }
+  setName("EdgeRDD")
+
+  /**
+   * If `partitionsRDD` already has a partitioner, use it. Otherwise assume that the
+   * [[PartitionID]]s in `partitionsRDD` correspond to the actual partitions and create a new
+   * partitioner that allows co-partitioning with `partitionsRDD`.
+   */
+  override val partitioner =
+    partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
+
+  override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
+
+  /**
+   * Persists the edge partitions at the specified storage level, ignoring any existing target
+   * storage level.
+   */
+  override def persist(newLevel: StorageLevel): this.type = {
+    partitionsRDD.persist(newLevel)
+    this
+  }
+
+  override def unpersist(blocking: Boolean = true): this.type = {
+    partitionsRDD.unpersist(blocking)
+    this
+  }
+
+  /** Persists the edge partitions using `targetStorageLevel`, which defaults to MEMORY_ONLY. */
+  override def cache(): this.type = {
+    partitionsRDD.persist(targetStorageLevel)
+    this
+  }
+
+  /** The number of edges in the RDD. */
+  override def count(): Long = {
+    partitionsRDD.map(_._2.size.toLong).reduce(_ + _)
+  }
+
+  override def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2, VD] =
+    mapEdgePartitions((pid, part) => part.map(f))
+
+  override def reverse: EdgeRDD[ED, VD] = mapEdgePartitions((pid, part) => part.reverse)
+
+  override def filter(
+      epred: EdgeTriplet[VD, ED] => Boolean,
+      vpred: (VertexId, VD) => Boolean): EdgeRDD[ED, VD] = {
+    mapEdgePartitions((pid, part) => part.filter(epred, vpred))
+  }
+
+  override def innerJoin[ED2: ClassTag, ED3: ClassTag]
+      (other: EdgeRDD[ED2, _])
+      (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD] = {
+    val ed2Tag = classTag[ED2]
+    val ed3Tag = classTag[ED3]
+    this.withPartitionsRDD[ED3, VD](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
+      (thisIter, otherIter) =>
+        val (pid, thisEPart) = thisIter.next()
+        val (_, otherEPart) = otherIter.next()
+        Iterator(Tuple2(pid, thisEPart.innerJoin(otherEPart)(f)(ed2Tag, ed3Tag)))
+    })
+  }
+
+  override private[graphx] def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag](
+      f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDD[ED2, VD2] = {
+    this.withPartitionsRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter =>
+      if (iter.hasNext) {
+        val (pid, ep) = iter.next()
+        Iterator(Tuple2(pid, f(pid, ep)))
+      } else {
+        Iterator.empty
+      }
+    }, preservesPartitioning = true))
+  }
+
+  override private[graphx] def withPartitionsRDD[ED2: ClassTag, VD2: ClassTag](
+      partitionsRDD: RDD[(PartitionID, EdgePartition[ED2, VD2])]): EdgeRDD[ED2, VD2] = {
+    new EdgeRDDImpl(partitionsRDD, this.targetStorageLevel)
+  }
+
+  override private[graphx] def withTargetStorageLevel(
+      targetStorageLevel: StorageLevel): EdgeRDD[ED, VD] = {
+    new EdgeRDDImpl(this.partitionsRDD, targetStorageLevel)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a5ef5811/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
new file mode 100644
index 0000000..0840562
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.impl
+
+import scala.reflect.ClassTag
+
+import org.apache.spark._
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd._
+import org.apache.spark.storage.StorageLevel
+
+import org.apache.spark.graphx._
+
+class VertexRDDImpl[VD] private[graphx] (
+    val partitionsRDD: RDD[ShippableVertexPartition[VD]],
+    val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
+  (implicit override protected val vdTag: ClassTag[VD])
+  extends VertexRDD[VD](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
+
+  require(partitionsRDD.partitioner.isDefined)
+
+  override def reindex(): VertexRDD[VD] = this.withPartitionsRDD(partitionsRDD.map(_.reindex()))
+
+  override val partitioner = partitionsRDD.partitioner
+
+  override protected def getPreferredLocations(s: Partition): Seq[String] =
+    partitionsRDD.preferredLocations(s)
+
+  override def setName(_name: String): this.type = {
+    if (partitionsRDD.name != null) {
+      partitionsRDD.setName(partitionsRDD.name + ", " + _name)
+    } else {
+      partitionsRDD.setName(_name)
+    }
+    this
+  }
+  setName("VertexRDD")
+
+  /**
+   * Persists the vertex partitions at the specified storage level, ignoring any existing target
+   * storage level.
+   */
+  override def persist(newLevel: StorageLevel): this.type = {
+    partitionsRDD.persist(newLevel)
+    this
+  }
+
+  override def unpersist(blocking: Boolean = true): this.type = {
+    partitionsRDD.unpersist(blocking)
+    this
+  }
+
+  /** Persists the vertex partitions at `targetStorageLevel`, which defaults to MEMORY_ONLY. */
+  override def cache(): this.type = {
+    partitionsRDD.persist(targetStorageLevel)
+    this
+  }
+
+  /** The number of vertices in the RDD. */
+  override def count(): Long = {
+    partitionsRDD.map(_.size.toLong).reduce(_ + _)
+  }
+
+  override private[graphx] def mapVertexPartitions[VD2: ClassTag](
+      f: ShippableVertexPartition[VD] => ShippableVertexPartition[VD2])
+    : VertexRDD[VD2] = {
+    val newPartitionsRDD = partitionsRDD.mapPartitions(_.map(f), preservesPartitioning = true)
+    this.withPartitionsRDD(newPartitionsRDD)
+  }
+
+  override def mapValues[VD2: ClassTag](f: VD => VD2): VertexRDD[VD2] =
+    this.mapVertexPartitions(_.map((vid, attr) => f(attr)))
+
+  override def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2] =
+    this.mapVertexPartitions(_.map(f))
+
+  override def diff(other: VertexRDD[VD]): VertexRDD[VD] = {
+    val newPartitionsRDD = partitionsRDD.zipPartitions(
+      other.partitionsRDD, preservesPartitioning = true
+    ) { (thisIter, otherIter) =>
+      val thisPart = thisIter.next()
+      val otherPart = otherIter.next()
+      Iterator(thisPart.diff(otherPart))
+    }
+    this.withPartitionsRDD(newPartitionsRDD)
+  }
+
+  override def leftZipJoin[VD2: ClassTag, VD3: ClassTag]
+      (other: VertexRDD[VD2])(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3] = {
+    val newPartitionsRDD = partitionsRDD.zipPartitions(
+      other.partitionsRDD, preservesPartitioning = true
+    ) { (thisIter, otherIter) =>
+      val thisPart = thisIter.next()
+      val otherPart = otherIter.next()
+      Iterator(thisPart.leftJoin(otherPart)(f))
+    }
+    this.withPartitionsRDD(newPartitionsRDD)
+  }
+
+  override def leftJoin[VD2: ClassTag, VD3: ClassTag]
+      (other: RDD[(VertexId, VD2)])
+      (f: (VertexId, VD, Option[VD2]) => VD3)
+    : VertexRDD[VD3] = {
+    // Test if the other vertex is a VertexRDD to choose the optimal join strategy.
+    // If the other set is a VertexRDD then we use the much more efficient leftZipJoin
+    other match {
+      case other: VertexRDD[_] =>
+        leftZipJoin(other)(f)
+      case _ =>
+        this.withPartitionsRDD[VD3](
+          partitionsRDD.zipPartitions(
+            other.partitionBy(this.partitioner.get), preservesPartitioning = true) {
+            (partIter, msgs) => partIter.map(_.leftJoin(msgs)(f))
+          }
+        )
+    }
+  }
+
+  override def innerZipJoin[U: ClassTag, VD2: ClassTag](other: VertexRDD[U])
+      (f: (VertexId, VD, U) => VD2): VertexRDD[VD2] = {
+    val newPartitionsRDD = partitionsRDD.zipPartitions(
+      other.partitionsRDD, preservesPartitioning = true
+    ) { (thisIter, otherIter) =>
+      val thisPart = thisIter.next()
+      val otherPart = otherIter.next()
+      Iterator(thisPart.innerJoin(otherPart)(f))
+    }
+    this.withPartitionsRDD(newPartitionsRDD)
+  }
+
+  override def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])
+      (f: (VertexId, VD, U) => VD2): VertexRDD[VD2] = {
+    // Test if the other vertex is a VertexRDD to choose the optimal join strategy.
+    // If the other set is a VertexRDD then we use the much more efficient innerZipJoin
+    other match {
+      case other: VertexRDD[_] =>
+        innerZipJoin(other)(f)
+      case _ =>
+        this.withPartitionsRDD(
+          partitionsRDD.zipPartitions(
+            other.partitionBy(this.partitioner.get), preservesPartitioning = true) {
+            (partIter, msgs) => partIter.map(_.innerJoin(msgs)(f))
+          }
+        )
+    }
+  }
+
+  override def aggregateUsingIndex[VD2: ClassTag](
+      messages: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] = {
+    val shuffled = messages.partitionBy(this.partitioner.get)
+    val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) =>
+      thisIter.map(_.aggregateUsingIndex(msgIter, reduceFunc))
+    }
+    this.withPartitionsRDD[VD2](parts)
+  }
+
+  override def reverseRoutingTables(): VertexRDD[VD] =
+    this.mapVertexPartitions(vPart => vPart.withRoutingTable(vPart.routingTable.reverse))
+
+  override def withEdges(edges: EdgeRDD[_, _]): VertexRDD[VD] = {
+    val routingTables = VertexRDD.createRoutingTables(edges, this.partitioner.get)
+    val vertexPartitions = partitionsRDD.zipPartitions(routingTables, true) {
+      (partIter, routingTableIter) =>
+        val routingTable =
+          if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition.empty
+        partIter.map(_.withRoutingTable(routingTable))
+    }
+    this.withPartitionsRDD(vertexPartitions)
+  }
+
+  override private[graphx] def withPartitionsRDD[VD2: ClassTag](
+      partitionsRDD: RDD[ShippableVertexPartition[VD2]]): VertexRDD[VD2] = {
+    new VertexRDDImpl(partitionsRDD, this.targetStorageLevel)
+  }
+
+  override private[graphx] def withTargetStorageLevel(
+      targetStorageLevel: StorageLevel): VertexRDD[VD] = {
+    new VertexRDDImpl(this.partitionsRDD, targetStorageLevel)
+  }
+
+  override private[graphx] def shipVertexAttributes(
+      shipSrc: Boolean, shipDst: Boolean): RDD[(PartitionID, VertexAttributeBlock[VD])] = {
+    partitionsRDD.mapPartitions(_.flatMap(_.shipVertexAttributes(shipSrc, shipDst)))
+  }
+
+  override private[graphx] def shipVertexIds(): RDD[(PartitionID, Array[VertexId])] = {
+    partitionsRDD.mapPartitions(_.flatMap(_.shipVertexIds()))
+  }
+
+}
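
Finally, a small sketch of aggregateUsingIndex as implemented above (hypothetical
data, reusing `ranks` and `sc` from the previous sketch): messages keyed by vertex
id are shuffled with this RDD's partitioner and merged per id against the existing
vertex index.

    val msgs = sc.parallelize(Seq((1L, 1.0), (1L, 2.0), (2L, 5.0)))
    val sums = ranks.aggregateUsingIndex[Double](msgs, _ + _)
    sums.collect().foreach(println)  // expected: (1,3.0) and (2,5.0)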


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
