That looks, at the least, inconsistent. As far as I can tell, the zero
value should always be cloned via the data (non-closure) serializer.
Any objection to making that change?
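
Concretely, a minimal sketch of the one-line change I have in mind
(assuming aggregate() otherwise keeps its current shape):

    // Clone the zero value with the data serializer (Kryo if configured)
    // instead of the closure serializer, which is always JavaSerializer.
    var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())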

On Wed, Feb 18, 2015 at 10:28 PM, Matt Cheah <mch...@palantir.com> wrote:
> But RDD.aggregate() has this code:
>
>     // Clone the zero value since we will also be serializing it as part of
> tasks
>     var jobResult = Utils.clone(zeroValue,
> sc.env.closureSerializer.newInstance())
>
> I do see SparkEnv.get.serializer used in aggregateByKey, however. Perhaps
> we just missed aggregate() and need to apply the same change there? It
> seems appropriate to target a fix for 1.3.0.
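>
> From memory, the aggregateByKey path clones the zero value roughly like
> this (paraphrased, not the exact source):
>
>     // Serialize the zero value once with the data serializer, then
>     // deserialize a fresh copy wherever a new zero is needed
>     val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
>     val zeroArray = new Array[Byte](zeroBuffer.limit)
>     zeroBuffer.get(zeroArray)
>     lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
>     val createZero = () =>
>       cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))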
>
> -Matt Cheah
> From: Josh Rosen <rosenvi...@gmail.com>
> Date: Wednesday, February 18, 2015 at 6:12 AM
> To: Matt Cheah <mch...@palantir.com>
> Cc: "dev@spark.apache.org" <dev@spark.apache.org>, Mingyu Kim
> <m...@palantir.com>, Andrew Ash <a...@palantir.com>
> Subject: Re: JavaRDD Aggregate initial value - Closure-serialized zero value
> reasoning?
>
> It looks like this was fixed in
> https://issues.apache.org/jira/browse/SPARK-4743 /
> https://github.com/apache/spark/pull/3605.  Can you see whether that patch
> fixes this issue for you?
>
>
>
> On Tue, Feb 17, 2015 at 8:31 PM, Matt Cheah <mch...@palantir.com> wrote:
>>
>> Hi everyone,
>>
>> I was using JavaPairRDD’s combineByKey() to compute all of my aggregations
>> before, since I assumed that every aggregation required a key. However, I
>> realized I could do my analysis using JavaRDD’s aggregate() instead and not
>> use a key.
>>
>> I have set spark.serializer to use Kryo. As a result, JavaPairRDD’s
>> combineByKey requires that a “createCombiner” function is provided, and
>> the return value from that function must be serializable using Kryo.
>> When I switched to rdd.aggregate I assumed that the zero value would
>> also be strictly Kryo-serialized, since it is a data item and not part
>> of a closure or the aggregation functions. However, I got a
>> serialization exception because the closure serializer (which can only
>> be the Java serializer) was used instead.
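>>
>> To illustrate (hypothetical Accum class and an RDD[Long], but
>> representative of our type: Kryo handles it, yet it does not implement
>> java.io.Serializable):
>>
>>     class Accum(var sum: Long, var count: Long)  // not Serializable
>>
>>     val zero = new Accum(0L, 0L)
>>     rdd.aggregate(zero)(
>>       (acc, x) => { acc.sum += x; acc.count += 1; acc },
>>       (a, b) => { a.sum += b.sum; a.count += b.count; a })
>>     // => java.io.NotSerializableException: Accum, thrown by the
>>     //    closure serializer even though Kryo is the data serializer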
>>
>> I was wondering the following:
>>
>> 1. What is the rationale for serializing the zero value with the
>> closure serializer? It isn’t part of the closure; it is an initial
>> data item.
>> 2. Would it make sense for us to write a version of rdd.aggregate()
>> that takes a function as a parameter that generates the zero value
>> (sketched below)? Serializing such a function with the closure
>> serializer would be more intuitive.
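>>
>> For example (hypothetical signature, just to sketch the idea):
>>
>>     // The zero-value factory is itself a closure, so sending it through
>>     // the closure serializer is natural; each task builds its own zero.
>>     def aggregate[U: ClassTag](zeroValue: () => U)(
>>         seqOp: (U, T) => U, combOp: (U, U) => U): U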
>>
>> I believe aggregateByKey is also affected.
>>
>> Thanks,
>>
>> -Matt Cheah
>
>
