[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user ankurdave commented on a diff in the pull request: https://github.com/apache/spark/pull/497#discussion_r12453849
--- Diff: graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala ---
@@ -17,39 +17,86 @@ package org.apache.spark.graphx.impl

-import scala.reflect.ClassTag
+import scala.reflect.{classTag, ClassTag}

 import org.apache.spark.graphx._
 import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap

 /**
- * A collection of edges stored in 3 large columnar arrays (src, dst, attribute). The arrays are
- * clustered by src.
+ * A collection of edges stored in columnar format, along with any vertex attributes referenced. The
+ * edges are stored in 3 large columnar arrays (src, dst, attribute). The arrays are clustered by
+ * src. There is an optional active vertex set for filtering computation on the edges.
+ *
+ * @tparam ED the edge attribute type
+ * @tparam VD the vertex attribute type
  *
  * @param srcIds the source vertex id of each edge
  * @param dstIds the destination vertex id of each edge
  * @param data the attribute associated with each edge
  * @param index a clustered index on source vertex id
- * @tparam ED the edge attribute type.
+ * @param vertices a map from referenced vertex ids to their corresponding attributes. Must
+ *   contain all vertex ids from `srcIds` and `dstIds`, though not necessarily valid attributes for
+ *   those vertex ids. The mask is not used.
+ * @param activeSet an optional active vertex set for filtering computation on the edges
  */
 private[graphx]
-class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag](
+class EdgePartition[
+    @specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag, VD: ClassTag](
--- End diff --

specialize VD

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well.
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/497#issuecomment-42394655 Build finished.
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/497#issuecomment-42480505 Merged build triggered.
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user ankurdave commented on a diff in the pull request: https://github.com/apache/spark/pull/497#discussion_r12456738
--- Diff: graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.impl
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.Partitioner
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.ShuffledRDD
+import org.apache.spark.util.collection.{BitSet, PrimitiveVector}
+
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+
+/**
+ * A message from the edge partition `pid` to the vertex partition containing `vid` specifying that
+ * the edge partition references `vid` in the specified `position` (src, dst, or both).
+ */
+private[graphx]
+class RoutingTableMessage(
+    var vid: VertexId,
+    var pid: PartitionID,
+    var position: Byte)
+  extends Product2[VertexId, (PartitionID, Byte)] with Serializable {
+  override def _1 = vid
+  override def _2 = (pid, position)
+  override def canEqual(that: Any): Boolean = that.isInstanceOf[RoutingTableMessage]
+}
+
+private[graphx]
+class RoutingTableMessageRDDFunctions(self: RDD[RoutingTableMessage]) {
+  /** Copartition an `RDD[RoutingTableMessage]` with the vertex RDD with the given `partitioner`. */
+  def copartitionWithVertices(partitioner: Partitioner): RDD[RoutingTableMessage] = {
+    new ShuffledRDD[VertexId, (PartitionID, Byte), RoutingTableMessage](self, partitioner)
+      .setSerializer(new RoutingTableMessageSerializer)
+  }
+}
+
+private[graphx]
+object RoutingTableMessageRDDFunctions {
+  import scala.language.implicitConversions
+
+  implicit def rdd2RoutingTableMessageRDDFunctions(rdd: RDD[RoutingTableMessage]) = {
+    new RoutingTableMessageRDDFunctions(rdd)
+  }
+}
+
+private[graphx]
+object RoutingTablePartition {
+  val empty: RoutingTablePartition = new RoutingTablePartition(Array.empty)
+
+  /** Generate a `RoutingTableMessage` for each vertex referenced in `edgePartition`. */
+  def edgePartitionToMsgs(pid: PartitionID, edgePartition: EdgePartition[_, _])
+    : Iterator[RoutingTableMessage] = {
+    // Determine which positions each vertex id appears in using a map where the low 2 bits
+    // represent src and dst
+    val map = new PrimitiveKeyOpenHashMap[VertexId, Byte]
+    edgePartition.srcIds.iterator.foreach { srcId =>
+      map.changeValue(srcId, 0x1, (b: Byte) => (b | 0x1).toByte)
+    }
+    edgePartition.dstIds.iterator.foreach { dstId =>
+      map.changeValue(dstId, 0x2, (b: Byte) => (b | 0x2).toByte)
+    }
+    map.iterator.map { vidAndPosition =>
+      new RoutingTableMessage(vidAndPosition._1, pid, vidAndPosition._2)
+    }
+  }
+
+  /** Build a `RoutingTablePartition` from `RoutingTableMessage`s. */
+  def fromMsgs(numEdgePartitions: Int, iter: Iterator[RoutingTableMessage])
+    : RoutingTablePartition = {
+    val pid2vid = Array.fill(numEdgePartitions)(new PrimitiveVector[VertexId])
+    val srcFlags = Array.fill(numEdgePartitions)(new PrimitiveVector[Boolean])
+    val dstFlags = Array.fill(numEdgePartitions)(new PrimitiveVector[Boolean])
+    for (msg <- iter) {
+      pid2vid(msg.pid) += msg.vid
+      srcFlags(msg.pid) += (msg.position & 0x1) != 0
+      dstFlags(msg.pid) += (msg.position & 0x2) != 0
+    }
+
+    new RoutingTablePartition(pid2vid.zipWithIndex.map {
+      case (vids, pid) => (vids.trim().array, toBitSet(srcFlags(pid)), toBitSet(dstFlags(pid)))
+    })
+  }
+
+  /** Compact the given vector of Booleans into a BitSet. */
+  private def toBitSet(flags: PrimitiveVector[Boolean]): BitSet = {
+    val bitset = new BitSet(flags.size)
+    var i = 0
+    while (i < flags.size) {
+      if
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/497#issuecomment-42487496 Merged build started.
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user ankurdave commented on a diff in the pull request: https://github.com/apache/spark/pull/497#discussion_r12456417
--- Diff: graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.impl
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.Partitioner
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.ShuffledRDD
+import org.apache.spark.util.collection.{BitSet, PrimitiveVector}
+
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+
+/**
+ * A message from the edge partition `pid` to the vertex partition containing `vid` specifying that
+ * the edge partition references `vid` in the specified `position` (src, dst, or both).
+ */
+private[graphx]
+class RoutingTableMessage(
+    var vid: VertexId,
+    var pid: PartitionID,
+    var position: Byte)
+  extends Product2[VertexId, (PartitionID, Byte)] with Serializable {
+  override def _1 = vid
+  override def _2 = (pid, position)
+  override def canEqual(that: Any): Boolean = that.isInstanceOf[RoutingTableMessage]
+}
+
+private[graphx]
+class RoutingTableMessageRDDFunctions(self: RDD[RoutingTableMessage]) {
+  /** Copartition an `RDD[RoutingTableMessage]` with the vertex RDD with the given `partitioner`. */
+  def copartitionWithVertices(partitioner: Partitioner): RDD[RoutingTableMessage] = {
+    new ShuffledRDD[VertexId, (PartitionID, Byte), RoutingTableMessage](self, partitioner)
+      .setSerializer(new RoutingTableMessageSerializer)
+  }
+}
+
+private[graphx]
+object RoutingTableMessageRDDFunctions {
+  import scala.language.implicitConversions
+
+  implicit def rdd2RoutingTableMessageRDDFunctions(rdd: RDD[RoutingTableMessage]) = {
+    new RoutingTableMessageRDDFunctions(rdd)
+  }
+}
+
+private[graphx]
+object RoutingTablePartition {
+  val empty: RoutingTablePartition = new RoutingTablePartition(Array.empty)
+
+  /** Generate a `RoutingTableMessage` for each vertex referenced in `edgePartition`. */
+  def edgePartitionToMsgs(pid: PartitionID, edgePartition: EdgePartition[_, _])
+    : Iterator[RoutingTableMessage] = {
+    // Determine which positions each vertex id appears in using a map where the low 2 bits
+    // represent src and dst
+    val map = new PrimitiveKeyOpenHashMap[VertexId, Byte]
+    edgePartition.srcIds.iterator.foreach { srcId =>
+      map.changeValue(srcId, 0x1, (b: Byte) => (b | 0x1).toByte)
+    }
+    edgePartition.dstIds.iterator.foreach { dstId =>
+      map.changeValue(dstId, 0x2, (b: Byte) => (b | 0x2).toByte)
+    }
+    map.iterator.map { vidAndPosition =>
+      new RoutingTableMessage(vidAndPosition._1, pid, vidAndPosition._2)
+    }
+  }
+
+  /** Build a `RoutingTablePartition` from `RoutingTableMessage`s. */
+  def fromMsgs(numEdgePartitions: Int, iter: Iterator[RoutingTableMessage])
+    : RoutingTablePartition = {
+    val pid2vid = Array.fill(numEdgePartitions)(new PrimitiveVector[VertexId])
+    val srcFlags = Array.fill(numEdgePartitions)(new PrimitiveVector[Boolean])
--- End diff --

@rxin What is the reason to use PrimitiveVector over ArrayBuilder? The latter specializes for all primitives, including Bytes: https://github.com/scala/scala/blob/v2.10.4/src/library/scala/collection/mutable/ArrayBuilder.scala#L40
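For readers following the question above, here is a minimal sketch of what accumulating bytes with the standard library's `ArrayBuilder` might look like. The object name `ArrayBuilderSketch` and the helper `collectPositions` are hypothetical and not part of the pull request; only `scala.collection.mutable.ArrayBuilder.ofByte` comes from the Scala library.

```scala
import scala.collection.mutable.ArrayBuilder

// Hypothetical illustration, not code from the pull request.
object ArrayBuilderSketch {
  /** Collect a stream of position bytes into a primitive Array[Byte]. */
  def collectPositions(positions: Iterator[Byte]): Array[Byte] = {
    // ArrayBuilder.ofByte is the Byte-specific builder from the standard
    // library; its result is a plain Array[Byte].
    val builder = new ArrayBuilder.ofByte
    while (positions.hasNext) {
      builder += positions.next()
    }
    builder.result()
  }

  def main(args: Array[String]): Unit = {
    val out = collectPositions(Iterator[Byte](0x1, 0x2, 0x3))
    println(out.mkString(","))
  }
}
```

Whether this is preferable to `PrimitiveVector` inside Spark is exactly the open question in the comment; the sketch only shows the shape of the alternative.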
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/497#issuecomment-42491309 Merged build finished.
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/497#issuecomment-42753687 Merged build started.
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/497#issuecomment-42489551 Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14786/
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user pwendell commented on the pull request: https://github.com/apache/spark/pull/497#issuecomment-42755314 Thanks everyone - I'm going to pull this in.
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user ankurdave commented on a diff in the pull request: https://github.com/apache/spark/pull/497#discussion_r12453687
--- Diff: graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala ---
@@ -276,14 +286,31 @@ class VertexRDD[@specialized VD: ClassTag](
    */
   def aggregateUsingIndex[VD2: ClassTag](
       messages: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] = {
-    val shuffled = MsgRDDFunctions.partitionForAggregation(messages, this.partitioner.get)
+    val shuffled = messages.copartitionWithVertices(this.partitioner.get)
     val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) =>
-      val vertexPartition: VertexPartition[VD] = thisIter.next()
-      Iterator(vertexPartition.aggregateUsingIndex(msgIter, reduceFunc))
+      thisIter.map(_.aggregateUsingIndex(msgIter, reduceFunc))
     }
     new VertexRDD[VD2](parts)
   }

+  /**
+   * Returns a new `VertexRDD` reflecting a reversal of all edge directions in the corresponding
+   * [[EdgeRDD]].
+   */
+  def reverseRoutingTables(): VertexRDD[VD] =
--- End diff --

Should be private[graphx]
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/497#issuecomment-42480525 Merged build started.
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/497#issuecomment-42491311 Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14787/
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user ankurdave commented on a diff in the pull request: https://github.com/apache/spark/pull/497#discussion_r12455215
--- Diff: graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala ---
@@ -89,105 +81,79 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
         }
         .partitionBy(new HashPartitioner(numPartitions))
         .mapPartitionsWithIndex( { (pid, iter) =>
-          val builder = new EdgePartitionBuilder[ED]()(edTag)
+          val builder = new EdgePartitionBuilder[ED, VD]()(edTag, vdTag)
           iter.foreach { message =>
             val data = message.data
             builder.add(data._1, data._2, data._3)
           }
           val edgePartition = builder.toEdgePartition
           Iterator((pid, edgePartition))
-        }, preservesPartitioning = true).cache())
-    GraphImpl(vertices, newEdges)
+        }, preservesPartitioning = true))
+    GraphImpl.fromExistingRDDs(vertices, newEdges)
   }

   override def reverse: Graph[VD, ED] = {
-    val newETable = edges.mapEdgePartitions((pid, part) => part.reverse)
-    GraphImpl(vertices, newETable)
+    new GraphImpl(vertices.reverseRoutingTables(), replicatedVertexView.reverse())
   }

   override def mapVertices[VD2: ClassTag](f: (VertexId, VD) => VD2): Graph[VD2, ED] = {
--- End diff --

Introduce private mapVerticesDelta to work around SPARK-1552
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user ankurdave commented on a diff in the pull request: https://github.com/apache/spark/pull/497#discussion_r12454278
--- Diff: graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala ---
@@ -212,9 +275,34 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
   }

   /**
+   * Get an iterator over the edge triplets in this partition.
+   *
+   * It is safe to keep references to the objects from this iterator.
+   */
+  def tripletIterator(
+      includeSrc: Boolean = true, includeDst: Boolean = true): Iterator[EdgeTriplet[VD, ED]] = {
--- End diff --

Consider making EdgePartition know its own includeSrc and includeDst, then assert equality here
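One way to read the suggestion above is sketched below, under stated assumptions: the class `Partition` is a hypothetical stand-in for `EdgePartition`, the fields `shippedSrc` and `shippedDst` are invented names for the shipping level the partition would remember, and the iterator returns a placeholder pair instead of real `EdgeTriplet`s.

```scala
// Hypothetical illustration, not code from the pull request.
object ShippingLevelSketch {
  /** Stand-in for an edge partition that records which vertex attributes were shipped to it. */
  final class Partition(val shippedSrc: Boolean, val shippedDst: Boolean) {
    def tripletIterator(
        includeSrc: Boolean = true,
        includeDst: Boolean = true): Iterator[(Boolean, Boolean)] = {
      // Fail fast if the caller asks for a view that differs from what was shipped.
      assert(includeSrc == shippedSrc, "includeSrc does not match the shipped level")
      assert(includeDst == shippedDst, "includeDst does not match the shipped level")
      Iterator.single((includeSrc, includeDst))
    }
  }

  def main(args: Array[String]): Unit = {
    val part = new Partition(shippedSrc = true, shippedDst = false)
    println(part.tripletIterator(includeSrc = true, includeDst = false).next())
  }
}
```

The design point is that a mismatch between the requested and the shipped attributes becomes an immediate assertion failure instead of a triplet with a silently invalid attribute.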
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user jegonzal commented on the pull request: https://github.com/apache/spark/pull/497#issuecomment-42618546 I went through this PR with Ankur and it looks good to me. There are a few minor changes but those can be moved to a second PR.
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/497#issuecomment-42755276 Merged build finished. All automated tests passed.
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/497#issuecomment-42755277 All automated tests passed. Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14874/
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/497#issuecomment-42753772 Merged build finished.
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/497#issuecomment-42753685 Merged build triggered.
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/497#issuecomment-42753773 Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14873/
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user ankurdave commented on a diff in the pull request: https://github.com/apache/spark/pull/497#discussion_r12456334
--- Diff: graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.impl
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.Partitioner
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.ShuffledRDD
+import org.apache.spark.util.collection.{BitSet, PrimitiveVector}
+
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+
+/**
+ * A message from the edge partition `pid` to the vertex partition containing `vid` specifying that
+ * the edge partition references `vid` in the specified `position` (src, dst, or both).
+ */
+private[graphx]
+class RoutingTableMessage(
+    var vid: VertexId,
+    var pid: PartitionID,
+    var position: Byte)
+  extends Product2[VertexId, (PartitionID, Byte)] with Serializable {
+  override def _1 = vid
+  override def _2 = (pid, position)
+  override def canEqual(that: Any): Boolean = that.isInstanceOf[RoutingTableMessage]
+}
+
+private[graphx]
+class RoutingTableMessageRDDFunctions(self: RDD[RoutingTableMessage]) {
+  /** Copartition an `RDD[RoutingTableMessage]` with the vertex RDD with the given `partitioner`. */
+  def copartitionWithVertices(partitioner: Partitioner): RDD[RoutingTableMessage] = {
+    new ShuffledRDD[VertexId, (PartitionID, Byte), RoutingTableMessage](self, partitioner)
+      .setSerializer(new RoutingTableMessageSerializer)
+  }
+}
+
+private[graphx]
+object RoutingTableMessageRDDFunctions {
+  import scala.language.implicitConversions
+
+  implicit def rdd2RoutingTableMessageRDDFunctions(rdd: RDD[RoutingTableMessage]) = {
+    new RoutingTableMessageRDDFunctions(rdd)
+  }
+}
+
+private[graphx]
+object RoutingTablePartition {
+  val empty: RoutingTablePartition = new RoutingTablePartition(Array.empty)
+
+  /** Generate a `RoutingTableMessage` for each vertex referenced in `edgePartition`. */
+  def edgePartitionToMsgs(pid: PartitionID, edgePartition: EdgePartition[_, _])
+    : Iterator[RoutingTableMessage] = {
+    // Determine which positions each vertex id appears in using a map where the low 2 bits
+    // represent src and dst
+    val map = new PrimitiveKeyOpenHashMap[VertexId, Byte]
+    edgePartition.srcIds.iterator.foreach { srcId =>
+      map.changeValue(srcId, 0x1, (b: Byte) => (b | 0x1).toByte)
+    }
+    edgePartition.dstIds.iterator.foreach { dstId =>
+      map.changeValue(dstId, 0x2, (b: Byte) => (b | 0x2).toByte)
+    }
+    map.iterator.map { vidAndPosition =>
+      new RoutingTableMessage(vidAndPosition._1, pid, vidAndPosition._2)
+    }
+  }
+
+  /** Build a `RoutingTablePartition` from `RoutingTableMessage`s. */
+  def fromMsgs(numEdgePartitions: Int, iter: Iterator[RoutingTableMessage])
+    : RoutingTablePartition = {
+    val pid2vid = Array.fill(numEdgePartitions)(new PrimitiveVector[VertexId])
+    val srcFlags = Array.fill(numEdgePartitions)(new PrimitiveVector[Boolean])
--- End diff --

Merge srcFlags and dstFlags into a PrimitiveVector[Byte]
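The merge suggested above amounts to keeping one byte per vertex whose low two bits record the src and dst positions, which is the same encoding `edgePartitionToMsgs` already uses (0x1 for src, 0x2 for dst). A minimal sketch of that encoding follows; the object `PositionFlagsSketch` and the helpers `pack`, `isSrc`, and `isDst` are hypothetical names, and a plain `Array[Byte]` stands in for `PrimitiveVector[Byte]`.

```scala
// Hypothetical illustration, not code from the pull request.
object PositionFlagsSketch {
  val SrcBit = 0x1 // bit 0: vertex appears as a source
  val DstBit = 0x2 // bit 1: vertex appears as a destination

  /** Pack the two Boolean flags into the low 2 bits of one byte. */
  def pack(isSrc: Boolean, isDst: Boolean): Byte =
    ((if (isSrc) SrcBit else 0) | (if (isDst) DstBit else 0)).toByte

  def isSrc(position: Byte): Boolean = (position & SrcBit) != 0
  def isDst(position: Byte): Boolean = (position & DstBit) != 0

  def main(args: Array[String]): Unit = {
    // One byte per vertex replaces one entry in each of two Boolean vectors.
    val positions: Array[Byte] =
      Array(pack(true, false), pack(false, true), pack(true, true))
    positions.foreach(p => println(s"src=${isSrc(p)} dst=${isDst(p)}"))
  }
}
```

Since each `RoutingTableMessage` already carries `position` as a `Byte`, storing it directly would avoid splitting it into two flags in `fromMsgs`; whether the later `BitSet` compaction still applies is left open here.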
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/497#issuecomment-42489547 Merged build finished.
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/497#issuecomment-42483855 Merged build started.
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user ankurdave commented on a diff in the pull request: https://github.com/apache/spark/pull/497#discussion_r12456034
--- Diff: graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala ---
@@ -21,192 +21,102 @@ import scala.reflect.{classTag, ClassTag}
 import org.apache.spark.SparkContext._
 import org.apache.spark.rdd.RDD
-import org.apache.spark.util.collection.{PrimitiveVector, OpenHashSet}

 import org.apache.spark.graphx._

 /**
- * A view of the vertices after they are shipped to the join sites specified in
- * `vertexPlacement`. The resulting view is co-partitioned with `edges`. If `prevViewOpt` is
- * specified, `updatedVerts` are treated as incremental updates to the previous view. Otherwise, a
- * fresh view is created.
- *
- * The view is always cached (i.e., once it is evaluated, it remains materialized). This avoids
- * constructing it twice if the user calls graph.triplets followed by graph.mapReduceTriplets, for
- * example. However, it means iterative algorithms must manually call `Graph.unpersist` on previous
- * iterations' graphs for best GC performance. See the implementation of
- * [[org.apache.spark.graphx.Pregel]] for an example.
+ * Manages shipping vertex attributes to the edge partitions of an
+ * [[org.apache.spark.graphx.EdgeRDD]]. Vertex attributes may be partially shipped to construct a
+ * triplet view with vertex attributes on only one side, and they may be updated. An active vertex
+ * set may additionally be shipped to the edge partitions. Be careful not to store a reference to
+ * `edges`, since it may be modified when the attribute shipping level is upgraded.
  */
 private[impl]
-class ReplicatedVertexView[VD: ClassTag](
-    updatedVerts: VertexRDD[VD],
-    edges: EdgeRDD[_],
-    routingTable: RoutingTable,
-    prevViewOpt: Option[ReplicatedVertexView[VD]] = None) {
+class ReplicatedVertexView[VD: ClassTag, ED: ClassTag](
+    var edges: EdgeRDD[ED, VD],
+    var hasSrcId: Boolean = false,
+    var hasDstId: Boolean = false) {

   /**
-   * Within each edge partition, create a local map from vid to an index into the attribute
-   * array. Each map contains a superset of the vertices that it will receive, because it stores
-   * vids from both the source and destination of edges. It must always include both source and
-   * destination vids because some operations, such as GraphImpl.mapReduceTriplets, rely on this.
+   * Return a new `ReplicatedVertexView` with the specified `EdgeRDD`, which must have the same
+   * shipping level.
    */
-  private val localVertexIdMap: RDD[(Int, VertexIdToIndexMap)] = prevViewOpt match {
-    case Some(prevView) =>
-      prevView.localVertexIdMap
-    case None =>
-      edges.partitionsRDD.mapPartitions(_.map {
-        case (pid, epart) =>
-          val vidToIndex = new VertexIdToIndexMap
-          epart.foreach { e =>
-            vidToIndex.add(e.srcId)
-            vidToIndex.add(e.dstId)
-          }
-          (pid, vidToIndex)
-      }, preservesPartitioning = true).cache().setName("ReplicatedVertexView localVertexIdMap")
-  }
-
-  private lazy val bothAttrs: RDD[(PartitionID, VertexPartition[VD])] = create(true, true)
-  private lazy val srcAttrOnly: RDD[(PartitionID, VertexPartition[VD])] = create(true, false)
-  private lazy val dstAttrOnly: RDD[(PartitionID, VertexPartition[VD])] = create(false, true)
-  private lazy val noAttrs: RDD[(PartitionID, VertexPartition[VD])] = create(false, false)
-
-  def unpersist(blocking: Boolean = true): ReplicatedVertexView[VD] = {
-    bothAttrs.unpersist(blocking)
-    srcAttrOnly.unpersist(blocking)
-    dstAttrOnly.unpersist(blocking)
-    noAttrs.unpersist(blocking)
-    // Don't unpersist localVertexIdMap because a future ReplicatedVertexView may be using it
-    // without modification
-    this
+  def withEdges[VD2: ClassTag, ED2: ClassTag](
+      edges_ : EdgeRDD[ED2, VD2]): ReplicatedVertexView[VD2, ED2] = {
+    new ReplicatedVertexView(edges_, hasSrcId, hasDstId)
   }

-  def get(includeSrc: Boolean, includeDst: Boolean): RDD[(PartitionID, VertexPartition[VD])] = {
-    (includeSrc, includeDst) match {
-      case (true, true) => bothAttrs
-      case (true, false) => srcAttrOnly
-      case (false, true) => dstAttrOnly
-      case (false, false) => noAttrs
-    }
+  /**
+   * Return a new `ReplicatedVertexView` where edges are reversed and shipping levels are swapped to
+   * match.
+   */
+  def reverse() = {
+    val newEdges = edges.mapEdgePartitions((pid, part) => part.reverse)
+    new ReplicatedVertexView(newEdges, hasDstId, hasSrcId)
   }
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/497#issuecomment-42487477 Merged build triggered.
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user ankurdave commented on a diff in the pull request: https://github.com/apache/spark/pull/497#discussion_r12456652

    --- Diff: graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala ---
    @@ -0,0 +1,158 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.graphx.impl
    +
    +import scala.reflect.ClassTag
    +
    +import org.apache.spark.Partitioner
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.rdd.ShuffledRDD
    +import org.apache.spark.util.collection.{BitSet, PrimitiveVector}
    +
    +import org.apache.spark.graphx._
    +import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
    +
    +/**
    + * A message from the edge partition `pid` to the vertex partition containing `vid` specifying that
    + * the edge partition references `vid` in the specified `position` (src, dst, or both).
    +*/
    +private[graphx]
    +class RoutingTableMessage(
    +    var vid: VertexId,
    +    var pid: PartitionID,
    +    var position: Byte)
    +  extends Product2[VertexId, (PartitionID, Byte)] with Serializable {
    +  override def _1 = vid
    +  override def _2 = (pid, position)
    +  override def canEqual(that: Any): Boolean = that.isInstanceOf[RoutingTableMessage]
    +}
    +
    +private[graphx]
    +class RoutingTableMessageRDDFunctions(self: RDD[RoutingTableMessage]) {
    +  /** Copartition an `RDD[RoutingTableMessage]` with the vertex RDD with the given `partitioner`. */
    +  def copartitionWithVertices(partitioner: Partitioner): RDD[RoutingTableMessage] = {
    +    new ShuffledRDD[VertexId, (PartitionID, Byte), RoutingTableMessage](self, partitioner)
    +      .setSerializer(new RoutingTableMessageSerializer)
    +  }
    +}
    +
    +private[graphx]
    +object RoutingTableMessageRDDFunctions {
    +  import scala.language.implicitConversions
    +
    +  implicit def rdd2RoutingTableMessageRDDFunctions(rdd: RDD[RoutingTableMessage]) = {
    +    new RoutingTableMessageRDDFunctions(rdd)
    +  }
    +}
    +
    +private[graphx]
    +object RoutingTablePartition {
    +  val empty: RoutingTablePartition = new RoutingTablePartition(Array.empty)
    +
    +  /** Generate a `RoutingTableMessage` for each vertex referenced in `edgePartition`. */
    +  def edgePartitionToMsgs(pid: PartitionID, edgePartition: EdgePartition[_, _])
    +    : Iterator[RoutingTableMessage] = {
    +    // Determine which positions each vertex id appears in using a map where the low 2 bits
    +    // represent src and dst
    +    val map = new PrimitiveKeyOpenHashMap[VertexId, Byte]
    +    edgePartition.srcIds.iterator.foreach { srcId =>
    +      map.changeValue(srcId, 0x1, (b: Byte) => (b | 0x1).toByte)
    +    }
    +    edgePartition.dstIds.iterator.foreach { dstId =>
    +      map.changeValue(dstId, 0x2, (b: Byte) => (b | 0x2).toByte)
    +    }
    +    map.iterator.map { vidAndPosition =>
    +      new RoutingTableMessage(vidAndPosition._1, pid, vidAndPosition._2)
    +    }
    +  }
    +
    +  /** Build a `RoutingTablePartition` from `RoutingTableMessage`s. */
    +  def fromMsgs(numEdgePartitions: Int, iter: Iterator[RoutingTableMessage])
    +    : RoutingTablePartition = {
    +    val pid2vid = Array.fill(numEdgePartitions)(new PrimitiveVector[VertexId])
    +    val srcFlags = Array.fill(numEdgePartitions)(new PrimitiveVector[Boolean])
    --- End diff --

Use Scala BitSet, which is auto-resizing, avoiding the copy
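A minimal standalone sketch of what this suggestion could look like, assuming "Scala BitSet" refers to `scala.collection.mutable.BitSet` from the standard library. The object and method names here (`BitSetFlagsSketch`, `collectFlags`) are hypothetical and are not part of the pull request; the point is only that a mutable bit set grows as bits are added, so flags can be recorded directly as they arrive instead of being buffered in a vector of Booleans and compacted afterwards.

```scala
import scala.collection.mutable

// Hypothetical illustration, not code from the pull request.
object BitSetFlagsSketch {
  /** Record the index of every `true` flag in a growable bit set as the flags stream in. */
  def collectFlags(flags: Iterator[Boolean]): mutable.BitSet = {
    val bits = mutable.BitSet.empty
    var i = 0
    while (flags.hasNext) {
      // Adding an index beyond the current capacity enlarges the backing storage on demand,
      // so no size needs to be known up front and no separate compaction pass is required.
      if (flags.next()) bits += i
      i += 1
    }
    bits
  }
}
```

Whether this is actually faster than the `PrimitiveVector[Boolean]` approach in the diff would have to be measured; the sketch only shows the shape of the change.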
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/497#issuecomment-42483842 Merged build triggered.
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user ankurdave commented on a diff in the pull request: https://github.com/apache/spark/pull/497#discussion_r12456744

    --- Diff: graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala ---
    @@ -0,0 +1,158 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.graphx.impl
    +
    +import scala.reflect.ClassTag
    +
    +import org.apache.spark.Partitioner
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.rdd.ShuffledRDD
    +import org.apache.spark.util.collection.{BitSet, PrimitiveVector}
    +
    +import org.apache.spark.graphx._
    +import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
    +
    +/**
    + * A message from the edge partition `pid` to the vertex partition containing `vid` specifying that
    + * the edge partition references `vid` in the specified `position` (src, dst, or both).
    +*/
    +private[graphx]
    +class RoutingTableMessage(
    +    var vid: VertexId,
    +    var pid: PartitionID,
    +    var position: Byte)
    +  extends Product2[VertexId, (PartitionID, Byte)] with Serializable {
    +  override def _1 = vid
    +  override def _2 = (pid, position)
    +  override def canEqual(that: Any): Boolean = that.isInstanceOf[RoutingTableMessage]
    +}
    +
    +private[graphx]
    +class RoutingTableMessageRDDFunctions(self: RDD[RoutingTableMessage]) {
    +  /** Copartition an `RDD[RoutingTableMessage]` with the vertex RDD with the given `partitioner`. */
    +  def copartitionWithVertices(partitioner: Partitioner): RDD[RoutingTableMessage] = {
    +    new ShuffledRDD[VertexId, (PartitionID, Byte), RoutingTableMessage](self, partitioner)
    +      .setSerializer(new RoutingTableMessageSerializer)
    +  }
    +}
    +
    +private[graphx]
    +object RoutingTableMessageRDDFunctions {
    +  import scala.language.implicitConversions
    +
    +  implicit def rdd2RoutingTableMessageRDDFunctions(rdd: RDD[RoutingTableMessage]) = {
    +    new RoutingTableMessageRDDFunctions(rdd)
    +  }
    +}
    +
    +private[graphx]
    +object RoutingTablePartition {
    +  val empty: RoutingTablePartition = new RoutingTablePartition(Array.empty)
    +
    +  /** Generate a `RoutingTableMessage` for each vertex referenced in `edgePartition`. */
    +  def edgePartitionToMsgs(pid: PartitionID, edgePartition: EdgePartition[_, _])
    +    : Iterator[RoutingTableMessage] = {
    +    // Determine which positions each vertex id appears in using a map where the low 2 bits
    +    // represent src and dst
    +    val map = new PrimitiveKeyOpenHashMap[VertexId, Byte]
    +    edgePartition.srcIds.iterator.foreach { srcId =>
    +      map.changeValue(srcId, 0x1, (b: Byte) => (b | 0x1).toByte)
    +    }
    +    edgePartition.dstIds.iterator.foreach { dstId =>
    +      map.changeValue(dstId, 0x2, (b: Byte) => (b | 0x2).toByte)
    +    }
    +    map.iterator.map { vidAndPosition =>
    +      new RoutingTableMessage(vidAndPosition._1, pid, vidAndPosition._2)
    +    }
    +  }
    +
    +  /** Build a `RoutingTablePartition` from `RoutingTableMessage`s. */
    +  def fromMsgs(numEdgePartitions: Int, iter: Iterator[RoutingTableMessage])
    +    : RoutingTablePartition = {
    +    val pid2vid = Array.fill(numEdgePartitions)(new PrimitiveVector[VertexId])
    +    val srcFlags = Array.fill(numEdgePartitions)(new PrimitiveVector[Boolean])
    +    val dstFlags = Array.fill(numEdgePartitions)(new PrimitiveVector[Boolean])
    +    for (msg <- iter) {
    +      pid2vid(msg.pid) += msg.vid
    +      srcFlags(msg.pid) += (msg.position & 0x1) != 0
    +      dstFlags(msg.pid) += (msg.position & 0x2) != 0
    +    }
    +
    +    new RoutingTablePartition(pid2vid.zipWithIndex.map {
    +      case (vids, pid) => (vids.trim().array, toBitSet(srcFlags(pid)), toBitSet(dstFlags(pid)))
    +    })
    +  }
    +
    +  /** Compact the given vector of Booleans into a BitSet. */
    +  private def toBitSet(flags: PrimitiveVector[Boolean]): BitSet = {
    +    val bitset = new BitSet(flags.size)
    +    var i = 0
    +    while (i < flags.size) {
    +      if
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/497#issuecomment-42484874 Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14783/
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/497#issuecomment-42484873 Merged build finished.
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user ankurdave commented on a diff in the pull request: https://github.com/apache/spark/pull/497#discussion_r12454050

    --- Diff: graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala ---
    @@ -100,7 +147,23 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
           i += 1
         }
         assert(newData.size == i)
    -    new EdgePartition(srcIds, dstIds, newData, index)
    +    this.withData(newData)
    +  }
    +
    +  /**
    +   * Construct a new edge partition containing only the edges matching `epred` and where both
    +   * vertices match `vpred`.
    --- End diff --

Assumes the EdgePartition is upgraded to full triplets
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user ankurdave commented on a diff in the pull request: https://github.com/apache/spark/pull/497#discussion_r12453694

    --- Diff: graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala ---
    @@ -276,14 +286,31 @@ class VertexRDD[@specialized VD: ClassTag](
        */
       def aggregateUsingIndex[VD2: ClassTag](
           messages: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] = {
    -    val shuffled = MsgRDDFunctions.partitionForAggregation(messages, this.partitioner.get)
    +    val shuffled = messages.copartitionWithVertices(this.partitioner.get)
         val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) =>
    -      val vertexPartition: VertexPartition[VD] = thisIter.next()
    -      Iterator(vertexPartition.aggregateUsingIndex(msgIter, reduceFunc))
    +      thisIter.map(_.aggregateUsingIndex(msgIter, reduceFunc))
         }
         new VertexRDD[VD2](parts)
       }
    +  /**
    +   * Returns a new `VertexRDD` reflecting a reversal of all edge directions in the corresponding
    +   * [[EdgeRDD]].
    +   */
    +  def reverseRoutingTables(): VertexRDD[VD] =
    +    this.mapVertexPartitions(vPart => vPart.withRoutingTable(vPart.routingTable.reverse))
    +
    +  /** Generates an RDD of vertex attributes suitable for shipping to the edge partitions. */
    +  private[graphx] def shipVertexAttributes(
    --- End diff --

Should partitionBy before returning
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/497
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user mateiz commented on the pull request: https://github.com/apache/spark/pull/497#issuecomment-42753649 Jenkins, test this please
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user pwendell commented on the pull request: https://github.com/apache/spark/pull/497#issuecomment-42453595 @ankurdave also - do you mind writing a brief upgrade guide in the GraphX docs with the major user-facing interface changes from 0.9?
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user ankurdave commented on the pull request: https://github.com/apache/spark/pull/497#issuecomment-42397518

Thanks for the comments! I implemented your suggestions as well as some cleanups I'd discussed with @rxin, then benchmarked it on 16 m2.4xlarge machines with the uk-2007-05 web graph for 10 iterations of PageRank to make sure it didn't introduce a performance regression.

Commit | Runtime +/- SEM
--- | ---
pre-unify-rdds (f96ea3a02f64e6e0fec4dcb1f498c6bcde4af699) | 418 +/- 17.1 s
unify-rdds (57202e81cc89056d86d9d9de18173b7553300a97) | 390.5 +/- 13.9 s
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/497#issuecomment-42394658 Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14757/
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/497#issuecomment-42391060 Build started.
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/497#issuecomment-42391053 Build triggered.
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/497#issuecomment-42340917 Build triggered.
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/497#discussion_r12340674

    --- Diff: graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala ---
    @@ -26,6 +26,33 @@ import org.apache.spark.serializer._
     import scala.language.existentials
     private[graphx]
    +class RoutingTableMessageSerializer extends Serializer with Serializable {
    +  override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
    +
    +    override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
    +      def writeObject[T](t: T) = {
    +        val msg = t.asInstanceOf[RoutingTableMessage]
    +        writeVarLong(msg.vid, optimizePositive = false)
    +        writeUnsignedVarInt(msg.pid)
    +        // TODO: Write only the bottom two bits of msg.position
    +        s.write(msg.position)
    +        this
    +      }
    +    }
    +
    +    override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) {
    --- End diff --

Add an explicit return type
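For readers unfamiliar with the variable-length integer encoding that the serializer in this diff relies on, here is a small self-contained sketch of one common unsigned varint scheme (7 payload bits per byte, high bit set on every byte except the last). This is an assumption about the general technique, not a copy of the `writeUnsignedVarInt` helper in `ShuffleSerializationStream`, whose exact byte layout may differ; `VarIntSketch` and its methods are hypothetical names.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, EOFException, InputStream, OutputStream}

// Hypothetical illustration of a 7-bits-per-byte unsigned varint; not Spark's implementation.
object VarIntSketch {
  /** Write `value` using 1 to 5 bytes; small values (such as partition ids) take a single byte. */
  def writeUnsignedVarInt(out: OutputStream, value: Int): Unit = {
    var v = value
    while ((v & ~0x7F) != 0) {
      out.write((v & 0x7F) | 0x80) // low 7 bits, continuation bit set
      v >>>= 7
    }
    out.write(v) // final byte, continuation bit clear
  }

  /** Read a value written by `writeUnsignedVarInt`. */
  def readUnsignedVarInt(in: InputStream): Int = {
    var result = 0
    var shift = 0
    var done = false
    while (!done) {
      val b = in.read()
      if (b < 0) throw new EOFException("stream ended inside a varint")
      result |= (b & 0x7F) << shift
      shift += 7
      done = (b & 0x80) == 0
    }
    result
  }

  /** Convenience helper for experimenting: encode a single value to a byte array. */
  def encode(value: Int): Array[Byte] = {
    val buf = new ByteArrayOutputStream()
    writeUnsignedVarInt(buf, value)
    buf.toByteArray
  }

  /** Convenience helper for experimenting: decode a single value from a byte array. */
  def decode(bytes: Array[Byte]): Int = readUnsignedVarInt(new ByteArrayInputStream(bytes))
}
```

Under this scheme a partition id below 128 occupies one byte, which is the kind of saving a custom shuffle serializer is after.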
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/497#issuecomment-42340933 Build started.
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/497#discussion_r12340669

    --- Diff: graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala ---
    @@ -26,6 +26,33 @@ import org.apache.spark.serializer._
     import scala.language.existentials
     private[graphx]
    +class RoutingTableMessageSerializer extends Serializer with Serializable {
    +  override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
    +
    +    override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
    --- End diff --

Add an explicit return type
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/497#discussion_r12340534

    --- Diff: graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala ---
    @@ -63,4 +63,6 @@ class EdgeTriplet[VD, ED] extends Edge[ED] {
         if (srcId == vid) srcAttr else { assert(dstId == vid); dstAttr }
       override def toString = ((srcId, srcAttr), (dstId, dstAttr), attr).toString()
    +
    +  def toTuple = ((srcId, srcAttr), (dstId, dstAttr), attr)
    --- End diff --

Add an explicit return type to this
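As an illustration of what the requested change might look like, the sketch below declares the result type of `toTuple` explicitly. `Triplet` is a simplified, hypothetical stand-in for `EdgeTriplet` so that the example compiles on its own; the `VertexId` alias mirrors the GraphX definition of `VertexId` as `Long`.

```scala
// Hypothetical standalone stand-in for EdgeTriplet; not code from the pull request.
object ExplicitReturnTypeSketch {
  type VertexId = Long

  class Triplet[VD, ED](
      val srcId: VertexId, val srcAttr: VD,
      val dstId: VertexId, val dstAttr: VD,
      val attr: ED) {
    // With the type written out, the public signature no longer depends on inference
    // and stays stable if the method body is refactored later.
    def toTuple: ((VertexId, VD), (VertexId, VD), ED) =
      ((srcId, srcAttr), (dstId, dstAttr), attr)
  }
}
```

Spelling out the type is mostly about documentation and API stability for a public method; the generated code should be the same either way.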
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/497#discussion_r12341175

    --- Diff: graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala ---
    @@ -0,0 +1,237 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.graphx.impl
    +
    +import scala.language.higherKinds
    +import scala.reflect.ClassTag
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.util.collection.BitSet
    +
    +import org.apache.spark.graphx._
    +import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
    +
    +/**
    + * An class containing additional operations for subclasses of VertexPartitionBase that provide
    + * implicit evidence of membership in the `VertexPartitionBaseOpsConstructor` typeclass (for
    + * example, [[VertexPartition.VertexPartitionOpsConstructor]]).
    + */
    +private[graphx] abstract class VertexPartitionBaseOps[VD: ClassTag, T[X] <: VertexPartitionBase[X]]
    +    (self: T[VD])
    +    (implicit ev: VertexPartitionBaseOpsConstructor[T])
    +  extends Logging {
    +
    +  def withIndex(index: VertexIdToIndexMap): T[VD]
    +  def withValues[VD2: ClassTag](values: Array[VD2]): T[VD2]
    +  def withMask(mask: BitSet): T[VD]
    +
    +  /**
    +   * Pass each vertex attribute along with the vertex id through a map
    +   * function and retain the original RDD's partitioning and index.
    +   *
    +   * @tparam VD2 the type returned by the map function
    +   *
    +   * @param f the function applied to each vertex id and vertex
    +   * attribute in the RDD
    +   *
    +   * @return a new VertexPartition with values obtained by applying `f` to
    +   * each of the entries in the original VertexRDD. The resulting
    +   * VertexPartition retains the same index.
    +   */
    +  def map[VD2: ClassTag](f: (VertexId, VD) => VD2): T[VD2] = {
    +    // Construct a view of the map transformation
    +    val newValues = new Array[VD2](self.capacity)
    +    var i = self.mask.nextSetBit(0)
    +    while (i >= 0) {
    +      newValues(i) = f(self.index.getValue(i), self.values(i))
    +      i = self.mask.nextSetBit(i + 1)
    +    }
    +    this.withValues(newValues)
    +  }
    +
    +  /**
    +   * Restrict the vertex set to the set of vertices satisfying the given predicate.
    +   *
    +   * @param pred the user defined predicate
    +   *
    +   * @note The vertex set preserves the original index structure which means that the returned
    +   * RDD can be easily joined with the original vertex-set. Furthermore, the filter only
    +   * modifies the bitmap index and so no new values are allocated.
    +   */
    +  def filter(pred: (VertexId, VD) => Boolean): T[VD] = {
    +    // Allocate the array to store the results into
    +    val newMask = new BitSet(self.capacity)
    +    // Iterate over the active bits in the old mask and evaluate the predicate
    +    var i = self.mask.nextSetBit(0)
    +    while (i >= 0) {
    +      if (pred(self.index.getValue(i), self.values(i))) {
    +        newMask.set(i)
    +      }
    +      i = self.mask.nextSetBit(i + 1)
    +    }
    +    this.withMask(newMask)
    +  }
    +
    +  /**
    +   * Hides vertices that are the same between this and other. For vertices that are different, keeps
    +   * the values from `other`. The indices of `this` and `other` must be the same.
    +   */
    +  def diff(other: T[VD]): T[VD] = {
    +    if (self.index != other.index) {
    +      logWarning("Diffing two VertexPartitions with different indexes is slow.")
    +      diff(createUsingIndex(other.iterator))
    +    } else {
    +      val newMask = self.mask & other.mask
    +      var i = newMask.nextSetBit(0)
    +      while (i >= 0) {
    +        if (self.values(i) == other.values(i)) {
    +          newMask.unset(i)
    +        }
    +        i = newMask.nextSetBit(i + 1)
    +      }
    +      ev.toOps(this.withValues(other.values)).withMask(newMask)
    +    }
    +  }
    +
    +  /** Left outer join another VertexPartition. */
    +
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/497#discussion_r12341257

    --- Diff: graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala ---
    @@ -0,0 +1,156 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.graphx.impl
    +
    +import scala.reflect.ClassTag
    +
    +import org.apache.spark.Partitioner
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.rdd.ShuffledRDD
    +import org.apache.spark.util.collection.{BitSet, PrimitiveVector}
    +
    +import org.apache.spark.graphx._
    +import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
    +
    +/**
    + * A message from the edge partition `pid` to the vertex partition containing `vid` specifying that
    + * the edge partition references `vid` in the specified `position` (src, dst, or both).
    +*/
    +private[graphx]
    +class RoutingTableMessage(
    +    var vid: VertexId,
    +    var pid: PartitionID,
    +    var position: Byte)
    +  extends Product2[VertexId, (PartitionID, Byte)] with Serializable {
    +  override def _1 = vid
    +  override def _2 = (pid, position)
    +  override def canEqual(that: Any): Boolean = that.isInstanceOf[RoutingTableMessage]
    +}
    +
    +private[graphx]
    +class RoutingTableMessageRDDFunctions(self: RDD[RoutingTableMessage]) {
    +  /** Copartition an `RDD[RoutingTableMessage]` with the vertex RDD with the given `partitioner`. */
    +  def copartitionWithVertices(partitioner: Partitioner): RDD[RoutingTableMessage] = {
    +    new ShuffledRDD[VertexId, (PartitionID, Byte), RoutingTableMessage](self, partitioner)
    +      .setSerializer(new RoutingTableMessageSerializer)
    +  }
    +}
    +
    +private[graphx]
    +object RoutingTableMessageRDDFunctions {
    +  import scala.language.implicitConversions
    +
    +  implicit def rdd2RoutingTableMessageRDDFunctions(rdd: RDD[RoutingTableMessage]) = {
    +    new RoutingTableMessageRDDFunctions(rdd)
    +  }
    +}
    +
    +private[graphx]
    +object RoutingTablePartition {
    +  val empty: RoutingTablePartition = new RoutingTablePartition(Array.empty)
    +
    +  /** Generate a `RoutingTableMessage` for each vertex referenced in `edgePartition`. */
    +  def edgePartitionToMsgs(pid: PartitionID, edgePartition: EdgePartition[_, _])
    +    : Iterator[RoutingTableMessage] = {
    +    // Determine which positions each vertex id appears in using a map where the low 2 bits
    +    // represent src and dst
    +    val map = new PrimitiveKeyOpenHashMap[VertexId, Byte]
    +    edgePartition.srcIds.iterator.foreach { srcId =>
    +      map.changeValue(srcId, 0x1, (b: Byte) => (b | 0x1).toByte)
    +    }
    +    edgePartition.dstIds.iterator.foreach { dstId =>
    +      map.changeValue(dstId, 0x2, (b: Byte) => (b | 0x2).toByte)
    +    }
    +    map.iterator.map { case (vid, position) => new RoutingTableMessage(vid, pid, position) }
    --- End diff --

map with case is also probably slower than it needs to be
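To make the concern concrete, the sketch below contrasts the pattern-matching form with one that reads the tuple fields through `_1` and `_2`. The 158-line revision of this same file quoted elsewhere in the thread uses the accessor form (`vidAndPosition._1`, `vidAndPosition._2`). `Msg` is a hypothetical stand-in for `RoutingTableMessage`, and a plain `Iterator[(Long, Byte)]` stands in for the hash map iterator so the example runs without Spark. Whether the difference is measurable depends on the compiler and JIT, so the reviewer's "probably slower" should be read as a hunch to benchmark, not a guarantee.

```scala
// Hypothetical standalone illustration; not code from the pull request.
object TupleAccessSketch {
  final class Msg(val vid: Long, val pid: Int, val position: Byte)

  /** Pattern-matching form, as in the line under review. */
  def toMsgsWithCase(pid: Int, entries: Iterator[(Long, Byte)]): Iterator[Msg] =
    entries.map { case (vid, position) => new Msg(vid, pid, position) }

  /** Direct accessor form: same result, without destructuring each tuple via a match. */
  def toMsgsDirect(pid: Int, entries: Iterator[(Long, Byte)]): Iterator[Msg] =
    entries.map { entry => new Msg(entry._1, pid, entry._2) }
}
```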
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user mateiz commented on a diff in the pull request:
https://github.com/apache/spark/pull/497#discussion_r12341223
--- Diff: graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala ---
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.impl
+
+import scala.language.higherKinds
+import scala.reflect.ClassTag
+
+import org.apache.spark.Logging
+import org.apache.spark.util.collection.BitSet
+
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+
+/**
+ * An class containing additional operations for subclasses of VertexPartitionBase that provide
+ * implicit evidence of membership in the `VertexPartitionBaseOpsConstructor` typeclass (for
+ * example, [[VertexPartition.VertexPartitionOpsConstructor]]).
+ */
+private[graphx] abstract class VertexPartitionBaseOps[VD: ClassTag, T[X] <: VertexPartitionBase[X]]
+    (self: T[VD])
+    (implicit ev: VertexPartitionBaseOpsConstructor[T])
+  extends Logging {
+
+  def withIndex(index: VertexIdToIndexMap): T[VD]
+  def withValues[VD2: ClassTag](values: Array[VD2]): T[VD2]
+  def withMask(mask: BitSet): T[VD]
+
+  /**
+   * Pass each vertex attribute along with the vertex id through a map
+   * function and retain the original RDD's partitioning and index.
+   *
+   * @tparam VD2 the type returned by the map function
+   *
+   * @param f the function applied to each vertex id and vertex
+   * attribute in the RDD
+   *
+   * @return a new VertexPartition with values obtained by applying `f` to
+   * each of the entries in the original VertexRDD. The resulting
+   * VertexPartition retains the same index.
+   */
+  def map[VD2: ClassTag](f: (VertexId, VD) => VD2): T[VD2] = {
+    // Construct a view of the map transformation
+    val newValues = new Array[VD2](self.capacity)
+    var i = self.mask.nextSetBit(0)
+    while (i >= 0) {
+      newValues(i) = f(self.index.getValue(i), self.values(i))
+      i = self.mask.nextSetBit(i + 1)
+    }
+    this.withValues(newValues)
+  }
+
+  /**
+   * Restrict the vertex set to the set of vertices satisfying the given predicate.
+   *
+   * @param pred the user defined predicate
+   *
+   * @note The vertex set preserves the original index structure which means that the returned
+   * RDD can be easily joined with the original vertex-set. Furthermore, the filter only
+   * modifies the bitmap index and so no new values are allocated.
+   */
+  def filter(pred: (VertexId, VD) => Boolean): T[VD] = {
+    // Allocate the array to store the results into
+    val newMask = new BitSet(self.capacity)
+    // Iterate over the active bits in the old mask and evaluate the predicate
+    var i = self.mask.nextSetBit(0)
+    while (i >= 0) {
+      if (pred(self.index.getValue(i), self.values(i))) {
+        newMask.set(i)
+      }
+      i = self.mask.nextSetBit(i + 1)
+    }
+    this.withMask(newMask)
+  }
+
+  /**
+   * Hides vertices that are the same between this and other. For vertices that are different, keeps
+   * the values from `other`. The indices of `this` and `other` must be the same.
+   */
+  def diff(other: T[VD]): T[VD] = {
+    if (self.index != other.index) {
+      logWarning("Diffing two VertexPartitions with different indexes is slow.")
+      diff(createUsingIndex(other.iterator))
+    } else {
+      val newMask = self.mask & other.mask
+      var i = newMask.nextSetBit(0)
+      while (i >= 0) {
+        if (self.values(i) == other.values(i)) {
+          newMask.unset(i)
+        }
+        i = newMask.nextSetBit(i + 1)
+      }
+      ev.toOps(this.withValues(other.values)).withMask(newMask)
+    }
+  }
+
+  /** Left outer join another VertexPartition. */
+
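The `filter` method quoted above changes only the mask and leaves the values array alone. A minimal sketch of that loop follows, under stated assumptions: `java.util.BitSet` stands in for Spark's own `BitSet` (its `nextSetBit` returns -1 when no set bit remains, which matches the `while (i >= 0)` loops in the diff), and `filterMask` is a hypothetical helper name, not part of the PR.

```scala
import java.util.BitSet

object MaskFilterSketch {
  // Build a new mask holding only the slots that are set in `mask` and whose value
  // satisfies `pred`. `values` is never copied or modified.
  def filterMask(mask: BitSet, values: Array[Int], pred: Int => Boolean): BitSet = {
    val newMask = new BitSet(values.length)
    var i = mask.nextSetBit(0) // -1 once no set bit remains
    while (i >= 0) {
      if (pred(values(i))) newMask.set(i)
      i = mask.nextSetBit(i + 1)
    }
    newMask
  }
}
```

Slots that were already hidden in the input mask stay hidden even if their stale value would pass the predicate, because the loop visits set bits only.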
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user mateiz commented on a diff in the pull request:
https://github.com/apache/spark/pull/497#discussion_r12341328
--- Diff: graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.impl
+
+import scala.language.higherKinds
+import scala.reflect.ClassTag
+
+import org.apache.spark.util.collection.BitSet
+
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+
+private[graphx] object VertexPartitionBase {
+  /**
+   * Construct the constituents of a VertexPartitionBase from the given vertices, merging duplicate
+   * entries arbitrarily.
+   */
+  def initFrom[VD: ClassTag](iter: Iterator[(VertexId, VD)])
+    : (VertexIdToIndexMap, Array[VD], BitSet) = {
+    val map = new PrimitiveKeyOpenHashMap[VertexId, VD]
+    iter.foreach { case (k, v) =>
+      map(k) = v
+    }
+    (map.keySet, map._values, map.keySet.getBitSet)
+  }
+
+  /**
+   * Construct the constituents of a VertexPartitionBase from the given vertices, merging duplicate
+   * entries using `mergeFunc`.
+   */
+  def initFrom[VD: ClassTag](iter: Iterator[(VertexId, VD)], mergeFunc: (VD, VD) => VD)
+    : (VertexIdToIndexMap, Array[VD], BitSet) = {
+    val map = new PrimitiveKeyOpenHashMap[VertexId, VD]
+    iter.foreach { case (k, v) =>
--- End diff --
foreach with case
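The quoted diff stops before the body of the `mergeFunc` loop, so the following is only a sketch of what "merging duplicate entries using `mergeFunc`" could look like without a `case` closure. It assumes a standard-library `mutable.HashMap` rather than `PrimitiveKeyOpenHashMap`, and `mergeByKey` is a hypothetical name chosen for illustration.

```scala
import scala.collection.mutable

object MergeDuplicatesSketch {
  // Collapse duplicate keys with `mergeFunc`, reading the tuple fields directly
  // in a while loop instead of destructuring with `case (k, v)`.
  def mergeByKey[V](iter: Iterator[(Long, V)], mergeFunc: (V, V) => V): mutable.HashMap[Long, V] = {
    val map = mutable.HashMap.empty[Long, V]
    while (iter.hasNext) {
      val kv = iter.next()
      val k = kv._1
      val v = kv._2
      // First occurrence stores the value; later occurrences are merged into it.
      map(k) = if (map.contains(k)) mergeFunc(map(k), v) else v
    }
    map
  }
}
```

With `mergeFunc = _ + _`, two entries for the same vertex id are summed; the arbitrary-merge variant in the diff corresponds to simply overwriting on every occurrence.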
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user mateiz commented on a diff in the pull request:
https://github.com/apache/spark/pull/497#discussion_r12341289
--- Diff: graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.impl
+
+import scala.language.higherKinds
+import scala.reflect.ClassTag
+
+import org.apache.spark.util.collection.BitSet
+
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+
+private[graphx] object VertexPartitionBase {
+  /**
+   * Construct the constituents of a VertexPartitionBase from the given vertices, merging duplicate
+   * entries arbitrarily.
+   */
+  def initFrom[VD: ClassTag](iter: Iterator[(VertexId, VD)])
+    : (VertexIdToIndexMap, Array[VD], BitSet) = {
+    val map = new PrimitiveKeyOpenHashMap[VertexId, VD]
+    iter.foreach { case (k, v) =>
--- End diff --
foreach with case
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/497#issuecomment-42351663 Build finished.
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/497#issuecomment-42351668 Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14726/
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/497#issuecomment-41127458 Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14358/
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user ankurdave commented on the pull request: https://github.com/apache/spark/pull/497#issuecomment-41124574 cc @rxin @jegonzal
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/497#issuecomment-41124613 Merged build triggered.
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/497#issuecomment-41124619 Merged build started.
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/497#issuecomment-41124706 Merged build finished.
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/497#issuecomment-41124707 Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14355/
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/497#issuecomment-41125076 Merged build started.
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/497#issuecomment-41126018 Merged build started.
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/497#issuecomment-41126010 Merged build triggered.
[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/497#issuecomment-41126494 Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14356/