Repository: spark Updated Branches: refs/heads/branch-0.9 aef6390cd -> a92900c5a
SPARK-1188: Do not re-use objects in the EdgePartition/EdgeTriplet iterators. This avoids a silent data corruption issue (https://spark-project.atlassian.net/browse/SPARK-1188) and has no performance impact by my measurements. It also simplifies the code. As far as I can tell the object re-use was nothing but premature optimization. I did actual benchmarks for all the included changes, and there is no performance difference. I am not sure where to put the benchmarks. Does Spark not have a benchmark suite? This is an example benchmark I did: test("benchmark") { val builder = new EdgePartitionBuilder[Int] for (i <- (1 to 10000000)) { builder.add(i.toLong, i.toLong, i) } val p = builder.toEdgePartition p.map(_.attr + 1).iterator.toList } It ran for 10 seconds both before and after this change. Author: Daniel Darabos <darabos.dan...@gmail.com> Closes #276 from darabos/spark-1188 and squashes the following commits: 574302b [Daniel Darabos] Restore "manual" copying in EdgePartition.map(Iterator). Add comment to discourage novices like myself from trying to simplify the code. 4117a64 [Daniel Darabos] Revert EdgePartitionSuite. 4955697 [Daniel Darabos] Create a copy of the Edge objects in EdgeRDD.compute(). This avoids exposing the object re-use, while still enables the more efficient behavior for internal code. 4ec77f8 [Daniel Darabos] Add comments about object re-use to the affected functions. 2da5e87 [Daniel Darabos] Restore object re-use in EdgePartition. 0182f2b [Daniel Darabos] Do not re-use objects in the EdgePartition/EdgeTriplet iterators. This avoids a silent data corruption issue (SPARK-1188) and has no performance impact in my measurements. It also simplifies the code. c55f52f [Daniel Darabos] Tests that reproduce the problems from SPARK-1188. (cherry picked from commit 78236334e4ca7518b6d7d9b38464dbbda854a777) Signed-off-by: Reynold Xin <r...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a92900c5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a92900c5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a92900c5 Branch: refs/heads/branch-0.9 Commit: a92900c5aca96d0609682ae9d9893c7d9e0f6327 Parents: aef6390 Author: Daniel Darabos <darabos.dan...@gmail.com> Authored: Wed Apr 2 12:27:37 2014 -0700 Committer: Reynold Xin <r...@apache.org> Committed: Thu May 29 00:42:55 2014 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/graphx/EdgeRDD.scala | 3 +- .../spark/graphx/impl/EdgePartition.scala | 15 +++++-- .../spark/graphx/impl/EdgeTripletIterator.scala | 7 +--- .../graphx/impl/EdgeTripletIteratorSuite.scala | 43 ++++++++++++++++++++ 4 files changed, 58 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/a92900c5/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala ---------------------------------------------------------------------- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala index fe03ae4..5f91f96 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala @@ -45,7 +45,8 @@ class EdgeRDD[@specialized ED: ClassTag]( partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD))) override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = { - firstParent[(PartitionID, EdgePartition[ED])].iterator(part, context).next._2.iterator + val p = firstParent[(PartitionID, EdgePartition[ED])].iterator(part, context) + p.next._2.iterator.map(_.copy()) } override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect() http://git-wip-us.apache.org/repos/asf/spark/blob/a92900c5/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala ---------------------------------------------------------------------- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala index 57fa5ee..2e05f5d 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala @@ -56,6 +56,9 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) * Construct a new edge partition by applying the function f to all * edges in this partition. * + * Be careful not to keep references to the objects passed to `f`. + * To improve GC performance the same object is re-used for each call. + * * @param f a function from an edge to a new attribute * @tparam ED2 the type of the new attribute * @return a new edge partition with the result of the function `f` @@ -84,12 +87,12 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) * order of the edges returned by `EdgePartition.iterator` and * should return attributes equal to the number of edges. * - * @param f a function from an edge to a new attribute + * @param iter an iterator for the new attribute values * @tparam ED2 the type of the new attribute - * @return a new edge partition with the result of the function `f` - * applied to each edge + * @return a new edge partition with the attribute values replaced */ def map[ED2: ClassTag](iter: Iterator[ED2]): EdgePartition[ED2] = { + // Faster than iter.toArray, because the expected size is known. val newData = new Array[ED2](data.size) var i = 0 while (iter.hasNext) { @@ -188,6 +191,9 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) /** * Get an iterator over the edges in this partition. * + * Be careful not to keep references to the objects from this iterator. + * To improve GC performance the same object is re-used in `next()`. + * * @return an iterator over edges in the partition */ def iterator = new Iterator[Edge[ED]] { @@ -216,6 +222,9 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) /** * Get an iterator over the cluster of edges in this partition with source vertex id `srcId`. The * cluster must start at position `index`. + * + * Be careful not to keep references to the objects from this iterator. To improve GC performance + * the same object is re-used in `next()`. */ private def clusterIterator(srcId: VertexId, index: Int) = new Iterator[Edge[ED]] { private[this] val edge = new Edge[ED] http://git-wip-us.apache.org/repos/asf/spark/blob/a92900c5/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala ---------------------------------------------------------------------- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala index 886c250..220a89d 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala @@ -37,20 +37,15 @@ class EdgeTripletIterator[VD: ClassTag, ED: ClassTag]( // Current position in the array. private var pos = 0 - // A triplet object that this iterator.next() call returns. We reuse this object to avoid - // allocating too many temporary Java objects. - private val triplet = new EdgeTriplet[VD, ED] - private val vmap = new PrimitiveKeyOpenHashMap[VertexId, VD](vidToIndex, vertexArray) override def hasNext: Boolean = pos < edgePartition.size override def next() = { + val triplet = new EdgeTriplet[VD, ED] triplet.srcId = edgePartition.srcIds(pos) - // assert(vmap.containsKey(e.src.id)) triplet.srcAttr = vmap(triplet.srcId) triplet.dstId = edgePartition.dstIds(pos) - // assert(vmap.containsKey(e.dst.id)) triplet.dstAttr = vmap(triplet.dstId) triplet.attr = edgePartition.data(pos) pos += 1 http://git-wip-us.apache.org/repos/asf/spark/blob/a92900c5/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgeTripletIteratorSuite.scala ---------------------------------------------------------------------- diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgeTripletIteratorSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgeTripletIteratorSuite.scala new file mode 100644 index 0000000..9cbb2d2 --- /dev/null +++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgeTripletIteratorSuite.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.graphx.impl + +import scala.reflect.ClassTag +import scala.util.Random + +import org.scalatest.FunSuite + +import org.apache.spark.graphx._ + +class EdgeTripletIteratorSuite extends FunSuite { + test("iterator.toList") { + val builder = new EdgePartitionBuilder[Int] + builder.add(1, 2, 0) + builder.add(1, 3, 0) + builder.add(1, 4, 0) + val vidmap = new VertexIdToIndexMap + vidmap.add(1) + vidmap.add(2) + vidmap.add(3) + vidmap.add(4) + val vs = Array.fill(vidmap.capacity)(0) + val iter = new EdgeTripletIterator[Int, Int](vidmap, vs, builder.toEdgePartition) + val result = iter.toList.map(et => (et.srcId, et.dstId)) + assert(result === Seq((1, 2), (1, 3), (1, 4))) + } +}