[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-16 Thread ankurdave
Github user ankurdave commented on a diff in the pull request:

https://github.com/apache/spark/pull/497#discussion_r12453849
  
--- Diff: 
graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala ---
@@ -17,39 +17,86 @@
 
 package org.apache.spark.graphx.impl
 
-import scala.reflect.ClassTag
+import scala.reflect.{classTag, ClassTag}
 
 import org.apache.spark.graphx._
 import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
 
 /**
- * A collection of edges stored in 3 large columnar arrays (src, dst, attribute). The arrays are
- * clustered by src.
+ * A collection of edges stored in columnar format, along with any vertex attributes referenced.
+ * The edges are stored in 3 large columnar arrays (src, dst, attribute). The arrays are clustered
+ * by src. There is an optional active vertex set for filtering computation on the edges.
+ *
+ * @tparam ED the edge attribute type
+ * @tparam VD the vertex attribute type
  *
  * @param srcIds the source vertex id of each edge
  * @param dstIds the destination vertex id of each edge
  * @param data the attribute associated with each edge
  * @param index a clustered index on source vertex id
- * @tparam ED the edge attribute type.
+ * @param vertices a map from referenced vertex ids to their corresponding attributes. Must
+ *   contain all vertex ids from `srcIds` and `dstIds`, though not necessarily valid attributes for
+ *   those vertex ids. The mask is not used.
+ * @param activeSet an optional active vertex set for filtering computation on the edges
  */
 private[graphx]
-class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag](
+class EdgePartition[
+    @specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag, VD: ClassTag](
--- End diff --

specialize VD
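For reference, the suggested declaration might look like this (a sketch only; the
primitive list chosen for VD is an assumption):

    class EdgePartition[
        @specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag,
        @specialized(Char, Int, Boolean, Byte, Long, Float, Double) VD: ClassTag](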




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-16 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/497#issuecomment-42394655
  
Build finished. 




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/497#issuecomment-42480505
  
 Merged build triggered. 




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-15 Thread ankurdave
Github user ankurdave commented on a diff in the pull request:

https://github.com/apache/spark/pull/497#discussion_r12456738
  
--- Diff: 
graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala 
---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.impl
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.Partitioner
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.ShuffledRDD
+import org.apache.spark.util.collection.{BitSet, PrimitiveVector}
+
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+
+/**
+ * A message from the edge partition `pid` to the vertex partition containing `vid` specifying
+ * that the edge partition references `vid` in the specified `position` (src, dst, or both).
+ */
+private[graphx]
+class RoutingTableMessage(
+    var vid: VertexId,
+    var pid: PartitionID,
+    var position: Byte)
+  extends Product2[VertexId, (PartitionID, Byte)] with Serializable {
+  override def _1 = vid
+  override def _2 = (pid, position)
+  override def canEqual(that: Any): Boolean = that.isInstanceOf[RoutingTableMessage]
+}
+
+private[graphx]
+class RoutingTableMessageRDDFunctions(self: RDD[RoutingTableMessage]) {
+  /** Copartition an `RDD[RoutingTableMessage]` with the vertex RDD with the given `partitioner`. */
+  def copartitionWithVertices(partitioner: Partitioner): RDD[RoutingTableMessage] = {
+    new ShuffledRDD[VertexId, (PartitionID, Byte), RoutingTableMessage](self, partitioner)
+      .setSerializer(new RoutingTableMessageSerializer)
+  }
+}
+
+private[graphx]
+object RoutingTableMessageRDDFunctions {
+  import scala.language.implicitConversions
+
+  implicit def rdd2RoutingTableMessageRDDFunctions(rdd: RDD[RoutingTableMessage]) = {
+    new RoutingTableMessageRDDFunctions(rdd)
+  }
+}
+
+private[graphx]
+object RoutingTablePartition {
+  val empty: RoutingTablePartition = new RoutingTablePartition(Array.empty)
+
+  /** Generate a `RoutingTableMessage` for each vertex referenced in `edgePartition`. */
+  def edgePartitionToMsgs(pid: PartitionID, edgePartition: EdgePartition[_, _])
+    : Iterator[RoutingTableMessage] = {
+    // Determine which positions each vertex id appears in using a map where the low 2 bits
+    // represent src and dst
+    val map = new PrimitiveKeyOpenHashMap[VertexId, Byte]
+    edgePartition.srcIds.iterator.foreach { srcId =>
+      map.changeValue(srcId, 0x1, (b: Byte) => (b | 0x1).toByte)
+    }
+    edgePartition.dstIds.iterator.foreach { dstId =>
+      map.changeValue(dstId, 0x2, (b: Byte) => (b | 0x2).toByte)
+    }
+    map.iterator.map { vidAndPosition =>
+      new RoutingTableMessage(vidAndPosition._1, pid, vidAndPosition._2)
+    }
+  }
+
+  /** Build a `RoutingTablePartition` from `RoutingTableMessage`s. */
+  def fromMsgs(numEdgePartitions: Int, iter: Iterator[RoutingTableMessage])
+    : RoutingTablePartition = {
+    val pid2vid = Array.fill(numEdgePartitions)(new PrimitiveVector[VertexId])
+    val srcFlags = Array.fill(numEdgePartitions)(new PrimitiveVector[Boolean])
+    val dstFlags = Array.fill(numEdgePartitions)(new PrimitiveVector[Boolean])
+    for (msg <- iter) {
+      pid2vid(msg.pid) += msg.vid
+      srcFlags(msg.pid) += (msg.position & 0x1) != 0
+      dstFlags(msg.pid) += (msg.position & 0x2) != 0
+    }
+
+    new RoutingTablePartition(pid2vid.zipWithIndex.map {
+      case (vids, pid) => (vids.trim().array, toBitSet(srcFlags(pid)), toBitSet(dstFlags(pid)))
+    })
+  }
+
+  /** Compact the given vector of Booleans into a BitSet. */
+  private def toBitSet(flags: PrimitiveVector[Boolean]): BitSet = {
+    val bitset = new BitSet(flags.size)
+    var i = 0
+    while (i < flags.size) {
+      if

[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/497#issuecomment-42487496
  
Merged build started. 




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-15 Thread ankurdave
Github user ankurdave commented on a diff in the pull request:

https://github.com/apache/spark/pull/497#discussion_r12456417
  
--- Diff: 
graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala 
---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.impl
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.Partitioner
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.ShuffledRDD
+import org.apache.spark.util.collection.{BitSet, PrimitiveVector}
+
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+
+/**
+ * A message from the edge partition `pid` to the vertex partition containing `vid` specifying
+ * that the edge partition references `vid` in the specified `position` (src, dst, or both).
+ */
+private[graphx]
+class RoutingTableMessage(
+    var vid: VertexId,
+    var pid: PartitionID,
+    var position: Byte)
+  extends Product2[VertexId, (PartitionID, Byte)] with Serializable {
+  override def _1 = vid
+  override def _2 = (pid, position)
+  override def canEqual(that: Any): Boolean = that.isInstanceOf[RoutingTableMessage]
+}
+
+private[graphx]
+class RoutingTableMessageRDDFunctions(self: RDD[RoutingTableMessage]) {
+  /** Copartition an `RDD[RoutingTableMessage]` with the vertex RDD with the given `partitioner`. */
+  def copartitionWithVertices(partitioner: Partitioner): RDD[RoutingTableMessage] = {
+    new ShuffledRDD[VertexId, (PartitionID, Byte), RoutingTableMessage](self, partitioner)
+      .setSerializer(new RoutingTableMessageSerializer)
+  }
+}
+
+private[graphx]
+object RoutingTableMessageRDDFunctions {
+  import scala.language.implicitConversions
+
+  implicit def rdd2RoutingTableMessageRDDFunctions(rdd: RDD[RoutingTableMessage]) = {
+    new RoutingTableMessageRDDFunctions(rdd)
+  }
+}
+
+private[graphx]
+object RoutingTablePartition {
+  val empty: RoutingTablePartition = new RoutingTablePartition(Array.empty)
+
+  /** Generate a `RoutingTableMessage` for each vertex referenced in `edgePartition`. */
+  def edgePartitionToMsgs(pid: PartitionID, edgePartition: EdgePartition[_, _])
+    : Iterator[RoutingTableMessage] = {
+    // Determine which positions each vertex id appears in using a map where the low 2 bits
+    // represent src and dst
+    val map = new PrimitiveKeyOpenHashMap[VertexId, Byte]
+    edgePartition.srcIds.iterator.foreach { srcId =>
+      map.changeValue(srcId, 0x1, (b: Byte) => (b | 0x1).toByte)
+    }
+    edgePartition.dstIds.iterator.foreach { dstId =>
+      map.changeValue(dstId, 0x2, (b: Byte) => (b | 0x2).toByte)
+    }
+    map.iterator.map { vidAndPosition =>
+      new RoutingTableMessage(vidAndPosition._1, pid, vidAndPosition._2)
+    }
+  }
+
+  /** Build a `RoutingTablePartition` from `RoutingTableMessage`s. */
+  def fromMsgs(numEdgePartitions: Int, iter: Iterator[RoutingTableMessage])
+    : RoutingTablePartition = {
+    val pid2vid = Array.fill(numEdgePartitions)(new PrimitiveVector[VertexId])
+    val srcFlags = Array.fill(numEdgePartitions)(new PrimitiveVector[Boolean])
--- End diff --

@rxin What is the reason to use PrimitiveVector over ArrayBuilder? The 
latter specializes for all primitives, including Bytes: 
https://github.com/scala/scala/blob/v2.10.4/src/library/scala/collection/mutable/ArrayBuilder.scala#L40
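For context, a minimal standalone sketch of the ArrayBuilder alternative (not code
from this PR):

    import scala.collection.mutable.ArrayBuilder

    // ArrayBuilder.make is specialized for every primitive, so Byte appends
    // stay unboxed; result() returns a right-sized Array[Byte].
    val positions = ArrayBuilder.make[Byte]()
    positions += 0x1  // src bit
    positions += 0x3  // src and dst bits
    val arr: Array[Byte] = positions.result()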




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/497#issuecomment-42491309
  
Merged build finished. 




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/497#issuecomment-42753687
  
Merged build started. 




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/497#issuecomment-42489551
  

Refer to this link for build results: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14786/




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-15 Thread pwendell
Github user pwendell commented on the pull request:

https://github.com/apache/spark/pull/497#issuecomment-42755314
  
Thanks everyone - I'm going to pull this in.




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-15 Thread ankurdave
Github user ankurdave commented on a diff in the pull request:

https://github.com/apache/spark/pull/497#discussion_r12453687
  
--- Diff: graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala ---
@@ -276,14 +286,31 @@ class VertexRDD[@specialized VD: ClassTag](
    */
   def aggregateUsingIndex[VD2: ClassTag](
       messages: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] = {
-    val shuffled = MsgRDDFunctions.partitionForAggregation(messages, this.partitioner.get)
+    val shuffled = messages.copartitionWithVertices(this.partitioner.get)
     val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) =>
-      val vertexPartition: VertexPartition[VD] = thisIter.next()
-      Iterator(vertexPartition.aggregateUsingIndex(msgIter, reduceFunc))
+      thisIter.map(_.aggregateUsingIndex(msgIter, reduceFunc))
     }
     new VertexRDD[VD2](parts)
   }
 
+  /**
+   * Returns a new `VertexRDD` reflecting a reversal of all edge directions in the corresponding
+   * [[EdgeRDD]].
+   */
+  def reverseRoutingTables(): VertexRDD[VD] =
--- End diff --

Should be private[graphx]
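That is, a sketch of the one-line visibility change (the body matches the later
quote of this hunk):

    private[graphx] def reverseRoutingTables(): VertexRDD[VD] =
      this.mapVertexPartitions(vPart => vPart.withRoutingTable(vPart.routingTable.reverse))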




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/497#issuecomment-42480525
  
Merged build started. 




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/497#issuecomment-42491311
  

Refer to this link for build results: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14787/




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-14 Thread ankurdave
Github user ankurdave commented on a diff in the pull request:

https://github.com/apache/spark/pull/497#discussion_r12455215
  
--- Diff: 
graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala ---
@@ -89,105 +81,79 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
       }
       .partitionBy(new HashPartitioner(numPartitions))
       .mapPartitionsWithIndex( { (pid, iter) =>
-        val builder = new EdgePartitionBuilder[ED]()(edTag)
+        val builder = new EdgePartitionBuilder[ED, VD]()(edTag, vdTag)
         iter.foreach { message =>
           val data = message.data
           builder.add(data._1, data._2, data._3)
         }
         val edgePartition = builder.toEdgePartition
         Iterator((pid, edgePartition))
-      }, preservesPartitioning = true).cache())
-    GraphImpl(vertices, newEdges)
+      }, preservesPartitioning = true))
+    GraphImpl.fromExistingRDDs(vertices, newEdges)
   }
 
   override def reverse: Graph[VD, ED] = {
-    val newETable = edges.mapEdgePartitions((pid, part) => part.reverse)
-    GraphImpl(vertices, newETable)
+    new GraphImpl(vertices.reverseRoutingTables(), replicatedVertexView.reverse())
   }
 
   override def mapVertices[VD2: ClassTag](f: (VertexId, VD) => VD2): Graph[VD2, ED] = {
--- End diff --

Introduce private mapVerticesDelta to work around SPARK-1552
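A hypothetical sketch of such a helper (the name and the updateVertices call are
assumptions, not code from this PR); with the result type fixed to VD, the
incremental path needs no comparison of erased ClassTags, which is what
SPARK-1552 makes unreliable:

    // Same-VD variant of mapVertices: the attribute type is unchanged, so the
    // existing indices and routing tables can be reused safely.
    private def mapVerticesDelta(f: (VertexId, VD) => VD): Graph[VD, ED] = {
      vertices.cache()
      val newVerts = vertices.mapVertexPartitions(_.map(f)).cache()
      val changedVerts = vertices.diff(newVerts)
      val newView = replicatedVertexView.updateVertices(changedVerts)
      new GraphImpl(newVerts, newView)
    }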




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-14 Thread ankurdave
Github user ankurdave commented on a diff in the pull request:

https://github.com/apache/spark/pull/497#discussion_r12454278
  
--- Diff: 
graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala ---
@@ -212,9 +275,34 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
   }
 
   /**
+   * Get an iterator over the edge triplets in this partition.
+   *
+   * It is safe to keep references to the objects from this iterator.
+   */
+  def tripletIterator(
+      includeSrc: Boolean = true, includeDst: Boolean = true): Iterator[EdgeTriplet[VD, ED]] = {
--- End diff --

Consider making EdgePartition know its own includeSrc and includeDst, then 
assert equality here
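A hypothetical sketch of that suggestion (hasSrcAttrs/hasDstAttrs are assumed
field names recording the partition's shipping level):

    def tripletIterator(
        includeSrc: Boolean = true, includeDst: Boolean = true): Iterator[EdgeTriplet[VD, ED]] = {
      // Fail fast if the caller requests attributes that were never shipped.
      assert(includeSrc == hasSrcAttrs && includeDst == hasDstAttrs)
      new EdgeTripletIterator(this, includeSrc, includeDst)
    }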




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-14 Thread jegonzal
Github user jegonzal commented on the pull request:

https://github.com/apache/spark/pull/497#issuecomment-42618546
  
I went through this PR with Ankur and it looks good to me.  There are a few 
minor changes but those can be moved to a second PR.




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/497#issuecomment-42755276
  
Merged build finished. All automated tests passed.




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/497#issuecomment-42755277
  
All automated tests passed.
Refer to this link for build results: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14874/




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/497#issuecomment-42753772
  
Merged build finished. 




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/497#issuecomment-42753685
  
 Merged build triggered. 




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-13 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/497#issuecomment-42753773
  

Refer to this link for build results: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14873/




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-12 Thread ankurdave
Github user ankurdave commented on a diff in the pull request:

https://github.com/apache/spark/pull/497#discussion_r12456334
  
--- Diff: 
graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala 
---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.impl
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.Partitioner
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.ShuffledRDD
+import org.apache.spark.util.collection.{BitSet, PrimitiveVector}
+
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+
+/**
+ * A message from the edge partition `pid` to the vertex partition containing `vid` specifying
+ * that the edge partition references `vid` in the specified `position` (src, dst, or both).
+ */
+private[graphx]
+class RoutingTableMessage(
+    var vid: VertexId,
+    var pid: PartitionID,
+    var position: Byte)
+  extends Product2[VertexId, (PartitionID, Byte)] with Serializable {
+  override def _1 = vid
+  override def _2 = (pid, position)
+  override def canEqual(that: Any): Boolean = that.isInstanceOf[RoutingTableMessage]
+}
+
+private[graphx]
+class RoutingTableMessageRDDFunctions(self: RDD[RoutingTableMessage]) {
+  /** Copartition an `RDD[RoutingTableMessage]` with the vertex RDD with the given `partitioner`. */
+  def copartitionWithVertices(partitioner: Partitioner): RDD[RoutingTableMessage] = {
+    new ShuffledRDD[VertexId, (PartitionID, Byte), RoutingTableMessage](self, partitioner)
+      .setSerializer(new RoutingTableMessageSerializer)
+  }
+}
+
+private[graphx]
+object RoutingTableMessageRDDFunctions {
+  import scala.language.implicitConversions
+
+  implicit def rdd2RoutingTableMessageRDDFunctions(rdd: RDD[RoutingTableMessage]) = {
+    new RoutingTableMessageRDDFunctions(rdd)
+  }
+}
+
+private[graphx]
+object RoutingTablePartition {
+  val empty: RoutingTablePartition = new RoutingTablePartition(Array.empty)
+
+  /** Generate a `RoutingTableMessage` for each vertex referenced in `edgePartition`. */
+  def edgePartitionToMsgs(pid: PartitionID, edgePartition: EdgePartition[_, _])
+    : Iterator[RoutingTableMessage] = {
+    // Determine which positions each vertex id appears in using a map where the low 2 bits
+    // represent src and dst
+    val map = new PrimitiveKeyOpenHashMap[VertexId, Byte]
+    edgePartition.srcIds.iterator.foreach { srcId =>
+      map.changeValue(srcId, 0x1, (b: Byte) => (b | 0x1).toByte)
+    }
+    edgePartition.dstIds.iterator.foreach { dstId =>
+      map.changeValue(dstId, 0x2, (b: Byte) => (b | 0x2).toByte)
+    }
+    map.iterator.map { vidAndPosition =>
+      new RoutingTableMessage(vidAndPosition._1, pid, vidAndPosition._2)
+    }
+  }
+
+  /** Build a `RoutingTablePartition` from `RoutingTableMessage`s. */
+  def fromMsgs(numEdgePartitions: Int, iter: Iterator[RoutingTableMessage])
+    : RoutingTablePartition = {
+    val pid2vid = Array.fill(numEdgePartitions)(new PrimitiveVector[VertexId])
+    val srcFlags = Array.fill(numEdgePartitions)(new PrimitiveVector[Boolean])
--- End diff --

Merge srcFlags and dstFlags into a PrimitiveVector[Byte]
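Roughly, a sketch of the suggested merge (not code from this PR):

    // Keep the packed position byte per vertex instead of two Boolean vectors;
    // the src/dst bits get split into the two BitSets only once, at the end.
    val flags = Array.fill(numEdgePartitions)(new PrimitiveVector[Byte])
    for (msg <- iter) {
      pid2vid(msg.pid) += msg.vid
      flags(msg.pid) += msg.position
    }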




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/497#issuecomment-42489547
  
Merged build finished. 




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/497#issuecomment-42483855
  
Merged build started. 




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-11 Thread ankurdave
Github user ankurdave commented on a diff in the pull request:

https://github.com/apache/spark/pull/497#discussion_r12456034
  
--- Diff: 
graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala 
---
@@ -21,192 +21,102 @@ import scala.reflect.{classTag, ClassTag}
 
 import org.apache.spark.SparkContext._
 import org.apache.spark.rdd.RDD
-import org.apache.spark.util.collection.{PrimitiveVector, OpenHashSet}
 
 import org.apache.spark.graphx._
 
 /**
- * A view of the vertices after they are shipped to the join sites specified in
- * `vertexPlacement`. The resulting view is co-partitioned with `edges`. If `prevViewOpt` is
- * specified, `updatedVerts` are treated as incremental updates to the previous view. Otherwise, a
- * fresh view is created.
- *
- * The view is always cached (i.e., once it is evaluated, it remains materialized). This avoids
- * constructing it twice if the user calls graph.triplets followed by graph.mapReduceTriplets, for
- * example. However, it means iterative algorithms must manually call `Graph.unpersist` on previous
- * iterations' graphs for best GC performance. See the implementation of
- * [[org.apache.spark.graphx.Pregel]] for an example.
+ * Manages shipping vertex attributes to the edge partitions of an
+ * [[org.apache.spark.graphx.EdgeRDD]]. Vertex attributes may be partially shipped to construct a
+ * triplet view with vertex attributes on only one side, and they may be updated. An active vertex
+ * set may additionally be shipped to the edge partitions. Be careful not to store a reference to
+ * `edges`, since it may be modified when the attribute shipping level is upgraded.
  */
 private[impl]
-class ReplicatedVertexView[VD: ClassTag](
-    updatedVerts: VertexRDD[VD],
-    edges: EdgeRDD[_],
-    routingTable: RoutingTable,
-    prevViewOpt: Option[ReplicatedVertexView[VD]] = None) {
+class ReplicatedVertexView[VD: ClassTag, ED: ClassTag](
+    var edges: EdgeRDD[ED, VD],
+    var hasSrcId: Boolean = false,
+    var hasDstId: Boolean = false) {
 
   /**
-   * Within each edge partition, create a local map from vid to an index into the attribute
-   * array. Each map contains a superset of the vertices that it will receive, because it stores
-   * vids from both the source and destination of edges. It must always include both source and
-   * destination vids because some operations, such as GraphImpl.mapReduceTriplets, rely on this.
+   * Return a new `ReplicatedVertexView` with the specified `EdgeRDD`, which must have the same
+   * shipping level.
    */
-  private val localVertexIdMap: RDD[(Int, VertexIdToIndexMap)] = prevViewOpt match {
-    case Some(prevView) =>
-      prevView.localVertexIdMap
-    case None =>
-      edges.partitionsRDD.mapPartitions(_.map {
-        case (pid, epart) =>
-          val vidToIndex = new VertexIdToIndexMap
-          epart.foreach { e =>
-            vidToIndex.add(e.srcId)
-            vidToIndex.add(e.dstId)
-          }
-          (pid, vidToIndex)
-      }, preservesPartitioning = true).cache().setName("ReplicatedVertexView localVertexIdMap")
-  }
-
-  private lazy val bothAttrs: RDD[(PartitionID, VertexPartition[VD])] = create(true, true)
-  private lazy val srcAttrOnly: RDD[(PartitionID, VertexPartition[VD])] = create(true, false)
-  private lazy val dstAttrOnly: RDD[(PartitionID, VertexPartition[VD])] = create(false, true)
-  private lazy val noAttrs: RDD[(PartitionID, VertexPartition[VD])] = create(false, false)
-
-  def unpersist(blocking: Boolean = true): ReplicatedVertexView[VD] = {
-    bothAttrs.unpersist(blocking)
-    srcAttrOnly.unpersist(blocking)
-    dstAttrOnly.unpersist(blocking)
-    noAttrs.unpersist(blocking)
-    // Don't unpersist localVertexIdMap because a future ReplicatedVertexView may be using it
-    // without modification
-    this
+  def withEdges[VD2: ClassTag, ED2: ClassTag](
+      edges_ : EdgeRDD[ED2, VD2]): ReplicatedVertexView[VD2, ED2] = {
+    new ReplicatedVertexView(edges_, hasSrcId, hasDstId)
   }
 
-  def get(includeSrc: Boolean, includeDst: Boolean): RDD[(PartitionID, VertexPartition[VD])] = {
-    (includeSrc, includeDst) match {
-      case (true, true) => bothAttrs
-      case (true, false) => srcAttrOnly
-      case (false, true) => dstAttrOnly
-      case (false, false) => noAttrs
-    }
+  /**
+   * Return a new `ReplicatedVertexView` where edges are reversed and shipping levels are swapped
+   * to match.
+   */
+  def reverse() = {
+    val newEdges = edges.mapEdgePartitions((pid, part) => part.reverse)
+    new ReplicatedVertexView(newEdges, hasDstId, hasSrcId)
   }

[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/497#issuecomment-42487477
  
 Merged build triggered. 




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-11 Thread ankurdave
Github user ankurdave commented on a diff in the pull request:

https://github.com/apache/spark/pull/497#discussion_r12456652
  
--- Diff: 
graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala 
---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.impl
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.Partitioner
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.ShuffledRDD
+import org.apache.spark.util.collection.{BitSet, PrimitiveVector}
+
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+
+/**
+ * A message from the edge partition `pid` to the vertex partition containing `vid` specifying
+ * that the edge partition references `vid` in the specified `position` (src, dst, or both).
+ */
+private[graphx]
+class RoutingTableMessage(
+    var vid: VertexId,
+    var pid: PartitionID,
+    var position: Byte)
+  extends Product2[VertexId, (PartitionID, Byte)] with Serializable {
+  override def _1 = vid
+  override def _2 = (pid, position)
+  override def canEqual(that: Any): Boolean = that.isInstanceOf[RoutingTableMessage]
+}
+
+private[graphx]
+class RoutingTableMessageRDDFunctions(self: RDD[RoutingTableMessage]) {
+  /** Copartition an `RDD[RoutingTableMessage]` with the vertex RDD with the given `partitioner`. */
+  def copartitionWithVertices(partitioner: Partitioner): RDD[RoutingTableMessage] = {
+    new ShuffledRDD[VertexId, (PartitionID, Byte), RoutingTableMessage](self, partitioner)
+      .setSerializer(new RoutingTableMessageSerializer)
+  }
+}
+
+private[graphx]
+object RoutingTableMessageRDDFunctions {
+  import scala.language.implicitConversions
+
+  implicit def rdd2RoutingTableMessageRDDFunctions(rdd: RDD[RoutingTableMessage]) = {
+    new RoutingTableMessageRDDFunctions(rdd)
+  }
+}
+
+private[graphx]
+object RoutingTablePartition {
+  val empty: RoutingTablePartition = new RoutingTablePartition(Array.empty)
+
+  /** Generate a `RoutingTableMessage` for each vertex referenced in `edgePartition`. */
+  def edgePartitionToMsgs(pid: PartitionID, edgePartition: EdgePartition[_, _])
+    : Iterator[RoutingTableMessage] = {
+    // Determine which positions each vertex id appears in using a map where the low 2 bits
+    // represent src and dst
+    val map = new PrimitiveKeyOpenHashMap[VertexId, Byte]
+    edgePartition.srcIds.iterator.foreach { srcId =>
+      map.changeValue(srcId, 0x1, (b: Byte) => (b | 0x1).toByte)
+    }
+    edgePartition.dstIds.iterator.foreach { dstId =>
+      map.changeValue(dstId, 0x2, (b: Byte) => (b | 0x2).toByte)
+    }
+    map.iterator.map { vidAndPosition =>
+      new RoutingTableMessage(vidAndPosition._1, pid, vidAndPosition._2)
+    }
+  }
+
+  /** Build a `RoutingTablePartition` from `RoutingTableMessage`s. */
+  def fromMsgs(numEdgePartitions: Int, iter: Iterator[RoutingTableMessage])
+    : RoutingTablePartition = {
+    val pid2vid = Array.fill(numEdgePartitions)(new PrimitiveVector[VertexId])
+    val srcFlags = Array.fill(numEdgePartitions)(new PrimitiveVector[Boolean])
--- End diff --

Use Scala BitSet, which is auto-resizing, avoiding the copy
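That is, something like the following sketch (not code from this PR):

    import scala.collection.mutable

    // mutable.BitSet auto-resizes, so each flag can be set directly at the
    // vertex's offset within its partition, skipping the Boolean vector and
    // the final compaction copy.
    val srcBits = Array.fill(numEdgePartitions)(new mutable.BitSet)
    srcBits(0) += 7  // mark the vertex at offset 7 of partition 0 as a src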




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/497#issuecomment-42483842
  
 Merged build triggered. 




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-11 Thread ankurdave
Github user ankurdave commented on a diff in the pull request:

https://github.com/apache/spark/pull/497#discussion_r12456744
  
--- Diff: 
graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala 
---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.impl
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.Partitioner
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.ShuffledRDD
+import org.apache.spark.util.collection.{BitSet, PrimitiveVector}
+
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+
+/**
+ * A message from the edge partition `pid` to the vertex partition containing `vid` specifying
+ * that the edge partition references `vid` in the specified `position` (src, dst, or both).
+ */
+private[graphx]
+class RoutingTableMessage(
+    var vid: VertexId,
+    var pid: PartitionID,
+    var position: Byte)
+  extends Product2[VertexId, (PartitionID, Byte)] with Serializable {
+  override def _1 = vid
+  override def _2 = (pid, position)
+  override def canEqual(that: Any): Boolean = that.isInstanceOf[RoutingTableMessage]
+}
+
+private[graphx]
+class RoutingTableMessageRDDFunctions(self: RDD[RoutingTableMessage]) {
+  /** Copartition an `RDD[RoutingTableMessage]` with the vertex RDD with the given `partitioner`. */
+  def copartitionWithVertices(partitioner: Partitioner): RDD[RoutingTableMessage] = {
+    new ShuffledRDD[VertexId, (PartitionID, Byte), RoutingTableMessage](self, partitioner)
+      .setSerializer(new RoutingTableMessageSerializer)
+  }
+}
+
+private[graphx]
+object RoutingTableMessageRDDFunctions {
+  import scala.language.implicitConversions
+
+  implicit def rdd2RoutingTableMessageRDDFunctions(rdd: RDD[RoutingTableMessage]) = {
+    new RoutingTableMessageRDDFunctions(rdd)
+  }
+}
+
+private[graphx]
+object RoutingTablePartition {
+  val empty: RoutingTablePartition = new RoutingTablePartition(Array.empty)
+
+  /** Generate a `RoutingTableMessage` for each vertex referenced in `edgePartition`. */
+  def edgePartitionToMsgs(pid: PartitionID, edgePartition: EdgePartition[_, _])
+    : Iterator[RoutingTableMessage] = {
+    // Determine which positions each vertex id appears in using a map where the low 2 bits
+    // represent src and dst
+    val map = new PrimitiveKeyOpenHashMap[VertexId, Byte]
+    edgePartition.srcIds.iterator.foreach { srcId =>
+      map.changeValue(srcId, 0x1, (b: Byte) => (b | 0x1).toByte)
+    }
+    edgePartition.dstIds.iterator.foreach { dstId =>
+      map.changeValue(dstId, 0x2, (b: Byte) => (b | 0x2).toByte)
+    }
+    map.iterator.map { vidAndPosition =>
+      new RoutingTableMessage(vidAndPosition._1, pid, vidAndPosition._2)
+    }
+  }
+
+  /** Build a `RoutingTablePartition` from `RoutingTableMessage`s. */
+  def fromMsgs(numEdgePartitions: Int, iter: Iterator[RoutingTableMessage])
+    : RoutingTablePartition = {
+    val pid2vid = Array.fill(numEdgePartitions)(new PrimitiveVector[VertexId])
+    val srcFlags = Array.fill(numEdgePartitions)(new PrimitiveVector[Boolean])
+    val dstFlags = Array.fill(numEdgePartitions)(new PrimitiveVector[Boolean])
+    for (msg <- iter) {
+      pid2vid(msg.pid) += msg.vid
+      srcFlags(msg.pid) += (msg.position & 0x1) != 0
+      dstFlags(msg.pid) += (msg.position & 0x2) != 0
+    }
+
+    new RoutingTablePartition(pid2vid.zipWithIndex.map {
+      case (vids, pid) => (vids.trim().array, toBitSet(srcFlags(pid)), toBitSet(dstFlags(pid)))
+    })
+  }
+
+  /** Compact the given vector of Booleans into a BitSet. */
+  private def toBitSet(flags: PrimitiveVector[Boolean]): BitSet = {
+    val bitset = new BitSet(flags.size)
+    var i = 0
+    while (i < flags.size) {
+      if

[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/497#issuecomment-42484874
  

Refer to this link for build results: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14783/




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/497#issuecomment-42484873
  
Merged build finished. 




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-10 Thread ankurdave
Github user ankurdave commented on a diff in the pull request:

https://github.com/apache/spark/pull/497#discussion_r12454050
  
--- Diff: 
graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala ---
@@ -100,7 +147,23 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
       i += 1
     }
     assert(newData.size == i)
-    new EdgePartition(srcIds, dstIds, newData, index)
+    this.withData(newData)
+  }
+
+  /**
+   * Construct a new edge partition containing only the edges matching 
`epred` and where both
+   * vertices match `vpred`.
--- End diff --

Assumes the EdgePartition is upgraded to full triplets




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-10 Thread ankurdave
Github user ankurdave commented on a diff in the pull request:

https://github.com/apache/spark/pull/497#discussion_r12453694
  
--- Diff: graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala ---
@@ -276,14 +286,31 @@ class VertexRDD[@specialized VD: ClassTag](
    */
   def aggregateUsingIndex[VD2: ClassTag](
       messages: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] = {
-    val shuffled = MsgRDDFunctions.partitionForAggregation(messages, this.partitioner.get)
+    val shuffled = messages.copartitionWithVertices(this.partitioner.get)
     val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) =>
-      val vertexPartition: VertexPartition[VD] = thisIter.next()
-      Iterator(vertexPartition.aggregateUsingIndex(msgIter, reduceFunc))
+      thisIter.map(_.aggregateUsingIndex(msgIter, reduceFunc))
     }
     new VertexRDD[VD2](parts)
   }
 
+  /**
+   * Returns a new `VertexRDD` reflecting a reversal of all edge directions in the corresponding
+   * [[EdgeRDD]].
+   */
+  def reverseRoutingTables(): VertexRDD[VD] =
+    this.mapVertexPartitions(vPart => vPart.withRoutingTable(vPart.routingTable.reverse))
+
+  /** Generates an RDD of vertex attributes suitable for shipping to the edge partitions. */
+  private[graphx] def shipVertexAttributes(
--- End diff --

Should partitionBy before returning
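A hypothetical sketch of that change (passing the edge partitioner in is an
assumption; the partitioning could equally be done at the call site):

    private[graphx] def shipVertexAttributes(
        shipSrc: Boolean, shipDst: Boolean, partitioner: Partitioner)
      : RDD[(PartitionID, VertexAttributeBlock[VD])] = {
      partitionsRDD.mapPartitions(_.flatMap(_.shipVertexAttributes(shipSrc, shipDst)))
        .partitionBy(partitioner)  // copartition with the edges before returning
    }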




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-10 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/497




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-10 Thread mateiz
Github user mateiz commented on the pull request:

https://github.com/apache/spark/pull/497#issuecomment-42753649
  
Jenkins, test this please




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-10 Thread pwendell
Github user pwendell commented on the pull request:

https://github.com/apache/spark/pull/497#issuecomment-42453595
  
@ankurdave also - do you mind writing a brief upgrade guide in the GraphX 
docs with the major user-facing interface changes from 0.9?




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-07 Thread ankurdave
Github user ankurdave commented on the pull request:

https://github.com/apache/spark/pull/497#issuecomment-42397518
  
Thanks for the comments! I implemented your suggestions as well as some 
cleanups I'd discussed with @rxin, then benchmarked it on 16 m2.4xlarge 
machines with the uk-2007-05 web graph for 10 iterations of PageRank to make 
sure it didn't introduce a performance regression.

Commit | Runtime +/- SEM
--- | ---
pre-unify-rdds (f96ea3a02f64e6e0fec4dcb1f498c6bcde4af699) | 418 +/- 17.1 s
unify-rdds (57202e81cc89056d86d9d9de18173b7553300a97) | 390.5 +/- 13.9 s





[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/497#issuecomment-42394658
  

Refer to this link for build results: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14757/




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/497#issuecomment-42391060
  
Build started. 




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/497#issuecomment-42391053
  
 Build triggered. 




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-06 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/497#issuecomment-42340917
  
 Build triggered. 




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-06 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/497#discussion_r12340674
  
--- Diff: 
graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala ---
@@ -26,6 +26,33 @@ import org.apache.spark.serializer._
 import scala.language.existentials
 
 private[graphx]
+class RoutingTableMessageSerializer extends Serializer with Serializable {
+  override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
+
+    override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
+      def writeObject[T](t: T) = {
+        val msg = t.asInstanceOf[RoutingTableMessage]
+        writeVarLong(msg.vid, optimizePositive = false)
+        writeUnsignedVarInt(msg.pid)
+        // TODO: Write only the bottom two bits of msg.position
+        s.write(msg.position)
+        this
+      }
+    }
+
+    override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) {
--- End diff --

Add an explicit return type
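That is, a sketch of the annotated declaration; the body shown here is an
assumption that mirrors the writer above (read vid, pid, then the position byte):

    override def deserializeStream(s: InputStream): DeserializationStream =
      new ShuffleDeserializationStream(s) {
        override def readObject[T](): T = {
          val vid = readVarLong(optimizePositive = false)
          val pid = readUnsignedVarInt()
          val position = s.read().toByte
          new RoutingTableMessage(vid, pid, position).asInstanceOf[T]
        }
      }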




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-06 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/497#issuecomment-42340933
  
Build started. 




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-06 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/497#discussion_r12340669
  
--- Diff: 
graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala ---
@@ -26,6 +26,33 @@ import org.apache.spark.serializer._
 import scala.language.existentials
 
 private[graphx]
+class RoutingTableMessageSerializer extends Serializer with Serializable {
+  override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
+
+    override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
--- End diff --

Add an explicit return type
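
The same pattern would apply here; a sketch, assuming ShuffleSerializationStream
extends Spark's SerializationStream (whose writeObject returns the stream for
chaining):

    override def serializeStream(s: OutputStream): SerializationStream =
      new ShuffleSerializationStream(s) {
        // No-op body for illustration only; the real stream writes the msg fields.
        def writeObject[T](t: T): SerializationStream = this
      }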




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-06 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/497#discussion_r12340534
  
--- Diff: graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala 
---
@@ -63,4 +63,6 @@ class EdgeTriplet[VD, ED] extends Edge[ED] {
 if (srcId == vid) srcAttr else { assert(dstId == vid); dstAttr }
 
   override def toString = ((srcId, srcAttr), (dstId, dstAttr), 
attr).toString()
+
+  def toTuple = ((srcId, srcAttr), (dstId, dstAttr), attr)
--- End diff --

Add an explicit return type to this
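
A sketch of the annotated version, with the types taken from the enclosing
EdgeTriplet[VD, ED]:

    def toTuple: ((VertexId, VD), (VertexId, VD), ED) =
      ((srcId, srcAttr), (dstId, dstAttr), attr)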




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-06 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/497#discussion_r12341175
  
--- Diff: 
graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala 
---
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.impl
+
+import scala.language.higherKinds
+import scala.reflect.ClassTag
+
+import org.apache.spark.Logging
+import org.apache.spark.util.collection.BitSet
+
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+
+/**
+ * A class containing additional operations for subclasses of 
VertexPartitionBase that provide
+ * implicit evidence of membership in the 
`VertexPartitionBaseOpsConstructor` typeclass (for
+ * example, [[VertexPartition.VertexPartitionOpsConstructor]]).
+ */
+private[graphx] abstract class VertexPartitionBaseOps[VD: ClassTag, T[X] 
<: VertexPartitionBase[X]]
+(self: T[VD])
+(implicit ev: VertexPartitionBaseOpsConstructor[T])
+  extends Logging {
+
+  def withIndex(index: VertexIdToIndexMap): T[VD]
+  def withValues[VD2: ClassTag](values: Array[VD2]): T[VD2]
+  def withMask(mask: BitSet): T[VD]
+
+  /**
+   * Pass each vertex attribute along with the vertex id through a map
+   * function and retain the original RDD's partitioning and index.
+   *
+   * @tparam VD2 the type returned by the map function
+   *
+   * @param f the function applied to each vertex id and vertex
+   * attribute in the RDD
+   *
+   * @return a new VertexPartition with values obtained by applying `f` to
+   * each of the entries in the original VertexRDD.  The resulting
+   * VertexPartition retains the same index.
+   */
+  def map[VD2: ClassTag](f: (VertexId, VD) => VD2): T[VD2] = {
+// Construct a view of the map transformation
+val newValues = new Array[VD2](self.capacity)
+var i = self.mask.nextSetBit(0)
+while (i >= 0) {
+  newValues(i) = f(self.index.getValue(i), self.values(i))
+  i = self.mask.nextSetBit(i + 1)
+}
+this.withValues(newValues)
+  }
+
+  /**
+   * Restrict the vertex set to the set of vertices satisfying the given 
predicate.
+   *
+   * @param pred the user defined predicate
+   *
+   * @note The vertex set preserves the original index structure which 
means that the returned
+   *   RDD can be easily joined with the original vertex-set. 
Furthermore, the filter only
+   *   modifies the bitmap index and so no new values are allocated.
+   */
+  def filter(pred: (VertexId, VD) => Boolean): T[VD] = {
+// Allocate the array to store the results into
+val newMask = new BitSet(self.capacity)
+// Iterate over the active bits in the old mask and evaluate the predicate
+var i = self.mask.nextSetBit(0)
+while (i >= 0) {
+  if (pred(self.index.getValue(i), self.values(i))) {
+newMask.set(i)
+  }
+  i = self.mask.nextSetBit(i + 1)
+}
+this.withMask(newMask)
+  }
+
+  /**
+   * Hides vertices that are the same between this and other. For vertices 
that are different, keeps
+   * the values from `other`. The indices of `this` and `other` must be 
the same.
+   */
+  def diff(other: T[VD]): T[VD] = {
+if (self.index != other.index) {
+  logWarning("Diffing two VertexPartitions with different indexes is slow.")
+  diff(createUsingIndex(other.iterator))
+} else {
+  val newMask = self.mask & other.mask
+  var i = newMask.nextSetBit(0)
+  while (i >= 0) {
+if (self.values(i) == other.values(i)) {
+  newMask.unset(i)
+}
+i = newMask.nextSetBit(i + 1)
+  }
+  ev.toOps(this.withValues(other.values)).withMask(newMask)
+}
+  }
+
+  /** Left outer join another VertexPartition. */
+  

[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-06 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/497#discussion_r12341257
  
--- Diff: 
graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala 
---
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.impl
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.Partitioner
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.ShuffledRDD
+import org.apache.spark.util.collection.{BitSet, PrimitiveVector}
+
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+
+/**
+ * A message from the edge partition `pid` to the vertex partition 
containing `vid` specifying that
+ * the edge partition references `vid` in the specified `position` (src, 
dst, or both).
+ */
+private[graphx]
+class RoutingTableMessage(
+var vid: VertexId,
+var pid: PartitionID,
+var position: Byte)
+  extends Product2[VertexId, (PartitionID, Byte)] with Serializable {
+  override def _1 = vid
+  override def _2 = (pid, position)
+  override def canEqual(that: Any): Boolean = 
that.isInstanceOf[RoutingTableMessage]
+}
+
+private[graphx]
+class RoutingTableMessageRDDFunctions(self: RDD[RoutingTableMessage]) {
+  /** Copartition an `RDD[RoutingTableMessage]` with the vertex RDD with 
the given `partitioner`. */
+  def copartitionWithVertices(partitioner: Partitioner): 
RDD[RoutingTableMessage] = {
+new ShuffledRDD[VertexId, (PartitionID, Byte), 
RoutingTableMessage](self, partitioner)
+  .setSerializer(new RoutingTableMessageSerializer)
+  }
+}
+
+private[graphx]
+object RoutingTableMessageRDDFunctions {
+  import scala.language.implicitConversions
+
+  implicit def rdd2RoutingTableMessageRDDFunctions(rdd: 
RDD[RoutingTableMessage]) = {
+new RoutingTableMessageRDDFunctions(rdd)
+  }
+}
+
+private[graphx]
+object RoutingTablePartition {
+  val empty: RoutingTablePartition = new RoutingTablePartition(Array.empty)
+
+  /** Generate a `RoutingTableMessage` for each vertex referenced in 
`edgePartition`. */
+  def edgePartitionToMsgs(pid: PartitionID, edgePartition: 
EdgePartition[_, _])
+: Iterator[RoutingTableMessage] = {
+// Determine which positions each vertex id appears in using a map 
where the low 2 bits
+// represent src and dst
+val map = new PrimitiveKeyOpenHashMap[VertexId, Byte]
+edgePartition.srcIds.iterator.foreach { srcId =>
+  map.changeValue(srcId, 0x1, (b: Byte) => (b | 0x1).toByte)
+}
+edgePartition.dstIds.iterator.foreach { dstId =>
+  map.changeValue(dstId, 0x2, (b: Byte) => (b | 0x2).toByte)
+}
+map.iterator.map { case (vid, position) => new RoutingTableMessage(vid, pid, position) }
--- End diff --

map with case is also probably slower than it needs to be
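
A case-free rewrite would read the Product2 accessors directly instead of
destructuring every pair through a pattern match. A sketch, where kv is a
(VertexId, Byte) entry from the open hash map built above:

    map.iterator.map(kv => new RoutingTableMessage(kv._1, pid, kv._2))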




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-06 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/497#discussion_r12341223
  
--- Diff: 
graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala 
---
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.impl
+
+import scala.language.higherKinds
+import scala.reflect.ClassTag
+
+import org.apache.spark.Logging
+import org.apache.spark.util.collection.BitSet
+
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+
+/**
+ * A class containing additional operations for subclasses of 
VertexPartitionBase that provide
+ * implicit evidence of membership in the 
`VertexPartitionBaseOpsConstructor` typeclass (for
+ * example, [[VertexPartition.VertexPartitionOpsConstructor]]).
+ */
+private[graphx] abstract class VertexPartitionBaseOps[VD: ClassTag, T[X] 
<: VertexPartitionBase[X]]
+(self: T[VD])
+(implicit ev: VertexPartitionBaseOpsConstructor[T])
+  extends Logging {
+
+  def withIndex(index: VertexIdToIndexMap): T[VD]
+  def withValues[VD2: ClassTag](values: Array[VD2]): T[VD2]
+  def withMask(mask: BitSet): T[VD]
+
+  /**
+   * Pass each vertex attribute along with the vertex id through a map
+   * function and retain the original RDD's partitioning and index.
+   *
+   * @tparam VD2 the type returned by the map function
+   *
+   * @param f the function applied to each vertex id and vertex
+   * attribute in the RDD
+   *
+   * @return a new VertexPartition with values obtained by applying `f` to
+   * each of the entries in the original VertexRDD.  The resulting
+   * VertexPartition retains the same index.
+   */
+  def map[VD2: ClassTag](f: (VertexId, VD) => VD2): T[VD2] = {
+// Construct a view of the map transformation
+val newValues = new Array[VD2](self.capacity)
+var i = self.mask.nextSetBit(0)
+while (i >= 0) {
+  newValues(i) = f(self.index.getValue(i), self.values(i))
+  i = self.mask.nextSetBit(i + 1)
+}
+this.withValues(newValues)
+  }
+
+  /**
+   * Restrict the vertex set to the set of vertices satisfying the given 
predicate.
+   *
+   * @param pred the user defined predicate
+   *
+   * @note The vertex set preserves the original index structure which 
means that the returned
+   *   RDD can be easily joined with the original vertex-set. 
Furthermore, the filter only
+   *   modifies the bitmap index and so no new values are allocated.
+   */
+  def filter(pred: (VertexId, VD) => Boolean): T[VD] = {
+// Allocate the array to store the results into
+val newMask = new BitSet(self.capacity)
+// Iterate over the active bits in the old mask and evaluate the predicate
+var i = self.mask.nextSetBit(0)
+while (i >= 0) {
+  if (pred(self.index.getValue(i), self.values(i))) {
+newMask.set(i)
+  }
+  i = self.mask.nextSetBit(i + 1)
+}
+this.withMask(newMask)
+  }
+
+  /**
+   * Hides vertices that are the same between this and other. For vertices 
that are different, keeps
+   * the values from `other`. The indices of `this` and `other` must be 
the same.
+   */
+  def diff(other: T[VD]): T[VD] = {
+if (self.index != other.index) {
+  logWarning("Diffing two VertexPartitions with different indexes is slow.")
+  diff(createUsingIndex(other.iterator))
+} else {
+  val newMask = self.mask & other.mask
+  var i = newMask.nextSetBit(0)
+  while (i >= 0) {
+if (self.values(i) == other.values(i)) {
+  newMask.unset(i)
+}
+i = newMask.nextSetBit(i + 1)
+  }
+  ev.toOps(this.withValues(other.values)).withMask(newMask)
+}
+  }
+
+  /** Left outer join another VertexPartition. */
+  

[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-06 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/497#discussion_r12341328
  
--- Diff: 
graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.impl
+
+import scala.language.higherKinds
+import scala.reflect.ClassTag
+
+import org.apache.spark.util.collection.BitSet
+
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+
+private[graphx] object VertexPartitionBase {
+  /**
+   * Construct the constituents of a VertexPartitionBase from the given 
vertices, merging duplicate
+   * entries arbitrarily.
+   */
+  def initFrom[VD: ClassTag](iter: Iterator[(VertexId, VD)])
+: (VertexIdToIndexMap, Array[VD], BitSet) = {
+val map = new PrimitiveKeyOpenHashMap[VertexId, VD]
+iter.foreach { case (k, v) =>
+  map(k) = v
+}
+(map.keySet, map._values, map.keySet.getBitSet)
+  }
+
+  /**
+   * Construct the constituents of a VertexPartitionBase from the given 
vertices, merging duplicate
+   * entries using `mergeFunc`.
+   */
+  def initFrom[VD: ClassTag](iter: Iterator[(VertexId, VD)], mergeFunc: 
(VD, VD) => VD)
+: (VertexIdToIndexMap, Array[VD], BitSet) = {
+val map = new PrimitiveKeyOpenHashMap[VertexId, VD]
+iter.foreach { case (k, v) =>
--- End diff --

foreach with case
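
A case-free sketch of this loop (it assumes PrimitiveKeyOpenHashMap's setMerge,
which combines an existing value with the incoming one via mergeFunc):

    iter.foreach { pair =>
      map.setMerge(pair._1, pair._2, mergeFunc)
    }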




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-06 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/497#discussion_r12341289
  
--- Diff: 
graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.impl
+
+import scala.language.higherKinds
+import scala.reflect.ClassTag
+
+import org.apache.spark.util.collection.BitSet
+
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+
+private[graphx] object VertexPartitionBase {
+  /**
+   * Construct the constituents of a VertexPartitionBase from the given 
vertices, merging duplicate
+   * entries arbitrarily.
+   */
+  def initFrom[VD: ClassTag](iter: Iterator[(VertexId, VD)])
+: (VertexIdToIndexMap, Array[VD], BitSet) = {
+val map = new PrimitiveKeyOpenHashMap[VertexId, VD]
+iter.foreach { case (k, v) =>
--- End diff --

foreach with case
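
For the non-merging variant quoted here, a case-free sketch is simply:

    iter.foreach { pair => map(pair._1) = pair._2 }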




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-06 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/497#issuecomment-42351663
  
Build finished. 




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-05-06 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/497#issuecomment-42351668
  

Refer to this link for build results: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14726/




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-04-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/497#issuecomment-41127458
  

Refer to this link for build results: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14358/




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-04-22 Thread ankurdave
Github user ankurdave commented on the pull request:

https://github.com/apache/spark/pull/497#issuecomment-41124574
  
cc @rxin @jegonzal




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-04-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/497#issuecomment-41124613
  
 Merged build triggered. 




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-04-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/497#issuecomment-41124619
  
Merged build started. 




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-04-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/497#issuecomment-41124706
  
Merged build finished. 




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-04-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/497#issuecomment-41124707
  

Refer to this link for build results: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14355/




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-04-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/497#issuecomment-41125076
  
Merged build started. 




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-04-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/497#issuecomment-41126018
  
Merged build started. 




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-04-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/497#issuecomment-41126010
  
 Merged build triggered. 




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

2014-04-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/497#issuecomment-41126494
  

Refer to this link for build results: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14356/

