But RDD.aggregate() has this code:

    // Clone the zero value since we will also be serializing it as part of tasks
    var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
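For anyone following along, here is a minimal sketch (plain Java, no Spark classes; `CloneSketch` and `KryoOnlyZero` are made-up names) of what a serialization round-trip clone like Utils.clone amounts to when the closure serializer is the Java serializer, and why it fails for a zero value that is only Kryo-serializable:

```java
import java.io.*;

public class CloneSketch {
    // Stand-in for Utils.clone: round-trip the value through Java serialization.
    static <T> T clone(T value) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);  // throws NotSerializableException for non-Serializable types
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    // A zero value Kryo could serialize but Java serialization cannot,
    // because it does not implement java.io.Serializable.
    static class KryoOnlyZero {
        int count = 0;
    }

    public static void main(String[] args) throws Exception {
        // A Serializable zero value round-trips fine.
        System.out.println(clone(Integer.valueOf(0)));  // prints 0

        // A Kryo-only zero value fails, matching the reported exception.
        try {
            clone(new KryoOnlyZero());
        } catch (NotSerializableException e) {
            System.out.println("NotSerializableException: " + e.getMessage());
        }
    }
}
```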
I do see SparkEnv.get.serializer used in aggregateByKey, however. Perhaps we just missed it and need to apply the change to aggregate() as well? It seems appropriate to target a fix for 1.3.0.

-Matt Cheah

From: Josh Rosen <rosenvi...@gmail.com>
Date: Wednesday, February 18, 2015 at 6:12 AM
To: Matt Cheah <mch...@palantir.com>
Cc: "dev@spark.apache.org" <dev@spark.apache.org>, Mingyu Kim <m...@palantir.com>, Andrew Ash <a...@palantir.com>
Subject: Re: JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?

It looks like this was fixed in https://issues.apache.org/jira/browse/SPARK-4743 / https://github.com/apache/spark/pull/3605. Can you see whether that patch fixes this issue for you?

On Tue, Feb 17, 2015 at 8:31 PM, Matt Cheah <mch...@palantir.com> wrote:

> Hi everyone,
>
> I was using JavaPairRDD's combineByKey() to compute all of my aggregations
> before, since I assumed that every aggregation required a key. However, I
> realized I could do my analysis using JavaRDD's aggregate() instead and not
> use a key.
>
> I have set spark.serializer to use Kryo. As a result, JavaRDD's combineByKey
> requires that a "createCombiner" function is provided, and the return value
> from that function must be serializable using Kryo.
> When I switched to using rdd.aggregate, I assumed that the zero value would
> also be strictly Kryo-serialized, as it is a data item and not part of a
> closure or the aggregation functions. However, I got a serialization
> exception because the closure serializer (for which the only valid
> serializer is the Java serializer) was used instead.
>
> I was wondering the following:
>
> 1. What is the rationale for serializing the zero value with the closure
> serializer? It isn't part of the closure, but an initial data item.
> 2. Would it make sense for us to write a version of rdd.aggregate() that
> takes, as a parameter, a function that generates the zero value? It would
> be more intuitive for such a function to be serialized with the closure
> serializer.
>
> I believe aggregateByKey is also affected.
>
> Thanks,
>
> -Matt Cheah
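To illustrate the idea in point 2, here is a sketch in plain Java (no Spark; `aggregate` over a list of "partitions" stands in for the RDD method, and the names are made up) of an aggregate variant that takes a zero-value factory. Only the factory closure would need serializing; each task then builds its own fresh zero locally, so the zero value itself never goes through any serializer:

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Supplier;

public class AggregateWithZeroFactory {
    // Hypothetical aggregate() variant: instead of a zero *value* (which would
    // be cloned and shipped with each task), it takes a zero *factory*. Each
    // "partition" below plays the role of one task's input split.
    static <T, U> U aggregate(List<List<T>> partitions,
                              Supplier<U> zeroFactory,
                              BiFunction<U, T, U> seqOp,
                              BiFunction<U, U, U> combOp) {
        U result = zeroFactory.get();
        for (List<T> partition : partitions) {
            U acc = zeroFactory.get();  // fresh zero per "task", built locally
            for (T item : partition) {
                acc = seqOp.apply(acc, item);
            }
            result = combOp.apply(result, acc);
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<Integer>> partitions =
                Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4, 5));
        int sum = aggregate(partitions, () -> 0, Integer::sum, Integer::sum);
        System.out.println(sum);  // prints 15
    }
}
```

With this shape, a mutable, Kryo-only (or even entirely non-serializable) accumulator type is unproblematic, since only the small `() -> 0`-style lambda crosses the wire.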