Repository: spark
Updated Branches:
  refs/heads/master 5fea3e5c3 -> c035c0f2d


[SPARK-5360] [SPARK-6606] Eliminate duplicate objects in serialized CoGroupedRDD

CoGroupPartition, part of CoGroupedRDD, includes references to each RDD that 
the CoGroupedRDD narrowly depends on, and a reference to the ShuffleHandle. The 
partition is serialized separately from the RDD, so when the RDD and partition 
arrive on the worker, the references in the partition and in the RDD no longer 
point to the same object.

This is a relatively minor performance issue (the closure can be 2x larger than 
it needs to be because the rdds and partitions are serialized twice; see 
numbers below) but is more annoying as a developer issue (this is where I ran 
into it): if any state is stored in the RDD or ShuffleHandle on the worker 
side, subtle bugs can appear because the references to the RDD / ShuffleHandle 
in the RDD and in the partition point to separate objects. I'm not sure if this 
is enough of a potential future problem to justify changing this old and 
central part of the code, so I'm hoping to get input from others here.
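
The identity problem described above can be reproduced outside Spark with plain 
Java serialization. The following is a minimal sketch, not Spark code: Handle, 
FakeRdd and FakePartition are hypothetical stand-ins for ShuffleHandle, 
CoGroupedRDD and CoGroupPartition, and the helper names are made up for 
illustration.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-ins; none of these are Spark classes.
class Handle(val id: Int) extends Serializable
class FakeRdd(val handle: Handle) extends Serializable
class FakePartition(val handle: Handle) extends Serializable

object SeparateSerializationDemo {
  def serialize(obj: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    bytes.toByteArray
  }

  def deserialize[T](data: Array[Byte]): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(data))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // Two independent streams: each deserialized object gets its own Handle copy.
  def sharedAfterSeparateRoundTrip(): Boolean = {
    val handle = new Handle(7)
    val rdd = deserialize[FakeRdd](serialize(new FakeRdd(handle)))
    val partition = deserialize[FakePartition](serialize(new FakePartition(handle)))
    rdd.handle eq partition.handle
  }

  // One stream: back-references inside the stream keep the Handle shared.
  def sharedAfterJointRoundTrip(): Boolean = {
    val handle = new Handle(7)
    val pair = (new FakeRdd(handle), new FakePartition(handle))
    val (rdd, partition) = deserialize[(FakeRdd, FakePartition)](serialize(pair))
    rdd.handle eq partition.handle
  }
}
```

In this sketch sharedAfterSeparateRoundTrip() returns false and 
sharedAfterJointRoundTrip() returns true, so any state written through one 
reference on the receiving side would be invisible through the other in the 
first case.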

I did some simple experiments to see how much this affects closure size. For 
this example:
$ val a = sc.parallelize(1 to 10).map((_, 1))
$ val b = sc.parallelize(1 to 2).map(x => (x, 2*x))
$ a.cogroup(b).collect()
the closure was 1902 bytes with current Spark, and 1129 bytes after my change. 
The difference comes from eliminating duplicate serialization of the shuffle 
handle.

For this example:
$ val sortedA = a.sortByKey()
$ val sortedB = b.sortByKey()
$ sortedA.cogroup(sortedB).collect()
the closure was 3491 bytes with current Spark, and 1333 bytes after my change. 
Here, the difference comes from eliminating duplicate serialization of the two 
RDDs for the narrow dependencies.

The ShuffleHandle includes the ShuffleDependency, so this difference will get 
larger if a ShuffleDependency includes a serializer, a key ordering, or an 
aggregator (all set to None by default). It would also get bigger for a big RDD 
-- although I can't think of any examples where the RDD object gets large.  The 
difference is not affected by the size of the function the user specifies, 
which (based on my understanding) is typically the source of large task 
closures.
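
The diff below removes the duplicates by marking the rdd and splitIndex fields 
of NarrowCoGroupSplitDep as @transient. As a rough illustration of why that 
shrinks the serialized form, here is a small sketch using plain Java 
serialization. Payload, EagerDep and LeanDep are hypothetical classes invented 
for this example, and the byte counts it produces are unrelated to the closure 
sizes reported above.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Hypothetical classes for illustration only; not part of Spark.
class Payload(val data: Array[Byte]) extends Serializable
class EagerDep(val payload: Payload, val index: Int) extends Serializable
class LeanDep(@transient val payload: Payload, val index: Int) extends Serializable

object TransientSizeDemo {
  // Number of bytes Java serialization writes for the given object.
  def serializedSize(obj: AnyRef): Int = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    bytes.size()
  }

  // Returns (size with the payload field serialized, size with it transient).
  def sizes(): (Int, Int) = {
    val payload = new Payload(new Array[Byte](1024))
    (serializedSize(new EagerDep(payload, 0)), serializedSize(new LeanDep(payload, 0)))
  }
}
```

A transient field is null after deserialization, which is consistent with the 
diff below reading the parent RDD from dependencies rather than from the 
partition.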

Author: Kay Ousterhout <kayousterh...@gmail.com>

Closes #4145 from kayousterhout/SPARK-5360 and squashes the following commits:

85156c3 [Kay Ousterhout] Better comment the narrowDeps parameter
cff0209 [Kay Ousterhout] Fixed spelling issue
658e1af [Kay Ousterhout] [SPARK-5360] Eliminate duplicate objects in serialized CoGroupedRDD


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c035c0f2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c035c0f2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c035c0f2

Branch: refs/heads/master
Commit: c035c0f2d72f2a303b86fe0037ec43d756fff060
Parents: 5fea3e5
Author: Kay Ousterhout <kayousterh...@gmail.com>
Authored: Tue Apr 21 11:01:18 2015 -0700
Committer: Kay Ousterhout <kayousterh...@gmail.com>
Committed: Tue Apr 21 11:01:18 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/rdd/CoGroupedRDD.scala     | 43 ++++++++++++--------
 .../org/apache/spark/rdd/SubtractedRDD.scala    | 30 ++++++++------
 2 files changed, 44 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c035c0f2/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 7021a33..658e8c8 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -29,15 +29,16 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap, CompactBuffer}
 import org.apache.spark.util.Utils
 import org.apache.spark.serializer.Serializer
-import org.apache.spark.shuffle.ShuffleHandle
-
-private[spark] sealed trait CoGroupSplitDep extends Serializable
 
+/** The references to rdd and splitIndex are transient because redundant information is stored
+  * in the CoGroupedRDD object.  Because CoGroupedRDD is serialized separately from
+  * CoGroupPartition, if rdd and splitIndex aren't transient, they'll be included twice in the
+  * task closure. */
 private[spark] case class NarrowCoGroupSplitDep(
-    rdd: RDD[_],
-    splitIndex: Int,
+    @transient rdd: RDD[_],
+    @transient splitIndex: Int,
     var split: Partition
-  ) extends CoGroupSplitDep {
+  ) extends Serializable {
 
   @throws(classOf[IOException])
   private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
@@ -47,9 +48,16 @@ private[spark] case class NarrowCoGroupSplitDep(
   }
 }
 
-private[spark] case class ShuffleCoGroupSplitDep(handle: ShuffleHandle) extends CoGroupSplitDep
-
-private[spark] class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep])
+/**
+ * Stores information about the narrow dependencies used by a CoGroupedRdd.
+ *
+ * @param narrowDeps maps to the dependencies variable in the parent RDD: for each one to one
+ *                   dependency in dependencies, narrowDeps has a NarrowCoGroupSplitDep (describing
+ *                   the partition for that dependency) at the corresponding index. The size of
+ *                   narrowDeps should always be equal to the number of parents.
+ */
+private[spark] class CoGroupPartition(
+    idx: Int, val narrowDeps: Array[Option[NarrowCoGroupSplitDep]])
   extends Partition with Serializable {
   override val index: Int = idx
   override def hashCode(): Int = idx
@@ -105,9 +113,9 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
         // Assume each RDD contributed a single dependency, and get it
         dependencies(j) match {
           case s: ShuffleDependency[_, _, _] =>
-            new ShuffleCoGroupSplitDep(s.shuffleHandle)
+            None
           case _ =>
-            new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i))
+            Some(new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i)))
         }
       }.toArray)
     }
@@ -120,20 +128,21 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
     val sparkConf = SparkEnv.get.conf
     val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true)
     val split = s.asInstanceOf[CoGroupPartition]
-    val numRdds = split.deps.length
+    val numRdds = dependencies.length
 
     // A list of (rdd iterator, dependency number) pairs
     val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]
-    for ((dep, depNum) <- split.deps.zipWithIndex) dep match {
-      case NarrowCoGroupSplitDep(rdd, _, itsSplit) =>
+    for ((dep, depNum) <- dependencies.zipWithIndex) dep match {
+      case oneToOneDependency: OneToOneDependency[Product2[K, Any]] =>
+        val dependencyPartition = split.narrowDeps(depNum).get.split
         // Read them from the parent
-        val it = rdd.iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, Any]]]
+        val it = oneToOneDependency.rdd.iterator(dependencyPartition, context)
         rddIterators += ((it, depNum))
 
-      case ShuffleCoGroupSplitDep(handle) =>
+      case shuffleDependency: ShuffleDependency[_, _, _] =>
         // Read map outputs of shuffle
         val it = SparkEnv.get.shuffleManager
-          .getReader(handle, split.index, split.index + 1, context)
+          .getReader(shuffleDependency.shuffleHandle, split.index, split.index + 1, context)
           .read()
         rddIterators += ((it, depNum))
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/c035c0f2/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
index e9d7455..633aeba 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
@@ -81,9 +81,9 @@ private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag](
       array(i) = new CoGroupPartition(i, Seq(rdd1, rdd2).zipWithIndex.map { case (rdd, j) =>
         dependencies(j) match {
           case s: ShuffleDependency[_, _, _] =>
-            new ShuffleCoGroupSplitDep(s.shuffleHandle)
+            None
           case _ =>
-            new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i))
+            Some(new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i)))
         }
       }.toArray)
     }
@@ -105,20 +105,26 @@ private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag](
         seq
       }
     }
-    def integrate(dep: CoGroupSplitDep, op: Product2[K, V] => Unit): Unit = dep match {
-      case NarrowCoGroupSplitDep(rdd, _, itsSplit) =>
-        rdd.iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, V]]].foreach(op)
+    def integrate(depNum: Int, op: Product2[K, V] => Unit) = {
+      dependencies(depNum) match {
+        case oneToOneDependency: OneToOneDependency[_] =>
+          val dependencyPartition = partition.narrowDeps(depNum).get.split
+          oneToOneDependency.rdd.iterator(dependencyPartition, context)
+            .asInstanceOf[Iterator[Product2[K, V]]].foreach(op)
 
-      case ShuffleCoGroupSplitDep(handle) =>
-        val iter = SparkEnv.get.shuffleManager
-          .getReader(handle, partition.index, partition.index + 1, context)
-          .read()
-        iter.foreach(op)
+        case shuffleDependency: ShuffleDependency[_, _, _] =>
+          val iter = SparkEnv.get.shuffleManager
+            .getReader(
+              shuffleDependency.shuffleHandle, partition.index, partition.index + 1, context)
+            .read()
+          iter.foreach(op)
+      }
     }
+
     // the first dep is rdd1; add all values to the map
-    integrate(partition.deps(0), t => getSeq(t._1) += t._2)
+    integrate(0, t => getSeq(t._1) += t._2)
     // the second dep is rdd2; remove all of its keys
-    integrate(partition.deps(1), t => map.remove(t._1))
+    integrate(1, t => map.remove(t._1))
     map.iterator.map { t =>  t._2.iterator.map { (t._1, _) } }.flatten
   }
 

