[ https://issues.apache.org/jira/browse/SPARK-5360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14286386#comment-14286386 ]
Apache Spark commented on SPARK-5360:
-------------------------------------

User 'kayousterhout' has created a pull request for this issue:
https://github.com/apache/spark/pull/4145

> For CoGroupedRDD, rdds for narrow dependencies and shuffle handles are
> included twice in serialized task
> --------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-5360
>                 URL: https://issues.apache.org/jira/browse/SPARK-5360
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 1.2.0
>            Reporter: Kay Ousterhout
>            Assignee: Kay Ousterhout
>            Priority: Minor
>
> CoGroupPartition, part of CoGroupedRDD, includes references to each RDD that
> the CoGroupedRDD narrowly depends on, and a reference to the ShuffleHandle.
> The partition is serialized separately from the RDD, so when the RDD and
> the partition arrive on the worker, the references in the partition and in
> the RDD no longer point to the same object.
> This is a relatively minor performance issue (the closure can be 2x larger
> than it needs to be because the RDDs and shuffle handles are serialized
> twice; see numbers below), but it is more annoying as a developer issue
> (which is how I ran into it): if any state is stored in the RDD or
> ShuffleHandle on the worker side, subtle bugs can appear because the
> references to the RDD / ShuffleHandle held by the RDD and by the partition
> point to separate objects. I'm not sure whether this is enough of a
> potential future problem to justify changing this old and central part of
> the code, so I'm hoping to get input from others here.
> I did some simple experiments to see how much this affects closure size.
> For this example:
> scala> val a = sc.parallelize(1 to 10).map((_, 1))
> scala> val b = sc.parallelize(1 to 2).map(x => (x, 2*x))
> scala> a.cogroup(b).collect()
> the closure was 1902 bytes with current Spark, and 1129 bytes after my
> change. The difference comes from eliminating the duplicate serialization
> of the shuffle handle.
> For this example:
> scala> val sortedA = a.sortByKey()
> scala> val sortedB = b.sortByKey()
> scala> sortedA.cogroup(sortedB).collect()
> the closure was 3491 bytes with current Spark, and 1333 bytes after my
> change. Here, the difference comes from eliminating the duplicate
> serialization of the two RDDs for the narrow dependencies.
> The ShuffleHandle includes the ShuffleDependency, so this difference will
> grow if a ShuffleDependency includes a serializer, a key ordering, or an
> aggregator (all set to None by default). However, the difference is not
> affected by the size of the function the user specifies, which (based on
> my understanding) is typically the source of large task closures.
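To make the reference-splitting concrete, here is a minimal, self-contained
Scala sketch of the behavior described above. The names Handle, Rdd, Part,
and roundTrip are hypothetical stand-ins (not Spark's actual classes or
serializer): Handle plays the role of the ShuffleHandle, Rdd of the
CoGroupedRDD, and Part of the CoGroupPartition, and plain Java serialization
mimics how the RDD and the partition are shipped to a worker in separate
streams.

    import java.io._

    // Hypothetical stand-ins, not Spark's real classes.
    class Handle(val id: Int) extends Serializable
    class Rdd(val handle: Handle) extends Serializable
    class Part(val handle: Handle) extends Serializable

    object DuplicateRefDemo {
      // Round-trip an object through Java serialization, mimicking the
      // trip from the driver to a worker.
      def roundTrip[T <: AnyRef](obj: T): T = {
        val buf = new ByteArrayOutputStream()
        val out = new ObjectOutputStream(buf)
        out.writeObject(obj)
        out.close()
        val in = new ObjectInputStream(
          new ByteArrayInputStream(buf.toByteArray))
        in.readObject().asInstanceOf[T]
      }

      def main(args: Array[String]): Unit = {
        val handle = new Handle(42)
        val rdd    = new Rdd(handle)
        val part   = new Part(handle)

        // Serialized together in one stream, the shared reference survives:
        val (rdd1, part1) = roundTrip((rdd, part))
        println(rdd1.handle eq part1.handle)  // true: one copy of the handle

        // Serialized separately, as Spark serializes the RDD and the
        // partition, each stream writes its own copy of the handle:
        val rdd2  = roundTrip(rdd)
        val part2 = roundTrip(part)
        println(rdd2.handle eq part2.handle)  // false: two distinct copies
      }
    }

When the two objects travel in one stream, Java serialization's reference
tracking writes the shared handle once, so reference equality survives the
round trip; when they travel in separate streams, each stream writes its own
copy, which both inflates the serialized task and breaks reference equality
on the worker, exactly the developer-facing hazard the issue describes.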