Repository: spark
Updated Branches:
  refs/heads/branch-0.9 aef6390cd -> a92900c5a


SPARK-1188: Do not re-use objects in the EdgePartition/EdgeTriplet iterators.

This avoids a silent data corruption issue 
(https://spark-project.atlassian.net/browse/SPARK-1188) and has no performance 
impact in my measurements. It also simplifies the code. As far as I can tell, 
the object re-use was nothing but premature optimization.
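For illustration, here is a minimal self-contained sketch of the hazard (a simplified stand-in, not the actual GraphX code): an iterator that re-uses one mutable object makes an eager collection such as `toList` silently see only the object's final state, while a defensive `copy()` at the boundary restores the expected values.

```scala
// Hypothetical simplified edge class; the real GraphX Edge also carries an attr field.
case class Edge(var srcId: Long = 0L, var dstId: Long = 0L)

// An iterator that re-uses a single object across next() calls,
// mirroring the optimization this patch removes from the public path.
def reusingIterator(n: Int): Iterator[Edge] = new Iterator[Edge] {
  private val edge = Edge()
  private var i = 0
  override def hasNext: Boolean = i < n
  override def next(): Edge = {
    edge.srcId = i.toLong
    edge.dstId = (i + 1).toLong
    i += 1
    edge  // same instance every time
  }
}

// toList keeps n references to the one object, all showing its final state:
val corrupted = reusingIterator(3).toList

// A copy at the boundary (as EdgeRDD.compute now does) yields distinct objects:
val correct = reusingIterator(3).map(_.copy()).toList
```

Lazy, one-at-a-time consumers never notice the re-use, which is why the bug stayed silent until something held on to the elements.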

I did actual benchmarks for all the included changes, and there is no 
performance difference. I am not sure where to put the benchmarks. Does Spark 
not have a benchmark suite?

This is an example benchmark I did:

test("benchmark") {
  val builder = new EdgePartitionBuilder[Int]
  for (i <- (1 to 10000000)) {
    builder.add(i.toLong, i.toLong, i)
  }
  val p = builder.toEdgePartition
  p.map(_.attr + 1).iterator.toList
}

It ran for 10 seconds both before and after this change.
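Since Spark had no dedicated micro-benchmark suite at this point, a small self-contained timing helper is enough to reproduce the kind of before/after comparison described above. The `timeIt` name and shape below are illustrative, not a Spark API:

```scala
// Minimal timing helper; on the JVM a few warm-up runs are advisable
// before trusting the measured number, to let the JIT compile the hot path.
def timeIt[A](label: String, warmups: Int = 2)(body: => A): A = {
  (1 to warmups).foreach(_ => body)  // warm-up iterations, results discarded
  val t0 = System.nanoTime()
  val result = body
  val elapsedMs = (System.nanoTime() - t0) / 1e6
  println(f"$label: $elapsedMs%.1f ms")
  result
}

// Usage with the benchmark body from above:
//   timeIt("map+iterator") { p.map(_.attr + 1).iterator.toList }
```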

Author: Daniel Darabos <darabos.dan...@gmail.com>

Closes #276 from darabos/spark-1188 and squashes the following commits:

574302b [Daniel Darabos] Restore "manual" copying in 
EdgePartition.map(Iterator). Add comment to discourage novices like myself from 
trying to simplify the code.
4117a64 [Daniel Darabos] Revert EdgePartitionSuite.
4955697 [Daniel Darabos] Create a copy of the Edge objects in 
EdgeRDD.compute(). This avoids exposing the object re-use while still enabling 
the more efficient behavior for internal code.
4ec77f8 [Daniel Darabos] Add comments about object re-use to the affected 
functions.
2da5e87 [Daniel Darabos] Restore object re-use in EdgePartition.
0182f2b [Daniel Darabos] Do not re-use objects in the EdgePartition/EdgeTriplet 
iterators. This avoids a silent data corruption issue (SPARK-1188) and has no 
performance impact in my measurements. It also simplifies the code.
c55f52f [Daniel Darabos] Tests that reproduce the problems from SPARK-1188.

(cherry picked from commit 78236334e4ca7518b6d7d9b38464dbbda854a777)
Signed-off-by: Reynold Xin <r...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a92900c5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a92900c5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a92900c5

Branch: refs/heads/branch-0.9
Commit: a92900c5aca96d0609682ae9d9893c7d9e0f6327
Parents: aef6390
Author: Daniel Darabos <darabos.dan...@gmail.com>
Authored: Wed Apr 2 12:27:37 2014 -0700
Committer: Reynold Xin <r...@apache.org>
Committed: Thu May 29 00:42:55 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/graphx/EdgeRDD.scala |  3 +-
 .../spark/graphx/impl/EdgePartition.scala       | 15 +++++--
 .../spark/graphx/impl/EdgeTripletIterator.scala |  7 +---
 .../graphx/impl/EdgeTripletIteratorSuite.scala  | 43 ++++++++++++++++++++
 4 files changed, 58 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a92900c5/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
index fe03ae4..5f91f96 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -45,7 +45,8 @@ class EdgeRDD[@specialized ED: ClassTag](
     partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
 
   override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
-    firstParent[(PartitionID, EdgePartition[ED])].iterator(part, context).next._2.iterator
+    val p = firstParent[(PartitionID, EdgePartition[ED])].iterator(part, context)
+    p.next._2.iterator.map(_.copy())
   }
 
   override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()

http://git-wip-us.apache.org/repos/asf/spark/blob/a92900c5/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
index 57fa5ee..2e05f5d 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
@@ -56,6 +56,9 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
    * Construct a new edge partition by applying the function f to all
    * edges in this partition.
    *
+   * Be careful not to keep references to the objects passed to `f`.
+   * To improve GC performance the same object is re-used for each call.
+   *
    * @param f a function from an edge to a new attribute
    * @tparam ED2 the type of the new attribute
    * @return a new edge partition with the result of the function `f`
@@ -84,12 +87,12 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
    * order of the edges returned by `EdgePartition.iterator` and
    * should return attributes equal to the number of edges.
    *
-   * @param f a function from an edge to a new attribute
+   * @param iter an iterator for the new attribute values
    * @tparam ED2 the type of the new attribute
-   * @return a new edge partition with the result of the function `f`
-   *         applied to each edge
+   * @return a new edge partition with the attribute values replaced
    */
   def map[ED2: ClassTag](iter: Iterator[ED2]): EdgePartition[ED2] = {
+    // Faster than iter.toArray, because the expected size is known.
     val newData = new Array[ED2](data.size)
     var i = 0
     while (iter.hasNext) {
@@ -188,6 +191,9 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
   /**
    * Get an iterator over the edges in this partition.
    *
+   * Be careful not to keep references to the objects from this iterator.
+   * To improve GC performance the same object is re-used in `next()`.
+   *
    * @return an iterator over edges in the partition
    */
   def iterator = new Iterator[Edge[ED]] {
@@ -216,6 +222,9 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
   /**
    * Get an iterator over the cluster of edges in this partition with source vertex id `srcId`. The
    * cluster must start at position `index`.
+   *
+   * Be careful not to keep references to the objects from this iterator. To improve GC performance
+   * the same object is re-used in `next()`.
    */
   private def clusterIterator(srcId: VertexId, index: Int) = new Iterator[Edge[ED]] {
     private[this] val edge = new Edge[ED]

http://git-wip-us.apache.org/repos/asf/spark/blob/a92900c5/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
index 886c250..220a89d 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
@@ -37,20 +37,15 @@ class EdgeTripletIterator[VD: ClassTag, ED: ClassTag](
   // Current position in the array.
   private var pos = 0
 
-  // A triplet object that this iterator.next() call returns. We reuse this object to avoid
-  // allocating too many temporary Java objects.
-  private val triplet = new EdgeTriplet[VD, ED]
-
   private val vmap = new PrimitiveKeyOpenHashMap[VertexId, VD](vidToIndex, vertexArray)
 
   override def hasNext: Boolean = pos < edgePartition.size
 
   override def next() = {
+    val triplet = new EdgeTriplet[VD, ED]
     triplet.srcId = edgePartition.srcIds(pos)
-    // assert(vmap.containsKey(e.src.id))
     triplet.srcAttr = vmap(triplet.srcId)
     triplet.dstId = edgePartition.dstIds(pos)
-    // assert(vmap.containsKey(e.dst.id))
     triplet.dstAttr = vmap(triplet.dstId)
     triplet.attr = edgePartition.data(pos)
     pos += 1

http://git-wip-us.apache.org/repos/asf/spark/blob/a92900c5/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgeTripletIteratorSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgeTripletIteratorSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgeTripletIteratorSuite.scala
new file mode 100644
index 0000000..9cbb2d2
--- /dev/null
+++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgeTripletIteratorSuite.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.impl
+
+import scala.reflect.ClassTag
+import scala.util.Random
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.graphx._
+
+class EdgeTripletIteratorSuite extends FunSuite {
+  test("iterator.toList") {
+    val builder = new EdgePartitionBuilder[Int]
+    builder.add(1, 2, 0)
+    builder.add(1, 3, 0)
+    builder.add(1, 4, 0)
+    val vidmap = new VertexIdToIndexMap
+    vidmap.add(1)
+    vidmap.add(2)
+    vidmap.add(3)
+    vidmap.add(4)
+    val vs = Array.fill(vidmap.capacity)(0)
+    val iter = new EdgeTripletIterator[Int, Int](vidmap, vs, builder.toEdgePartition)
+    val result = iter.toList.map(et => (et.srcId, et.dstId))
+    assert(result === Seq((1, 2), (1, 3), (1, 4)))
+  }
+}
