But RDD.aggregate() has this code:

    // Clone the zero value since we will also be serializing it as part of tasks
    var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())

I do see SparkEnv.get.serializer used in aggregateByKey, however. Perhaps
we just missed it and need to apply the same change to aggregate()? It seems
appropriate to target a fix for 1.3.0.

-Matt Cheah
From:  Josh Rosen <rosenvi...@gmail.com>
Date:  Wednesday, February 18, 2015 at 6:12 AM
To:  Matt Cheah <mch...@palantir.com>
Cc:  "dev@spark.apache.org" <dev@spark.apache.org>, Mingyu Kim
<m...@palantir.com>, Andrew Ash <a...@palantir.com>
Subject:  Re: JavaRDD Aggregate initial value - Closure-serialized zero
value reasoning?

It looks like this was fixed in
https://issues.apache.org/jira/browse/SPARK-4743 /
https://github.com/apache/spark/pull/3605 .  Can you see
whether that patch fixes this issue for you?



On Tue, Feb 17, 2015 at 8:31 PM, Matt Cheah <mch...@palantir.com> wrote:
> Hi everyone,
> 
> I was using JavaPairRDD's combineByKey() to compute all of my aggregations
> before, since I assumed that every aggregation required a key. However, I
> realized I could do my analysis using JavaRDD's aggregate() instead and not
> use a key.
> 
> I have set spark.serializer to use Kryo. As a result, JavaRDD's combineByKey
> requires that a "createCombiner" function is provided, and the return value
> from that function must be serializable using Kryo. When I switched to
> rdd.aggregate, I assumed that the zero value would also be strictly Kryo
> serialized, as it is a data item and not part of a closure or the aggregation
> functions. However, I got a serialization exception because the closure
> serializer (whose only valid serializer is the Java serializer) was used
> instead.
> 
> I was wondering the following:
> 1. What is the rationale for making the zero value be serialized using the
> closure serializer? This isn't part of the closure, but is an initial data
> item.
> 2. Would it make sense for us to write a version of rdd.aggregate() that
> takes a function as a parameter, where that function generates the zero
> value? Serializing such a function with the closure serializer would be more
> intuitive.
> I believe aggregateByKey is also affected.
> 
> Thanks,
> 
> -Matt Cheah
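
The variant proposed in (2) above could be sketched outside of Spark as
follows, with a plain List standing in for an RDD (all names and shapes here
are hypothetical, not Spark API): the zero value is produced by a factory
inside each task, so only the factory, which is naturally part of the closure,
needs closure serialization; the zero value itself never crosses the wire.

```java
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Supplier;

public class AggregateSketch {
    // Hypothetical aggregate() overload: zeroFactory is shipped with the
    // closure; the zero value is created fresh inside the task.
    static <T, U> U aggregate(List<T> data, Supplier<U> zeroFactory,
                              BiFunction<U, T, U> seqOp) {
        U acc = zeroFactory.get(); // zero built per task, never serialized
        for (T t : data) {
            acc = seqOp.apply(acc, t);
        }
        return acc;
    }

    public static void main(String[] args) {
        int sum = aggregate(List.of(1, 2, 3), () -> 0, Integer::sum);
        System.out.println(sum); // prints 6
    }
}
```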
