Yes, that's a bug and should be using the standard serializer.
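
For concreteness, the fix would be the one-line swap below, sketched
against the 1.2-era RDD.scala quoted downthread (untested):

    // Current: the zero value is cloned with the closure serializer
    var jobResult = Utils.clone(zeroValue,
      sc.env.closureSerializer.newInstance())

    // Expected: clone it with the standard data serializer instead,
    // matching what aggregateByKey already does
    var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())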

On Wed, Feb 18, 2015 at 2:58 PM, Sean Owen <so...@cloudera.com> wrote:
> That looks, at the least, inconsistent. As far as I know this should
> be changed so that the zero value is always cloned via the non-closure
> serializer. Any objection to that?
>
> On Wed, Feb 18, 2015 at 10:28 PM, Matt Cheah <mch...@palantir.com> wrote:
> > But RDD.aggregate() has this code:
> >
> >     // Clone the zero value since we will also be serializing it as
> >     // part of tasks
> >     var jobResult = Utils.clone(zeroValue,
> >       sc.env.closureSerializer.newInstance())
> >
> > I do see SparkEnv.get.serializer used in aggregateByKey, however.
> > Perhaps we just missed it and need to apply the change to aggregate()?
> > It seems appropriate to target a fix for 1.3.0.
> >
> > -Matt Cheah
> >
> > From: Josh Rosen <rosenvi...@gmail.com>
> > Date: Wednesday, February 18, 2015 at 6:12 AM
> > To: Matt Cheah <mch...@palantir.com>
> > Cc: "dev@spark.apache.org" <dev@spark.apache.org>, Mingyu Kim
> > <m...@palantir.com>, Andrew Ash <a...@palantir.com>
> > Subject: Re: JavaRDD Aggregate initial value - Closure-serialized zero
> > value reasoning?
> >
> > It looks like this was fixed in
> > https://issues.apache.org/jira/browse/SPARK-4743 /
> > https://github.com/apache/spark/pull/3605. Can you see whether that
> > patch fixes this issue for you?
> >
> > On Tue, Feb 17, 2015 at 8:31 PM, Matt Cheah <mch...@palantir.com> wrote:
> >> Hi everyone,
> >>
> >> I was using JavaPairRDD’s combineByKey() to compute all of my
> >> aggregations before, since I assumed that every aggregation required
> >> a key. However, I realized I could do my analysis using JavaRDD’s
> >> aggregate() instead and not use a key.
> >>
> >> I have set spark.serializer to use Kryo. As a result, the
> >> “createCombiner” function that JavaPairRDD’s combineByKey() requires
> >> must return a value that is serializable using Kryo. When I switched
> >> to rdd.aggregate(), I assumed that the zero value would also be
> >> strictly Kryo-serialized, since it is a data item and not part of a
> >> closure or the aggregation functions. However, I got a serialization
> >> exception because the closure serializer (for which the only valid
> >> serializer is the Java serializer) was used instead.
> >>
> >> I was wondering the following:
> >>
> >> 1. What is the rationale for serializing the zero value with the
> >>    closure serializer? It isn’t part of the closure, but an initial
> >>    data item.
> >> 2. Would it make sense for us to perhaps write a version of
> >>    rdd.aggregate() that takes, as a parameter, a function that
> >>    generates the zero value? It would be more intuitive for a
> >>    function to be serialized with the closure serializer.
> >>
> >> I believe aggregateByKey is also affected.
> >>
> >> Thanks,
> >>
> >> -Matt Cheah
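
As for the second suggestion in Matt's original mail, a zero-value
factory can be prototyped in user code without any API change. A rough
sketch (the name aggregateWithZero and its signature are invented here,
untested):

    import scala.reflect.ClassTag
    import org.apache.spark.rdd.RDD

    // Hypothetical variant of aggregate(): the caller supplies a function
    // that builds the zero value. Only that function ships with the task
    // closure (Java-serialized); no standalone zero value is cloned and
    // sent as a data item.
    def aggregateWithZero[T, U: ClassTag](rdd: RDD[T])(zeroFactory: () => U)(
        seqOp: (U, T) => U, combOp: (U, U) => U): U = {
      // Fold each partition locally, creating the zero value inside the task.
      val partials = rdd.mapPartitions { iter =>
        Iterator(iter.foldLeft(zeroFactory())(seqOp))
      }
      // Merge the per-partition results on the driver.
      partials.collect().foldLeft(zeroFactory())(combOp)
    }

Since zeroFactory is captured by the mapPartitions closure, it goes
through the closure serializer, which matches the intuition in the
original mail, and the zero value itself is never serialized as a
standalone data item.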