That looks, at the least, inconsistent. As far as I know, this should
be changed so that the zero value is always cloned via the non-closure
serializer. Any objection to that?
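To make the distinction concrete, here is a minimal, self-contained sketch of what "cloning the zero value via a serializer" means. It does not use Spark. `ValueSerializer`, `JavaValueSerializer`, `HistogramSerializer`, `Histogram` and `cloneVia` are hypothetical stand-ins: `ValueSerializer` plays the role of a Spark `SerializerInstance`, and `cloneVia` has the same shape as the `Utils.clone(zeroValue, serializerInstance)` call quoted below, which appears to deep-copy a value by round-tripping it through the given serializer. `HistogramSerializer` is a hand-written codec standing in for Kryo's ability to encode classes that do not implement `java.io.Serializable`; it is not Kryo.

```scala
import java.io._

// Hypothetical stand-in for a Spark SerializerInstance (not Spark's real API).
trait ValueSerializer {
  def serialize(value: Any): Array[Byte]
  def deserialize[T](bytes: Array[Byte]): T
}

// Plain Java serialization: the only serializer Spark supports for closures.
object JavaValueSerializer extends ValueSerializer {
  def serialize(value: Any): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    bytes.toByteArray
  }
  def deserialize[T](bytes: Array[Byte]): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}

// A mutable zero value whose class does NOT implement java.io.Serializable.
class Histogram(val counts: Array[Long])

// Hypothetical "data" serializer that, like Kryo, can encode Histogram
// without Java serialization (here via a hand-written binary codec).
object HistogramSerializer extends ValueSerializer {
  def serialize(value: Any): Array[Byte] = value match {
    case h: Histogram =>
      val bytes = new ByteArrayOutputStream()
      val out = new DataOutputStream(bytes)
      out.writeInt(h.counts.length)
      h.counts.foreach(c => out.writeLong(c))
      out.close()
      bytes.toByteArray
    case other =>
      throw new IllegalArgumentException(s"unsupported type: ${other.getClass.getName}")
  }
  def deserialize[T](bytes: Array[Byte]): T = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    val n = in.readInt()
    new Histogram(Array.fill(n)(in.readLong())).asInstanceOf[T]
  }
}

object ZeroValueClone {
  // Same shape as Utils.clone: deep copy via a serialize/deserialize round trip.
  def cloneVia[T](value: T, ser: ValueSerializer): T =
    ser.deserialize[T](ser.serialize(value))

  def main(args: Array[String]): Unit = {
    val zero = new Histogram(Array(0L, 0L, 0L))

    // Cloning with the data serializer works and yields an independent copy.
    val copy = cloneVia(zero, HistogramSerializer)
    copy.counts(0) = 42L
    println(zero.counts.mkString(",")) // prints 0,0,0

    // Cloning the same value with Java serialization fails, which is the
    // failure mode described in the report below.
    try {
      cloneVia(zero, JavaValueSerializer)
      println("unexpected success")
    } catch {
      case e: NotSerializableException => println(s"Java serializer failed: ${e.getMessage}")
    }
  }
}
```

The point of the sketch is only that the choice of serializer passed to the clone step decides whether a non-`Serializable` zero value is accepted; which serializer Spark should pass there is the question being discussed.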

On Wed, Feb 18, 2015 at 10:28 PM, Matt Cheah <> wrote:
> But RDD.aggregate() has this code:
>
>     // Clone the zero value since we will also be serializing it as part of tasks
>     var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
>
> I do see SparkEnv.get.serializer used in aggregateByKey, however. Perhaps
> we just missed it and need to apply the same change to aggregate()? It
> seems appropriate to target a fix for 1.3.0.
> -Matt Cheah
> From: Josh Rosen <>
> Date: Wednesday, February 18, 2015 at 6:12 AM
> To: Matt Cheah <>
> Cc: "" <>, Mingyu Kim
> <>, Andrew Ash <>
> Subject: Re: JavaRDD Aggregate initial value - Closure-serialized zero value
> reasoning?
> It looks like this was fixed in
> /
>  Can you see whether that patch
> fixes this issue for you?
> On Tue, Feb 17, 2015 at 8:31 PM, Matt Cheah <> wrote:
>> Hi everyone,
>> I was using JavaPairRDD’s combineByKey() to compute all of my aggregations
>> before, since I assumed that every aggregation required a key. However, I
>> realized I could do my analysis using JavaRDD’s aggregate() instead and not
>> use a key.
>> I have set spark.serializer to use Kryo. JavaPairRDD’s combineByKey
>> requires a “createCombiner” function, so with Kryo configured, the value
>> returned by that function must be serializable with Kryo. When I switched
>> to rdd.aggregate(), I assumed that the zero value would also be serialized
>> strictly with Kryo, since it is a data item and not part of a closure or of
>> the aggregation functions. However, I got a serialization exception because
>> the closure serializer (for which the Java serializer is the only supported
>> option) was used instead.
>> I was wondering about the following:
>> 1. What is the rationale for serializing the zero value with the closure
>>    serializer? It isn’t part of the closure; it is an initial data item.
>> 2. Would it make sense to write a version of rdd.aggregate() that takes,
>>    as a parameter, a function that generates the zero value? Serializing
>>    such a function with the closure serializer would be more intuitive.
>> I believe aggregateByKey is also affected.
>> Thanks,
>> -Matt Cheah
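The second suggestion (an aggregate() that takes a zero-value factory instead of a zero value) can be sketched locally as follows. This is a hypothetical illustration, not a Spark API: `aggregateWithFactory` does not exist in Spark, and partitions are modeled as plain `Seq`s on one machine. The idea is that only the factory function would travel with the task closure (and so go through the closure serializer), while each partition builds its own fresh zero value, much as combineByKey's createCombiner builds the initial combiner.

```scala
import scala.collection.mutable.ArrayBuffer

object AggregateWithFactory {
  // Hypothetical variant of aggregate(): the caller passes a function that
  // builds a zero value rather than a zero value that must be cloned and shipped.
  def aggregateWithFactory[T, U](partitions: Seq[Seq[T]])(createZero: () => U)(
      seqOp: (U, T) => U, combOp: (U, U) => U): U = {
    // Each partition folds its elements starting from its own fresh zero value.
    val perPartition = partitions.map(p => p.foldLeft(createZero())(seqOp))
    // The "driver" merges the partition results, again from a fresh zero value.
    perPartition.foldLeft(createZero())(combOp)
  }

  def main(args: Array[String]): Unit = {
    val parts = Seq(Seq(1, 2, 3), Seq(4, 5), Seq.empty[Int])
    // A mutable accumulator is safe here because every partition gets a new instance.
    val result = aggregateWithFactory(parts)(() => ArrayBuffer.empty[Int])(
      (buf, x) => buf += x,
      (a, b) => a ++= b)
    println(result.sorted.mkString(",")) // prints 1,2,3,4,5
  }
}
```

One trade-off of this design: the factory is called once per partition plus once for the merge, so it must be cheap and side-effect free.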
