[GitHub] spark pull request: [SPARK-3936] Add aggregateMessages, which supe...

2014-11-12 Thread jegonzal
Github user jegonzal commented on a diff in the pull request:

https://github.com/apache/spark/pull/3100#discussion_r20257677
  
--- Diff: graphx/src/main/scala/org/apache/spark/graphx/TripletFields.java ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx;
+
+import java.io.Serializable;
+
+/**
+ * Represents a subset of the fields of an [[EdgeTriplet]] or [[EdgeContext]]. This allows the
+ * system to populate only those fields for efficiency.
+ */
+public class TripletFields implements Serializable {
+  public final boolean useSrc;
+  public final boolean useDst;
+  public final boolean useEdge;
+
+  public TripletFields() {
+    this(true, true, true);
+  }
+
+  public TripletFields(boolean useSrc, boolean useDst, boolean useEdge) {
+    this.useSrc = useSrc;
+    this.useDst = useDst;
+    this.useEdge = useEdge;
+  }
+
+  public static final TripletFields None = new TripletFields(false, false, false);
--- End diff --

How about we just keep:

```
EdgeOnly, SrcAndEdge, DstAndEdge, All
```
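For illustration, a pruned version along those lines might look like this (a sketch only, not the committed API; the PR's actual class is written in Java, this sketch uses Scala):

```scala
// Sketch: TripletFields reduced to the four presets suggested above.
class TripletFieldsSketch private (
    val useSrc: Boolean,
    val useDst: Boolean,
    val useEdge: Boolean) extends Serializable

object TripletFieldsSketch {
  val EdgeOnly   = new TripletFieldsSketch(false, false, true)
  val SrcAndEdge = new TripletFieldsSketch(true,  false, true)
  val DstAndEdge = new TripletFieldsSketch(false, true,  true)
  val All        = new TripletFieldsSketch(true,  true,  true)
}
```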





[GitHub] spark pull request: [SPARK-3936] Add aggregateMessages, which supe...

2014-11-12 Thread jegonzal
Github user jegonzal commented on a diff in the pull request:

https://github.com/apache/spark/pull/3100#discussion_r20243545
  
--- Diff: graphx/src/main/scala/org/apache/spark/graphx/TripletFields.java ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx;
+
+import java.io.Serializable;
+
+/**
+ * Represents a subset of the fields of an [[EdgeTriplet]] or [[EdgeContext]]. This allows the
+ * system to populate only those fields for efficiency.
+ */
+public class TripletFields implements Serializable {
+  public final boolean useSrc;
+  public final boolean useDst;
+  public final boolean useEdge;
+
+  public TripletFields() {
+    this(true, true, true);
+  }
+
+  public TripletFields(boolean useSrc, boolean useDst, boolean useEdge) {
+    this.useSrc = useSrc;
+    this.useDst = useDst;
+    this.useEdge = useEdge;
+  }
+
+  public static final TripletFields None = new TripletFields(false, false, false);
--- End diff --

Hmm, I agree, though I used many of them in the `GraphOps` code and decided maybe it would make sense to go ahead and be exhaustive. I think we could cut a few.





[GitHub] spark pull request: [SPARK-3936] Add aggregateMessages, which supe...

2014-11-12 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/3100





[GitHub] spark pull request: [SPARK-3936] Add aggregateMessages, which supe...

2014-11-11 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/3100#discussion_r20204979
  
--- Diff: graphx/src/main/scala/org/apache/spark/graphx/TripletFields.java ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx;
+
+import java.io.Serializable;
+
+/**
+ * Represents a subset of the fields of an [[EdgeTriplet]] or [[EdgeContext]]. This allows the
+ * system to populate only those fields for efficiency.
+ */
+public class TripletFields implements Serializable {
+  public final boolean useSrc;
+  public final boolean useDst;
+  public final boolean useEdge;
+
+  public TripletFields() {
+    this(true, true, true);
+  }
+
+  public TripletFields(boolean useSrc, boolean useDst, boolean useEdge) {
+    this.useSrc = useSrc;
+    this.useDst = useDst;
+    this.useEdge = useEdge;
+  }
+
+  public static final TripletFields None = new TripletFields(false, false, false);
--- End diff --

I find having so many triplet fields confusing. @jegonzal do we really need this?





[GitHub] spark pull request: [SPARK-3936] Add aggregateMessages, which supe...

2014-11-11 Thread rxin
Github user rxin commented on the pull request:

https://github.com/apache/spark/pull/3100#issuecomment-62680888
  
Ok, I'm merging this. I will make a small patch to clean up some stuff. Ankur - can you make the name change for srcMustBeActive, dstMustBeActive, and maySatisfyEither, and remove the type aliases for vid and pid? Thanks.





[GitHub] spark pull request: [SPARK-3936] Add aggregateMessages, which supe...

2014-11-11 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/3100#discussion_r20204491
  
--- Diff: graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala ---
@@ -280,55 +344,197 @@ class EdgePartition[
    * It is safe to keep references to the objects from this iterator.
    */
   def tripletIterator(
-      includeSrc: Boolean = true, includeDst: Boolean = true): Iterator[EdgeTriplet[VD, ED]] = {
-    new EdgeTripletIterator(this, includeSrc, includeDst)
+      includeSrc: Boolean = true, includeDst: Boolean = true)
+    : Iterator[EdgeTriplet[VD, ED]] = new Iterator[EdgeTriplet[VD, ED]] {
+    private[this] var pos = 0
+
+    override def hasNext: Boolean = pos < EdgePartition.this.size
+
+    override def next() = {
+      val triplet = new EdgeTriplet[VD, ED]
+      val localSrcId = localSrcIds(pos)
+      val localDstId = localDstIds(pos)
+      triplet.srcId = local2global(localSrcId)
+      triplet.dstId = local2global(localDstId)
+      if (includeSrc) {
+        triplet.srcAttr = vertexAttrs(localSrcId)
+      }
+      if (includeDst) {
+        triplet.dstAttr = vertexAttrs(localDstId)
+      }
+      triplet.attr = data(pos)
+      pos += 1
+      triplet
+    }
   }
 
   /**
-   * Upgrade the given edge iterator into a triplet iterator.
+   * Send messages along edges and aggregate them at the receiving vertices. Implemented by scanning
+   * all edges sequentially.
+   *
+   * @param sendMsg generates messages to neighboring vertices of an edge
+   * @param mergeMsg the combiner applied to messages destined to the same vertex
+   * @param tripletFields which triplet fields `sendMsg` uses
+   * @param srcMustBeActive if true, edges will only be considered if their source vertex is in the
+   *   active set
+   * @param dstMustBeActive if true, edges will only be considered if their destination vertex is in
+   *   the active set
+   * @param maySatisfyEither if true, only one vertex need be in the active set for an edge to be
+   *   considered
    *
-   * Be careful not to keep references to the objects from this iterator. To improve GC performance
-   * the same object is re-used in `next()`.
+   * @return iterator aggregated messages keyed by the receiving vertex id
    */
-  def upgradeIterator(
-      edgeIter: Iterator[Edge[ED]], includeSrc: Boolean = true, includeDst: Boolean = true)
-    : Iterator[EdgeTriplet[VD, ED]] = {
-    new ReusingEdgeTripletIterator(edgeIter, this, includeSrc, includeDst)
+  def aggregateMessagesEdgeScan[A: ClassTag](
+      sendMsg: EdgeContext[VD, ED, A] => Unit,
+      mergeMsg: (A, A) => A,
+      tripletFields: TripletFields,
+      srcMustBeActive: Boolean,
--- End diff --

note: srcMustBeActive, dstMustBeActive, and maySatisfyEither are confusing
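One way to collapse the three interacting booleans into a single parameter would be an enum-style type (a sketch; the names here are illustrative, not from the PR):

```scala
// Sketch: a single activeness mode instead of three boolean flags.
sealed trait EdgeActiveness
object EdgeActiveness {
  case object Neither extends EdgeActiveness // ignore the active set
  case object SrcOnly extends EdgeActiveness // source must be active
  case object DstOnly extends EdgeActiveness // destination must be active
  case object Both    extends EdgeActiveness // both endpoints must be active
  case object Either  extends EdgeActiveness // at least one endpoint active
}
```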





[GitHub] spark pull request: [SPARK-3936] Add aggregateMessages, which supe...

2014-11-11 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/3100#discussion_r20204354
  
--- Diff: graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala ---
@@ -21,63 +21,93 @@ import scala.reflect.{classTag, ClassTag}
 
 import org.apache.spark.graphx._
 import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
+import org.apache.spark.util.collection.BitSet
 
 /**
- * A collection of edges stored in columnar format, along with any vertex attributes referenced. The
- * edges are stored in 3 large columnar arrays (src, dst, attribute). The arrays are clustered by
- * src. There is an optional active vertex set for filtering computation on the edges.
+ * A collection of edges, along with referenced vertex attributes and an optional active vertex set
+ * for filtering computation on the edges.
+ *
+ * The edges are stored in columnar format in `localSrcIds`, `localDstIds`, and `data`. All
+ * referenced global vertex ids are mapped to a compact set of local vertex ids according to the
+ * `global2local` map. Each local vertex id is a valid index into `vertexAttrs`, which stores the
+ * corresponding vertex attribute, and `local2global`, which stores the reverse mapping to global
+ * vertex id. The global vertex ids that are active are optionally stored in `activeSet`.
+ *
+ * The edges are clustered by source vertex id, and the mapping from global vertex id to the index
+ * of the corresponding edge cluster is stored in `index`.
  *
  * @tparam ED the edge attribute type
  * @tparam VD the vertex attribute type
 *
- * @param srcIds the source vertex id of each edge
- * @param dstIds the destination vertex id of each edge
+ * @param localSrcIds the local source vertex id of each edge as an index into `local2global` and
+ *   `vertexAttrs`
+ * @param localDstIds the local destination vertex id of each edge as an index into `local2global`
+ *   and `vertexAttrs`
 * @param data the attribute associated with each edge
- * @param index a clustered index on source vertex id
- * @param vertices a map from referenced vertex ids to their corresponding attributes. Must
- *   contain all vertex ids from `srcIds` and `dstIds`, though not necessarily valid attributes for
- *   those vertex ids. The mask is not used.
+ * @param index a clustered index on source vertex id as a map from each global source vertex id to
+ *   the offset in the edge arrays where the cluster for that vertex id begins
+ * @param global2local a map from referenced vertex ids to local ids which index into vertexAttrs
+ * @param local2global an array of global vertex ids where the offsets are local vertex ids
+ * @param vertexAttrs an array of vertex attributes where the offsets are local vertex ids
 * @param activeSet an optional active vertex set for filtering computation on the edges
 */
private[graphx]
class EdgePartition[
    @specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag, VD: ClassTag](
-    val srcIds: Array[VertexId] = null,
-    val dstIds: Array[VertexId] = null,
-    val data: Array[ED] = null,
-    val index: GraphXPrimitiveKeyOpenHashMap[VertexId, Int] = null,
-    val vertices: VertexPartition[VD] = null,
-    val activeSet: Option[VertexSet] = None
-  ) extends Serializable {
+    localSrcIds: Array[Int],
+    localDstIds: Array[Int],
+    data: Array[ED],
+    index: GraphXPrimitiveKeyOpenHashMap[VertexId, Int],
+    global2local: GraphXPrimitiveKeyOpenHashMap[VertexId, Int],
+    local2global: Array[VertexId],
+    vertexAttrs: Array[VD],
+    activeSet: Option[VertexSet])
+  extends Serializable {
 
-  /** Return a new `EdgePartition` with the specified edge data. */
-  def withData[ED2: ClassTag](data_ : Array[ED2]): EdgePartition[ED2, VD] = {
-    new EdgePartition(srcIds, dstIds, data_, index, vertices, activeSet)
-  }
+  private def this() = this(null, null, null, null, null, null, null, null)
 
-  /** Return a new `EdgePartition` with the specified vertex partition. */
-  def withVertices[VD2: ClassTag](
-      vertices_ : VertexPartition[VD2]): EdgePartition[ED, VD2] = {
-    new EdgePartition(srcIds, dstIds, data, index, vertices_, activeSet)
+  /** Return a new `EdgePartition` with the specified edge data. */
--- End diff --

can you explain that data is indexed by local vid?
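For readers following the thread, the local/global id scheme described in the doc comment works roughly like this (an illustrative, self-contained sketch, not the actual class):

```scala
// Sketch: how local and global vertex ids relate in the new EdgePartition layout.
object LocalIdDemo extends App {
  type VertexId = Long
  // Two referenced vertices with global ids 100L and 200L:
  val local2global = Array[VertexId](100L, 200L)              // local id -> global id
  val global2local = Map[VertexId, Int](100L -> 0, 200L -> 1) // global id -> local id
  val vertexAttrs  = Array("attrOf100", "attrOf200")          // indexed by local id
  val localSrcIds  = Array(0, 1, 0)                           // per-edge local source ids

  val i = 2                                                   // look at the third edge
  val localSrcId = localSrcIds(i)
  println(local2global(localSrcId))                           // prints 100
  println(vertexAttrs(localSrcId))                            // prints attrOf100
}
```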



[GitHub] spark pull request: [SPARK-3936] Add aggregateMessages, which supe...

2014-11-11 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/3100#discussion_r20204348
  
--- Diff: graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala ---
@@ -21,63 +21,93 @@ import scala.reflect.{classTag, ClassTag}
 
 import org.apache.spark.graphx._
 import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
+import org.apache.spark.util.collection.BitSet
 
 /**
- * A collection of edges stored in columnar format, along with any vertex attributes referenced. The
- * edges are stored in 3 large columnar arrays (src, dst, attribute). The arrays are clustered by
- * src. There is an optional active vertex set for filtering computation on the edges.
+ * A collection of edges, along with referenced vertex attributes and an optional active vertex set
+ * for filtering computation on the edges.
+ *
+ * The edges are stored in columnar format in `localSrcIds`, `localDstIds`, and `data`. All
+ * referenced global vertex ids are mapped to a compact set of local vertex ids according to the
+ * `global2local` map. Each local vertex id is a valid index into `vertexAttrs`, which stores the
+ * corresponding vertex attribute, and `local2global`, which stores the reverse mapping to global
+ * vertex id. The global vertex ids that are active are optionally stored in `activeSet`.
+ *
+ * The edges are clustered by source vertex id, and the mapping from global vertex id to the index
+ * of the corresponding edge cluster is stored in `index`.
  *
  * @tparam ED the edge attribute type
  * @tparam VD the vertex attribute type
 *
- * @param srcIds the source vertex id of each edge
- * @param dstIds the destination vertex id of each edge
+ * @param localSrcIds the local source vertex id of each edge as an index into `local2global` and
+ *   `vertexAttrs`
+ * @param localDstIds the local destination vertex id of each edge as an index into `local2global`
+ *   and `vertexAttrs`
 * @param data the attribute associated with each edge
- * @param index a clustered index on source vertex id
- * @param vertices a map from referenced vertex ids to their corresponding attributes. Must
- *   contain all vertex ids from `srcIds` and `dstIds`, though not necessarily valid attributes for
- *   those vertex ids. The mask is not used.
+ * @param index a clustered index on source vertex id as a map from each global source vertex id to
+ *   the offset in the edge arrays where the cluster for that vertex id begins
+ * @param global2local a map from referenced vertex ids to local ids which index into vertexAttrs
+ * @param local2global an array of global vertex ids where the offsets are local vertex ids
+ * @param vertexAttrs an array of vertex attributes where the offsets are local vertex ids
 * @param activeSet an optional active vertex set for filtering computation on the edges
 */
private[graphx]
class EdgePartition[
    @specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag, VD: ClassTag](
-    val srcIds: Array[VertexId] = null,
-    val dstIds: Array[VertexId] = null,
-    val data: Array[ED] = null,
-    val index: GraphXPrimitiveKeyOpenHashMap[VertexId, Int] = null,
-    val vertices: VertexPartition[VD] = null,
-    val activeSet: Option[VertexSet] = None
-  ) extends Serializable {
+    localSrcIds: Array[Int],
+    localDstIds: Array[Int],
+    data: Array[ED],
+    index: GraphXPrimitiveKeyOpenHashMap[VertexId, Int],
+    global2local: GraphXPrimitiveKeyOpenHashMap[VertexId, Int],
+    local2global: Array[VertexId],
+    vertexAttrs: Array[VD],
+    activeSet: Option[VertexSet])
+  extends Serializable {
 
-  /** Return a new `EdgePartition` with the specified edge data. */
-  def withData[ED2: ClassTag](data_ : Array[ED2]): EdgePartition[ED2, VD] = {
-    new EdgePartition(srcIds, dstIds, data_, index, vertices, activeSet)
-  }
+  private def this() = this(null, null, null, null, null, null, null, null)
--- End diff --

note that this is used for serialization.
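Presumably this refers to frameworks that instantiate the class reflectively before filling in its fields; a minimal sketch of why the no-arg constructor matters (illustrative class, not the actual one):

```scala
// Sketch: reflection-based instantiation requires a no-arg constructor.
class Point(val x: Int, val y: Int) extends Serializable {
  private def this() = this(0, 0) // reachable via reflection during deserialization
}

object NoArgCtorDemo extends App {
  val ctor = classOf[Point].getDeclaredConstructor() // throws if no such constructor exists
  ctor.setAccessible(true)                           // the constructor is private
  println(ctor.newInstance().x)                      // prints 0
}
```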






[GitHub] spark pull request: [SPARK-3936] Add aggregateMessages, which supe...

2014-11-11 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/3100#discussion_r20204163
  
--- Diff: graphx/src/main/scala/org/apache/spark/graphx/Graph.scala ---
@@ -207,8 +207,39 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
    * }}}
    *
    */
-  def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = {
-    mapTriplets((pid, iter) => iter.map(map))
+  def mapTriplets[ED2: ClassTag](
+      map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = {
--- End diff --

note: this can fit on one line now





[GitHub] spark pull request: [SPARK-3936] Add aggregateMessages, which supe...

2014-11-11 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/3100#discussion_r20204139
  
--- Diff: graphx/src/main/scala/org/apache/spark/graphx/EdgeContext.scala ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx
+
+/**
+ * Represents an edge along with its neighboring vertices and allows sending messages along the
+ * edge. Used in [[Graph#aggregateMessages]].
+ */
+abstract class EdgeContext[VD, ED, A] {
+  /** The vertex id of the edge's source vertex. */
+  def srcId: VertexId
--- End diff --

didn't we decide we want to get rid of VertexId when we expose it to the user?
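For context, `VertexId` is a type alias for `Long` (defined in the `graphx` package object), so the alias and `Long` are interchangeable at the source level:

```scala
// Sketch: mirrors org.apache.spark.graphx's `type VertexId = Long`.
object AliasDemo extends App {
  type VertexId = Long
  def srcId: VertexId = 42L   // a member typed with the alias
  val asLong: Long = srcId    // no conversion needed; same type after aliasing
  println(asLong)
}
```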





[GitHub] spark pull request: [SPARK-3936] Add aggregateMessages, which supe...

2014-11-11 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3100#issuecomment-62678327
  
[Test build #23248 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/23248/consoleFull) for PR 3100 at commit [`f5b65d0`](https://github.com/apache/spark/commit/f5b65d0695594781324c3ddb9c41ec53a476ac95).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-3936] Add aggregateMessages, which supe...

2014-11-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/3100#issuecomment-62678332
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23248/
Test PASSed.





[GitHub] spark pull request: [SPARK-3936] Add aggregateMessages, which supe...

2014-11-11 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3100#issuecomment-62673015
  
[Test build #23248 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/23248/consoleFull) for PR 3100 at commit [`f5b65d0`](https://github.com/apache/spark/commit/f5b65d0695594781324c3ddb9c41ec53a476ac95).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-3936] Add aggregateMessages, which supe...

2014-11-10 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/3100#discussion_r20130355
  
--- Diff: graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala ---
@@ -285,50 +337,126 @@ class EdgePartition[
   }
 
   /**
-   * Upgrade the given edge iterator into a triplet iterator.
+   * Send messages along edges and aggregate them at the receiving vertices. Implemented by scanning
+   * all edges sequentially and filtering them with `idPred`.
+   *
+   * @param sendMsg generates messages to neighboring vertices of an edge
+   * @param mergeMsg the combiner applied to messages destined to the same vertex
+   * @param sendMsgUsesSrcAttr whether or not `mapFunc` uses the edge's source vertex attribute
+   * @param sendMsgUsesDstAttr whether or not `mapFunc` uses the edge's destination vertex attribute
+   * @param idPred a predicate to filter edges based on their source and destination vertex ids
    *
-   * Be careful not to keep references to the objects from this iterator. To improve GC performance
-   * the same object is re-used in `next()`.
+   * @return iterator aggregated messages keyed by the receiving vertex id
    */
-  def upgradeIterator(
-      edgeIter: Iterator[Edge[ED]], includeSrc: Boolean = true, includeDst: Boolean = true)
-    : Iterator[EdgeTriplet[VD, ED]] = {
-    new ReusingEdgeTripletIterator(edgeIter, this, includeSrc, includeDst)
+  def aggregateMessages[A: ClassTag](
+      sendMsg: EdgeContext[VD, ED, A] => Unit,
+      mergeMsg: (A, A) => A,
+      tripletFields: TripletFields,
+      idPred: (VertexId, VertexId) => Boolean): Iterator[(VertexId, A)] = {
+    val aggregates = new Array[A](vertexAttrs.length)
+    val bitset = new BitSet(vertexAttrs.length)
+
+    var ctx = new AggregatingEdgeContext[VD, ED, A](mergeMsg, aggregates, bitset)
+    var i = 0
+    while (i < size) {
+      val localSrcId = localSrcIds(i)
+      val srcId = local2global(localSrcId)
+      val localDstId = localDstIds(i)
+      val dstId = local2global(localDstId)
+      if (idPred(srcId, dstId)) {
+        ctx.localSrcId = localSrcId
+        ctx.localDstId = localDstId
+        ctx.srcId = srcId
+        ctx.dstId = dstId
+        ctx.attr = data(i)
+        if (tripletFields.useSrc) { ctx.srcAttr = vertexAttrs(localSrcId) }
+        if (tripletFields.useDst) { ctx.dstAttr = vertexAttrs(localDstId) }
+        sendMsg(ctx)
+      }
+      i += 1
+    }
+
+    bitset.iterator.map { localId => (local2global(localId), aggregates(localId)) }
   }
 
   /**
-   * Get an iterator over the edges in this partition whose source vertex ids match srcIdPred. The
-   * iterator is generated using an index scan, so it is efficient at skipping edges that don't
-   * match srcIdPred.
+   * Send messages along edges and aggregate them at the receiving vertices. Implemented by
+   * filtering the source vertex index with `srcIdPred`, then scanning edge clusters and filtering
+   * with `dstIdPred`. Both `srcIdPred` and `dstIdPred` must match for an edge to run.
    *
-   * Be careful not to keep references to the objects from this iterator. To improve GC performance
-   * the same object is re-used in `next()`.
-   */
-  def indexIterator(srcIdPred: VertexId => Boolean): Iterator[Edge[ED]] =
-    index.iterator.filter(kv => srcIdPred(kv._1)).flatMap(Function.tupled(clusterIterator))
-
-  /**
-   * Get an iterator over the cluster of edges in this partition with source vertex id `srcId`. The
-   * cluster must start at position `index`.
+   * @param sendMsg generates messages to neighboring vertices of an edge
+   * @param mergeMsg the combiner applied to messages destined to the same vertex
+   * @param srcIdPred a predicate to filter edges based on their source vertex id
+   * @param dstIdPred a predicate to filter edges based on their destination vertex id
    *
-   * Be careful not to keep references to the objects from this iterator. To improve GC performance
-   * the same object is re-used in `next()`.
+   * @return iterator aggregated messages keyed by the receiving vertex id
    */
-  private def clusterIterator(srcId: VertexId, index: Int) = new Iterator[Edge[ED]] {
-    private[this] val edge = new Edge[ED]
-    private[this] var pos = index
+  def aggregateMessagesWithIndex[A: ClassTag](
+      sendMsg: EdgeContext[VD, ED, A] => Unit,
+      mergeMsg: (A, A) => A,
+      tripletFields: TripletFields,
+      srcIdPred: VertexId => Boolean,
+      dstIdPred: VertexId => Boolean): Iterator[(VertexId, A)] = {
+    val aggregates = new Array[A](vertexAttrs.length)
+    val bitset = n

[GitHub] spark pull request: [SPARK-3936] Add aggregateMessages, which supe...

2014-11-10 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/3100#discussion_r20130288
  
--- Diff: graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala ---
@@ -285,50 +337,126 @@ class EdgePartition[
   }
 
   /**
-   * Upgrade the given edge iterator into a triplet iterator.
+   * Send messages along edges and aggregate them at the receiving vertices. Implemented by scanning
+   * all edges sequentially and filtering them with `idPred`.
+   *
+   * @param sendMsg generates messages to neighboring vertices of an edge
+   * @param mergeMsg the combiner applied to messages destined to the same vertex
+   * @param sendMsgUsesSrcAttr whether or not `mapFunc` uses the edge's source vertex attribute
+   * @param sendMsgUsesDstAttr whether or not `mapFunc` uses the edge's destination vertex attribute
+   * @param idPred a predicate to filter edges based on their source and destination vertex ids
    *
-   * Be careful not to keep references to the objects from this iterator. To improve GC performance
-   * the same object is re-used in `next()`.
+   * @return iterator aggregated messages keyed by the receiving vertex id
    */
-  def upgradeIterator(
-      edgeIter: Iterator[Edge[ED]], includeSrc: Boolean = true, includeDst: Boolean = true)
-    : Iterator[EdgeTriplet[VD, ED]] = {
-    new ReusingEdgeTripletIterator(edgeIter, this, includeSrc, includeDst)
+  def aggregateMessages[A: ClassTag](
+      sendMsg: EdgeContext[VD, ED, A] => Unit,
+      mergeMsg: (A, A) => A,
+      tripletFields: TripletFields,
+      idPred: (VertexId, VertexId) => Boolean): Iterator[(VertexId, A)] = {
+    val aggregates = new Array[A](vertexAttrs.length)
+    val bitset = new BitSet(vertexAttrs.length)
+
+    var ctx = new AggregatingEdgeContext[VD, ED, A](mergeMsg, aggregates, bitset)
+    var i = 0
+    while (i < size) {
+      val localSrcId = localSrcIds(i)
+      val srcId = local2global(localSrcId)
+      val localDstId = localDstIds(i)
+      val dstId = local2global(localDstId)
+      if (idPred(srcId, dstId)) {
+        ctx.localSrcId = localSrcId
+        ctx.localDstId = localDstId
+        ctx.srcId = srcId
+        ctx.dstId = dstId
+        ctx.attr = data(i)
+        if (tripletFields.useSrc) { ctx.srcAttr = vertexAttrs(localSrcId) }
+        if (tripletFields.useDst) { ctx.dstAttr = vertexAttrs(localDstId) }
+        sendMsg(ctx)
+      }
+      i += 1
+    }
+
+    bitset.iterator.map { localId => (local2global(localId), aggregates(localId)) }
   }
 
   /**
-   * Get an iterator over the edges in this partition whose source vertex ids match srcIdPred. The
-   * iterator is generated using an index scan, so it is efficient at skipping edges that don't
-   * match srcIdPred.
+   * Send messages along edges and aggregate them at the receiving vertices. Implemented by
+   * filtering the source vertex index with `srcIdPred`, then scanning edge clusters and filtering
+   * with `dstIdPred`. Both `srcIdPred` and `dstIdPred` must match for an edge to run.
    *
-   * Be careful not to keep references to the objects from this iterator. To improve GC performance
-   * the same object is re-used in `next()`.
-   */
-  def indexIterator(srcIdPred: VertexId => Boolean): Iterator[Edge[ED]] =
-    index.iterator.filter(kv => srcIdPred(kv._1)).flatMap(Function.tupled(clusterIterator))
-
-  /**
-   * Get an iterator over the cluster of edges in this partition with source vertex id `srcId`. The
-   * cluster must start at position `index`.
+   * @param sendMsg generates messages to neighboring vertices of an edge
+   * @param mergeMsg the combiner applied to messages destined to the same vertex
+   * @param srcIdPred a predicate to filter edges based on their source vertex id
+   * @param dstIdPred a predicate to filter edges based on their destination vertex id
    *
-   * Be careful not to keep references to the objects from this iterator. To improve GC performance
-   * the same object is re-used in `next()`.
+   * @return iterator aggregated messages keyed by the receiving vertex id
    */
-  private def clusterIterator(srcId: VertexId, index: Int) = new Iterator[Edge[ED]] {
-    private[this] val edge = new Edge[ED]
-    private[this] var pos = index
+  def aggregateMessagesWithIndex[A: ClassTag](
+      sendMsg: EdgeContext[VD, ED, A] => Unit,
+      mergeMsg: (A, A) => A,
+      tripletFields: TripletFields,
+      srcIdPred: VertexId => Boolean,
+      dstIdPred: VertexId => Boolean): Iterator[(VertexId, A)] = {
+    val aggregates = new Array[A](vertexAttrs.length)
+    val bitset =

[GitHub] spark pull request: [SPARK-3936] Add aggregateMessages, which supe...

2014-11-10 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/3100#discussion_r20126286
  
--- Diff: graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala ---
@@ -285,50 +337,126 @@ class EdgePartition[
   }
 
   /**
-   * Upgrade the given edge iterator into a triplet iterator.
+   * Send messages along edges and aggregate them at the receiving vertices. Implemented by scanning
+   * all edges sequentially and filtering them with `idPred`.
+   *
+   * @param sendMsg generates messages to neighboring vertices of an edge
+   * @param mergeMsg the combiner applied to messages destined to the same vertex
+   * @param sendMsgUsesSrcAttr whether or not `mapFunc` uses the edge's source vertex attribute
+   * @param sendMsgUsesDstAttr whether or not `mapFunc` uses the edge's destination vertex attribute
+   * @param idPred a predicate to filter edges based on their source and destination vertex ids
    *
-   * Be careful not to keep references to the objects from this iterator. To improve GC performance
-   * the same object is re-used in `next()`.
+   * @return iterator aggregated messages keyed by the receiving vertex id
    */
-  def upgradeIterator(
-      edgeIter: Iterator[Edge[ED]], includeSrc: Boolean = true, includeDst: Boolean = true)
-    : Iterator[EdgeTriplet[VD, ED]] = {
-    new ReusingEdgeTripletIterator(edgeIter, this, includeSrc, includeDst)
+  def aggregateMessages[A: ClassTag](
+      sendMsg: EdgeContext[VD, ED, A] => Unit,
+      mergeMsg: (A, A) => A,
+      tripletFields: TripletFields,
+      idPred: (VertexId, VertexId) => Boolean): Iterator[(VertexId, A)] = {
+    val aggregates = new Array[A](vertexAttrs.length)
+    val bitset = new BitSet(vertexAttrs.length)
+
+    var ctx = new AggregatingEdgeContext[VD, ED, A](mergeMsg, aggregates, bitset)
+    var i = 0
+    while (i < size) {
+      val localSrcId = localSrcIds(i)
+      val srcId = local2global(localSrcId)
+      val localDstId = localDstIds(i)
+      val dstId = local2global(localDstId)
+      if (idPred(srcId, dstId)) {
+        ctx.localSrcId = localSrcId
+        ctx.localDstId = localDstId
+        ctx.srcId = srcId
+        ctx.dstId = dstId
+        ctx.attr = data(i)
+        if (tripletFields.useSrc) { ctx.srcAttr = vertexAttrs(localSrcId) }
+        if (tripletFields.useDst) { ctx.dstAttr = vertexAttrs(localDstId) }
+        sendMsg(ctx)
+      }
+      i += 1
+    }
+
+    bitset.iterator.map { localId => (local2global(localId), aggregates(localId)) }
   }
 
   /**
-   * Get an iterator over the edges in this partition whose source vertex ids match srcIdPred. The
-   * iterator is generated using an index scan, so it is efficient at skipping edges that don't
-   * match srcIdPred.
+   * Send messages along edges and aggregate them at the receiving vertices. Implemented by
+   * filtering the source vertex index with `srcIdPred`, then scanning edge clusters and filtering
+   * with `dstIdPred`. Both `srcIdPred` and `dstIdPred` must match for an edge to run.
    *
-   * Be careful not to keep references to the objects from this iterator. To improve GC performance
-   * the same object is re-used in `next()`.
-   */
-  def indexIterator(srcIdPred: VertexId => Boolean): Iterator[Edge[ED]] =
-    index.iterator.filter(kv => srcIdPred(kv._1)).flatMap(Function.tupled(clusterIterator))
-
-  /**
-   * Get an iterator over the cluster of edges in this partition with source vertex id `srcId`. The
-   * cluster must start at position `index`.
+   * @param sendMsg generates messages to neighboring vertices of an edge
+   * @param mergeMsg the combiner applied to messages destined to the same vertex
+   * @param srcIdPred a predicate to filter edges based on their source vertex id
+   * @param dstIdPred a predicate to filter edges based on their destination vertex id
    *
-   * Be careful not to keep references to the objects from this iterator. To improve GC performance
-   * the same object is re-used in `next()`.
+   * @return iterator aggregated messages keyed by the receiving vertex id
    */
-  private def clusterIterator(srcId: VertexId, index: Int) = new Iterator[Edge[ED]] {
-    private[this] val edge = new Edge[ED]
-    private[this] var pos = index
+  def aggregateMessagesWithIndex[A: ClassTag](
+      sendMsg: EdgeContext[VD, ED, A] => Unit,
+      mergeMsg: (A, A) => A,
+      tripletFields: TripletFields,
+      srcIdPred: VertexId => Boolean,
+      dstIdPred: VertexId => Boolean): Iterator[(VertexId, A)] = {
+    val aggregates = new Array[A](vertexAttrs.length)
+    val bitset = n

[GitHub] spark pull request: [SPARK-3936] Add aggregateMessages, which supe...

2014-11-10 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/3100#discussion_r20126256
  
--- Diff: graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala ---
@@ -285,50 +337,126 @@ class EdgePartition[
   }
 
   /**
-   * Upgrade the given edge iterator into a triplet iterator.
+   * Send messages along edges and aggregate them at the receiving vertices. Implemented by scanning
+   * all edges sequentially and filtering them with `idPred`.
+   *
+   * @param sendMsg generates messages to neighboring vertices of an edge
+   * @param mergeMsg the combiner applied to messages destined to the same vertex
+   * @param sendMsgUsesSrcAttr whether or not `mapFunc` uses the edge's source vertex attribute
+   * @param sendMsgUsesDstAttr whether or not `mapFunc` uses the edge's destination vertex attribute
+   * @param idPred a predicate to filter edges based on their source and destination vertex ids
    *
-   * Be careful not to keep references to the objects from this iterator. To improve GC performance
-   * the same object is re-used in `next()`.
+   * @return iterator aggregated messages keyed by the receiving vertex id
    */
-  def upgradeIterator(
-      edgeIter: Iterator[Edge[ED]], includeSrc: Boolean = true, includeDst: Boolean = true)
-    : Iterator[EdgeTriplet[VD, ED]] = {
-    new ReusingEdgeTripletIterator(edgeIter, this, includeSrc, includeDst)
+  def aggregateMessages[A: ClassTag](
+      sendMsg: EdgeContext[VD, ED, A] => Unit,
+      mergeMsg: (A, A) => A,
+      tripletFields: TripletFields,
+      idPred: (VertexId, VertexId) => Boolean): Iterator[(VertexId, A)] = {
+    val aggregates = new Array[A](vertexAttrs.length)
+    val bitset = new BitSet(vertexAttrs.length)
+
+    var ctx = new AggregatingEdgeContext[VD, ED, A](mergeMsg, aggregates, bitset)
+    var i = 0
+    while (i < size) {
+      val localSrcId = localSrcIds(i)
+      val srcId = local2global(localSrcId)
+      val localDstId = localDstIds(i)
+      val dstId = local2global(localDstId)
+      if (idPred(srcId, dstId)) {
+        ctx.localSrcId = localSrcId
+        ctx.localDstId = localDstId
+        ctx.srcId = srcId
+        ctx.dstId = dstId
+        ctx.attr = data(i)
+        if (tripletFields.useSrc) { ctx.srcAttr = vertexAttrs(localSrcId) }
+        if (tripletFields.useDst) { ctx.dstAttr = vertexAttrs(localDstId) }
+        sendMsg(ctx)
+      }
+      i += 1
+    }
+
+    bitset.iterator.map { localId => (local2global(localId), aggregates(localId)) }
   }
 
   /**
-   * Get an iterator over the edges in this partition whose source vertex ids match srcIdPred. The
-   * iterator is generated using an index scan, so it is efficient at skipping edges that don't
-   * match srcIdPred.
+   * Send messages along edges and aggregate them at the receiving vertices. Implemented by
+   * filtering the source vertex index with `srcIdPred`, then scanning edge clusters and filtering
+   * with `dstIdPred`. Both `srcIdPred` and `dstIdPred` must match for an edge to run.
    *
-   * Be careful not to keep references to the objects from this iterator. To improve GC performance
-   * the same object is re-used in `next()`.
-   */
-  def indexIterator(srcIdPred: VertexId => Boolean): Iterator[Edge[ED]] =
-    index.iterator.filter(kv => srcIdPred(kv._1)).flatMap(Function.tupled(clusterIterator))
-
-  /**
-   * Get an iterator over the cluster of edges in this partition with source vertex id `srcId`. The
-   * cluster must start at position `index`.
+   * @param sendMsg generates messages to neighboring vertices of an edge
+   * @param mergeMsg the combiner applied to messages destined to the same vertex
+   * @param srcIdPred a predicate to filter edges based on their source vertex id
+   * @param dstIdPred a predicate to filter edges based on their destination vertex id
    *
-   * Be careful not to keep references to the objects from this iterator. To improve GC performance
-   * the same object is re-used in `next()`.
+   * @return iterator aggregated messages keyed by the receiving vertex id
    */
-  private def clusterIterator(srcId: VertexId, index: Int) = new Iterator[Edge[ED]] {
-    private[this] val edge = new Edge[ED]
-    private[this] var pos = index
+  def aggregateMessagesWithIndex[A: ClassTag](
+      sendMsg: EdgeContext[VD, ED, A] => Unit,
+      mergeMsg: (A, A) => A,
+      tripletFields: TripletFields,
+      srcIdPred: VertexId => Boolean,
+      dstIdPred: VertexId => Boolean): Iterator[(VertexId, A)] = {
+    val aggregates = new Array[A](vertexAttrs.length)
+    val bitset = n

[GitHub] spark pull request: [SPARK-3936] Add aggregateMessages, which supe...

2014-11-10 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/3100#discussion_r20126212
  
--- Diff: graphx/src/main/scala/org/apache/spark/graphx/TripletFields.scala ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx
+
+/**
+ * Represents a subset of the fields of an [[EdgeTriplet]] or [[EdgeContext]]. This allows the
+ * system to populate only those fields for efficiency.
+ */
+class TripletFields private (
--- End diff --

for the ease of the Java API (for the static object below), why don't we create this in Java?
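The motivation, presumably: constants on a Scala companion object surface to Java callers as `TripletFields$.MODULE$.All()`, while a Java class can expose them as plain static fields. Roughly (an illustrative sketch, not the PR's code):

```scala
// Sketch: the Scala-side definition, with comments showing the Java call sites.
class TripletFieldsSketch(val useSrc: Boolean, val useDst: Boolean, val useEdge: Boolean)

object TripletFieldsSketch {
  val All = new TripletFieldsSketch(true, true, true)
  // From Java this is reached as: TripletFieldsSketch$.MODULE$.All()
  // A Java class could instead declare:
  //   public static final TripletFields All = new TripletFields(true, true, true);
  // reachable from both languages as plain TripletFields.All.
}
```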





[GitHub] spark pull request: [SPARK-3936] Add aggregateMessages, which supe...

2014-11-10 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/3100#discussion_r20126158
  
--- Diff: graphx/src/main/scala/org/apache/spark/graphx/Graph.scala ---
@@ -326,8 +336,54 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
     : VertexRDD[A]
 
   /**
-   * Joins the vertices with entries in the `table` RDD and merges the results using `mapFunc`.  The
-   * input table should contain at most one entry for each vertex.  If no entry in `other` is
+   * Aggregates values from the neighboring edges and vertices of each vertex. The user-supplied
+   * `sendMsg` function is invoked on each edge of the graph, generating 0 or more messages to be
+   * sent to either vertex in the edge. The `mergeMsg` function is then used to combine all messages
+   * destined to the same vertex.
+   *
+   * @tparam A the type of message to be sent to each vertex
+   *
+   * @param sendMsg runs on each edge, sending messages to neighboring vertices using the
+   *   [[EdgeContext]].
+   * @param mergeMsg used to combine messages from `sendMsg` destined to the same vertex. This
+   *   combiner should be commutative and associative.
+   * @param tripletFields which fields should be included in the [[EdgeContext]] passed to the
+   *   `sendMsg` function. If not all fields are needed, specifying this can improve performance.
+   * @param activeSetOpt an efficient way to run the aggregation on a subset of the edges if
+   *   desired. This is done by specifying a set of "active" vertices and an edge direction. The
+   *   `sendMsg` function will then run on only edges connected to active vertices by edges in the
+   *   specified direction. If the direction is `In`, `sendMsg` will only be run on edges with
+   *   destination in the active set. If the direction is `Out`, `sendMsg` will only be run on edges
+   *   originating from vertices in the active set. If the direction is `Either`, `sendMsg` will be
+   *   run on edges with *either* vertex in the active set. If the direction is `Both`, `sendMsg`
+   *   will be run on edges with *both* vertices in the active set. The active set must have the
+   *   same index as the graph's vertices.
+   *
+   * @example We can use this function to compute the in-degree of each
+   * vertex
+   * {{{
+   * val rawGraph: Graph[_, _] = Graph.textFile("twittergraph")
+   * val inDeg: RDD[(VertexId, Int)] =
+   *   aggregateMessages[Int](ctx => ctx.sendToDst(1), _ + _)
+   * }}}
+   *
+   * @note By expressing computation at the edge level we achieve
+   * maximum parallelism.  This is one of the core functions in the
+   * Graph API in that enables neighborhood level computation. For
+   * example this function can be used to count neighbors satisfying a
+   * predicate or implement PageRank.
+   *
+   */
+  def aggregateMessages[A: ClassTag](
+      sendMsg: EdgeContext[VD, ED, A] => Unit,
+      mergeMsg: (A, A) => A,
+      tripletFields: TripletFields = TripletFields.All,
+      activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None)
--- End diff --

It'd be better to create two versions of this, where the first version is public and doesn't have activeSetOpt, and then create an internal private version that has all four fields. Basically it'd be better not to expose activeSetOpt, since it is kinda complicated and scary to new users.
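A sketch of the suggested split (signatures only; `aggregateMessagesWithActiveSet` is an illustrative name for the internal version):

```scala
package org.apache.spark.graphx

import scala.reflect.ClassTag

trait AggregateMessagesSketch[VD, ED] {
  // Public version: no activeSetOpt, so the common case stays simple.
  def aggregateMessages[A: ClassTag](
      sendMsg: EdgeContext[VD, ED, A] => Unit,
      mergeMsg: (A, A) => A,
      tripletFields: TripletFields = TripletFields.All): VertexRDD[A] =
    aggregateMessagesWithActiveSet(sendMsg, mergeMsg, tripletFields, None)

  // Internal version keeps the full parameter list, including the active set.
  private[graphx] def aggregateMessagesWithActiveSet[A: ClassTag](
      sendMsg: EdgeContext[VD, ED, A] => Unit,
      mergeMsg: (A, A) => A,
      tripletFields: TripletFields,
      activeSetOpt: Option[(VertexRDD[_], EdgeDirection)]): VertexRDD[A]
}
```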





[GitHub] spark pull request: [SPARK-3936] Add aggregateMessages, which supe...

2014-11-10 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/3100#discussion_r20125939
  
--- Diff: graphx/src/main/scala/org/apache/spark/graphx/EdgeContext.scala ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx
+
+/**
+ * Represents an edge along with its neighboring vertices and allows sending messages along the
+ * edge. Used in [[Graph#aggregateMessages]].
+ */
+trait EdgeContext[VD, ED, A] {
+  /** The vertex id of the edge's source vertex. */
+  def srcId: VertexId
--- End diff --

It'd be better to use longs directly instead of VertexId when we expose this. Let's try to make this source compatible though.





[GitHub] spark pull request: [SPARK-3936] Add aggregateMessages, which supe...

2014-11-10 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/3100#discussion_r20125606
  
--- Diff: graphx/src/main/scala/org/apache/spark/graphx/Graph.scala ---
@@ -207,8 +209,10 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
    * }}}
    *
    */
-  def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = {
-    mapTriplets((pid, iter) => iter.map(map))
+  def mapTriplets[ED2: ClassTag](
+      map: EdgeTriplet[VD, ED] => ED2,
+      tripletFields: TripletFields = TripletFields.All): Graph[VD, ED2] = {
--- End diff --

remove the default value, and create a version of this that takes only a map
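In other words, something like this pair (a sketch of the shape being requested, signatures only):

```scala
package org.apache.spark.graphx

import scala.reflect.ClassTag

trait MapTripletsSketch[VD, ED] {
  // Convenience version that takes only the map function.
  def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] =
    mapTriplets(map, TripletFields.All)

  // Full version with an explicit tripletFields and no default value.
  def mapTriplets[ED2: ClassTag](
      map: EdgeTriplet[VD, ED] => ED2,
      tripletFields: TripletFields): Graph[VD, ED2]
}
```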





[GitHub] spark pull request: [SPARK-3936] Add aggregateMessages, which supe...

2014-11-10 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/3100#discussion_r20125517
  
--- Diff: graphx/src/main/scala/org/apache/spark/graphx/EdgeContext.scala ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx
+
+/**
+ * Represents an edge along with its neighboring vertices and allows sending messages along the
+ * edge. Used in [[Graph#aggregateMessages]].
+ */
+trait EdgeContext[VD, ED, A] {
--- End diff --

let's make this an abstract class
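The usual reason for such a change: an abstract class is easier to extend from Java, and adding concrete members to it later is less likely to break binary compatibility than adding trait members. A minimal sketch of the requested shape:

```scala
// Sketch: EdgeContext as an abstract class rather than a trait.
abstract class EdgeContextSketch[VD, ED, A] {
  def srcId: Long     // VertexId in the real API; Long used here to keep the sketch standalone
  def dstId: Long
  def sendToDst(msg: A): Unit
}
```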





[GitHub] spark pull request: [SPARK-3936] Add aggregateMessages, which supe...

2014-11-09 Thread ankurdave
Github user ankurdave commented on a diff in the pull request:

https://github.com/apache/spark/pull/3100#discussion_r20062658
  
--- Diff: graphx/src/main/scala/org/apache/spark/graphx/TripletFields.scala 
---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx
+
+/**
+ * Represents a subset of the fields of an [[EdgeTriplet]] or [[EdgeContext]]. This allows the
+ * system to populate only those fields for efficiency.
+ */
+class TripletFields private (
+val useSrc: Boolean,
+val useDst: Boolean,
+val useEdge: Boolean)
--- End diff --

Yeah, we don't currently use it since it's cheap to access the edge 
attributes, but I think @jegonzal added it in case our internal representation 
changes and it becomes useful.
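
To illustrate the idea, a purely hypothetical helper (not code from this PR) showing how `useEdge` would gate work in the same way `useSrc` and `useDst` already gate vertex-attribute shipping:

```scala
import org.apache.spark.graphx.TripletFields

// Hypothetical: only materialize the fields the caller asked for. Today edge
// attributes are cheap to read locally, so useEdge is never consulted, but a
// lazier internal representation could check it exactly like the other flags.
def materializeTriplet[VD, ED](
    fields: TripletFields,
    loadSrc: () => VD,
    loadDst: () => VD,
    loadEdge: () => ED): (Option[VD], Option[VD], Option[ED]) = (
  if (fields.useSrc) Some(loadSrc()) else None,
  if (fields.useDst) Some(loadDst()) else None,
  if (fields.useEdge) Some(loadEdge()) else None
)
```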





[GitHub] spark pull request: [SPARK-3936] Add aggregateMessages, which supe...

2014-11-09 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/3100#discussion_r20062181
  
--- Diff: graphx/src/main/scala/org/apache/spark/graphx/TripletFields.scala ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx
+
+/**
+ * Represents a subset of the fields of an [[EdgeTriplet]] or [[EdgeContext]]. This allows the
+ * system to populate only those fields for efficiency.
+ */
+class TripletFields private (
+val useSrc: Boolean,
+val useDst: Boolean,
+val useEdge: Boolean)
--- End diff --

maybe I'm just missing it, but it seems like `useEdge` is never used.





[GitHub] spark pull request: [SPARK-3936] Add aggregateMessages, which supe...

2014-11-04 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3100#issuecomment-61754331
  
  [Test build #22908 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22908/consoleFull) for PR 3100 at commit [`1e80aca`](https://github.com/apache/spark/commit/1e80aca308463b0ec7dbeee58c7d1935ebb59e77).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `trait EdgeContext[VD, ED, A] `






[GitHub] spark pull request: [SPARK-3936] Add aggregateMessages, which supe...

2014-11-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/3100#issuecomment-61754336
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22908/





[GitHub] spark pull request: [SPARK-3936] Add aggregateMessages, which supe...

2014-11-04 Thread ankurdave
Github user ankurdave commented on the pull request:

https://github.com/apache/spark/pull/3100#issuecomment-61748390
  
This PR is on top of #3054, so view the new changes here: 
I'm not sure about the name: aggregateNeighbors may be better.

@rxin @jegonzal





[GitHub] spark pull request: [SPARK-3936] Add aggregateMessages, which supe...

2014-11-04 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3100#issuecomment-61748262
  
  [Test build #22908 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22908/consoleFull) for PR 3100 at commit [`1e80aca`](https://github.com/apache/spark/commit/1e80aca308463b0ec7dbeee58c7d1935ebb59e77).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-3936] Add aggregateMessages, which supe...

2014-11-04 Thread ankurdave
GitHub user ankurdave opened a pull request:

https://github.com/apache/spark/pull/3100

[SPARK-3936] Add aggregateMessages, which supersedes mapReduceTriplets

aggregateMessages enables neighborhood computation similarly to mapReduceTriplets, but it introduces two API improvements (sketched in code below):

1. Messages are sent using an imperative interface based on EdgeContext rather than by returning an iterator of messages. This is more efficient, providing a 20.2% speedup on PageRank over apache/spark#3054 (uk-2007-05 graph, 10 iterations, 16 r3.2xlarge machines, sped up from 403 s to 322 s).

2. Rather than attempting bytecode inspection, the required triplet fields must be explicitly specified by the user by passing a TripletFields object. This fixes SPARK-3936.

Subsumes apache/spark#2815.
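
A hedged usage sketch of the new API as described above (exact signatures may differ in the merged code):

```scala
import org.apache.spark.graphx._

// Compute each vertex's in-degree. Messages are sent imperatively through
// the EdgeContext rather than returned as an iterator, and TripletFields.None
// tells the engine that no vertex or edge attributes need to be populated.
def inDegrees[VD, ED](graph: Graph[VD, ED]): VertexRDD[Int] =
  graph.aggregateMessages[Int](
    ctx => ctx.sendToDst(1),  // one message per incoming edge
    _ + _,                    // merge messages arriving at the same vertex
    TripletFields.None        // request no triplet fields at all
  )
```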

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ankurdave/spark aggregateMessages

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/3100.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3100


commit 4a566dc86624ac3f6dfa747d344c86e4be44adc2
Author: Ankur Dave 
Date:   2014-08-14T02:33:47Z

Optimizations for mapReduceTriplets and EdgePartition

1. EdgePartition now stores local vertex ids instead of global ids. This
   avoids hash lookups when looking up vertex attributes and aggregating
   messages.

2. Internal iterators in mapReduceTriplets are inlined into a while
   loop.
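
For intuition, a hedged, self-contained sketch of the two optimizations in this commit message (hypothetical code, not the actual EdgePartition internals):

```scala
// Vertex attributes live in a dense array; each edge stores the *local*
// index of its source vertex, so the hot loop does array indexing instead
// of a hash-map lookup, and the Iterator is replaced by a while loop.
def sumSrcAttrs(localSrcIds: Array[Int], vertexAttrs: Array[Double]): Double = {
  var total = 0.0
  var i = 0
  while (i < localSrcIds.length) {
    total += vertexAttrs(localSrcIds(i))  // no hashing, no per-element allocation
    i += 1
  }
  total
}
```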

commit b567be2825ea22f2e61fbd9caa34940f5bc404df
Author: Ankur Dave 
Date:   2014-11-04T09:56:48Z

iter.foreach -> while loop

commit c85076de62b4c3344c443d4e85fce8fc47274aac
Author: Ankur Dave 
Date:   2014-11-04T09:58:00Z

Readability improvements

commit e0f8ecc7b678de2b011650ed96b974369730947e
Author: Ankur Dave 
Date:   2014-11-04T09:58:23Z

Take activeSet in ExistingEdgePartitionBuilder

Also rename VertexPreservingEdgePartitionBuilder to
ExistingEdgePartitionBuilder to better reflect its usage.

commit 194a2df94768be9c08ed50654170bad937bd115a
Author: Ankur Dave 
Date:   2014-11-04T10:03:34Z

Test triplet iterator in EdgePartition serialization test

commit 1e80aca308463b0ec7dbeee58c7d1935ebb59e77
Author: Ankur Dave 
Date:   2014-11-01T07:01:21Z

Add aggregateMessages, which supersedes mapReduceTriplets

aggregateMessages enables neighborhood computation similarly to
mapReduceTriplets, but it introduces two API improvements:

1. Messages are sent using an imperative interface based on EdgeContext
rather than by returning an iterator of messages. This is more
efficient, providing a 20.2% speedup on PageRank over
apache/spark#3054 (uk-2007-05 graph, 10 iterations, 16 r3.2xlarge
machines, sped up from 403 s to 322 s).

2. Rather than attempting bytecode inspection, the required triplet
fields must be explicitly specified by the user by passing a
TripletFields object. This fixes SPARK-3936.

Subsumes apache/spark#2815.



