[ 
https://issues.apache.org/jira/browse/SPARK-5360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen updated SPARK-5360:
-----------------------------
    Component/s: Spark Core

> For CoGroupedRDD, rdds for narrow dependencies and shuffle handles are 
> included twice in serialized task
> --------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-5360
>                 URL: https://issues.apache.org/jira/browse/SPARK-5360
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.2.0
>            Reporter: Kay Ousterhout
>            Assignee: Kay Ousterhout
>            Priority: Minor
>
> CoGroupPartition, part of CoGroupedRDD, includes references to each RDD that 
> the CoGroupedRDD narrowly depends on, and a reference to the ShuffleHandle.  
> The partition is serialized separately from the RDD, so when the RDD and 
> partition arrive on the worker, the references in the partition and in the 
> RDD no longer point to the same object.
> This is a relatively minor performance issue (the closure can be 2x larger 
> than it needs to be because the RDDs and shuffle handles are serialized 
> twice; see numbers below), but it is more annoying as a developer issue 
> (which is how I ran into it): if any state is stored in the RDD or 
> ShuffleHandle on the worker side, subtle bugs can appear because the 
> references to the RDD / ShuffleHandle in the RDD and in the partition point 
> to separate objects.  I'm not sure whether this is enough of a potential 
> future problem to justify changing this old and central part of the code, so 
> I'm hoping to get input from others here.
> I did some simple experiments to see how much this affects closure size.  For 
> this example: 
> $ val a = sc.parallelize(1 to 10).map((_, 1))
> $ val b = sc.parallelize(1 to 2).map(x => (x, 2*x))
> $ a.cogroup(b).collect()
> the closure was 1902 bytes with current Spark, and 1129 bytes after my 
> change.  The difference comes from eliminating duplicate serialization of the 
> shuffle handle.
> For this example:
> $ val sortedA = a.sortByKey()
> $ val sortedB = b.sortByKey()
> $ sortedA.cogroup(sortedB).collect()
> the closure was 3491 bytes with current Spark, and 1333 bytes after my 
> change. Here, the difference comes from eliminating duplicate serialization 
> of the two RDDs for the narrow dependencies.
> The ShuffleHandle includes the ShuffleDependency, so this difference will get 
> larger if a ShuffleDependency includes a serializer, a key ordering, or an 
> aggregator (all set to None by default).  However, the difference is not 
> affected by the size of the function the user specifies, which (based on my 
> understanding) is typically the source of large task closures.
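
The effect described above can be reproduced outside Spark with plain Java serialization. The following is a minimal sketch, not Spark code: `Handle`, `ToyRdd`, and `ToyPartition` are hypothetical stand-ins for the ShuffleHandle, the CoGroupedRDD, and the CoGroupPartition, and the 1024-byte payload is an arbitrary choice. It shows that two objects sharing a reference lose that sharing when each is written to its own stream, and that the shared object is then written twice.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-ins for illustration only; these are not Spark classes.
class Handle(val payload: Array[Byte]) extends Serializable
class ToyRdd(val handle: Handle) extends Serializable
class ToyPartition(val index: Int, val handle: Handle) extends Serializable

object SeparateSerializationDemo {
  // Write one object graph to its own Java serialization stream.
  def serialize(obj: AnyRef): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(obj)
    out.close()
    buffer.toByteArray
  }

  // Read one object graph back from a stream produced by serialize.
  def deserialize[T](bytes: Array[Byte]): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // Both objects reference the same Handle before serialization.
  private val handle = new Handle(new Array[Byte](1024))
  private val rdd = new ToyRdd(handle)
  private val partition = new ToyPartition(0, handle)

  // Case 1: separate streams. The Handle is written once per stream.
  private val rddBytes = serialize(rdd)
  private val partitionBytes = serialize(partition)
  val separateSize: Int = rddBytes.length + partitionBytes.length
  val sharedWhenSeparate: Boolean =
    deserialize[ToyRdd](rddBytes).handle eq deserialize[ToyPartition](partitionBytes).handle

  // Case 2: one stream. The second reference to the Handle is written as a back-reference.
  private val togetherBytes = serialize((rdd, partition))
  val togetherSize: Int = togetherBytes.length
  val sharedWhenTogether: Boolean = {
    val (r, p) = deserialize[(ToyRdd, ToyPartition)](togetherBytes)
    r.handle eq p.handle
  }

  def main(args: Array[String]): Unit = {
    println(s"separate streams: shared = $sharedWhenSeparate, bytes = $separateSize")
    println(s"one stream:       shared = $sharedWhenTogether, bytes = $togetherSize")
  }
}
```

With separate streams the two deserialized objects hold distinct `Handle` copies, so state written through one is invisible through the other; with a single stream the reference stays shared and the payload is written only once.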


