That looks, at the least, inconsistent. As far as I know this should be changed so that the zero value is always cloned via the non-closure serializer. Any objection to that?
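To make the proposal concrete, here is a minimal sketch of cloning a zero value by round-tripping it through a pluggable serializer, which is my understanding of what Utils.clone does. The names SimpleSerializer, JavaSimpleSerializer and ZeroValueClone are hypothetical stand-ins written for this example, not Spark classes, and plain Java serialization stands in for whichever data serializer (Kryo, for instance) is configured.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;

final class ZeroValueClone {
    // Deep-copies the value by serializing it and deserializing the bytes
    // with whichever serializer the caller passes in.
    @SuppressWarnings("unchecked")
    static <T> T cloneWith(T value, SimpleSerializer serializer) {
        return (T) serializer.deserialize(serializer.serialize(value));
    }

    public static void main(String[] args) {
        ArrayList<Integer> zero = new ArrayList<>();
        ArrayList<Integer> copy = cloneWith(zero, new JavaSimpleSerializer());
        copy.add(1); // mutating the copy leaves the original zero value untouched
        System.out.println(zero.size() + " " + copy.size()); // prints "0 1"
    }
}

// Hypothetical stand-in for a serializer instance; not Spark's API.
interface SimpleSerializer {
    byte[] serialize(Object value);
    Object deserialize(byte[] bytes);
}

// Java-serialization implementation, used here only so the sketch runs
// without extra dependencies.
final class JavaSimpleSerializer implements SimpleSerializer {
    @Override
    public byte[] serialize(Object value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    @Override
    public Object deserialize(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Because the serializer is a parameter, the same clone routine works with either the closure serializer or the data serializer; the question in this thread is only which one gets passed in.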
On Wed, Feb 18, 2015 at 10:28 PM, Matt Cheah <mch...@palantir.com> wrote:
> But RDD.aggregate() has this code:
>
>     // Clone the zero value since we will also be serializing it as part of tasks
>     var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
>
> I do see the SparkEnv.get.serializer used in aggregateByKey however. Perhaps
> we just missed it and need to apply the change to aggregate()? It seems
> appropriate to target a fix for 1.3.0.
>
> -Matt Cheah
>
> From: Josh Rosen <rosenvi...@gmail.com>
> Date: Wednesday, February 18, 2015 at 6:12 AM
> To: Matt Cheah <mch...@palantir.com>
> Cc: "dev@spark.apache.org" <dev@spark.apache.org>, Mingyu Kim
> <m...@palantir.com>, Andrew Ash <a...@palantir.com>
> Subject: Re: JavaRDD Aggregate initial value - Closure-serialized zero value
> reasoning?
>
> It looks like this was fixed in
> https://issues.apache.org/jira/browse/SPARK-4743 /
> https://github.com/apache/spark/pull/3605. Can you see whether that patch
> fixes this issue for you?
>
> On Tue, Feb 17, 2015 at 8:31 PM, Matt Cheah <mch...@palantir.com> wrote:
>>
>> Hi everyone,
>>
>> I was using JavaPairRDD’s combineByKey() to compute all of my aggregations
>> before, since I assumed that every aggregation required a key. However, I
>> realized I could do my analysis using JavaRDD’s aggregate() instead and not
>> use a key.
>>
>> I have set spark.serializer to use Kryo. As a result, JavaRDD’s
>> combineByKey requires that a “createCombiner” function is provided, and the
>> return value from that function must be serializable using Kryo. When I
>> switched to using rdd.aggregate I assumed that the zero value would also be
>> strictly Kryo serialized, as it is a data item and not part of a closure or
>> the aggregation functions. However, I got a serialization exception as the
>> closure serializer (only valid serializer is the Java serializer) was used
>> instead.
>>
>> I was wondering the following:
>>
>> 1. What is the rationale for making the zero value be serialized using the
>>    closure serializer? This isn’t part of the closure, but is an initial data
>>    item.
>> 2. Would it make sense for us to perhaps write a version of rdd.aggregate()
>>    that takes a function as a parameter, that generates the zero value? This
>>    would be more intuitive to be serialized using the closure serializer.
>>
>> I believe aggregateByKey is also affected.
>>
>> Thanks,
>>
>> -Matt Cheah
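
The zero-value-function idea from the original message can be sketched locally as follows. This is only an illustration under stated assumptions: ZeroFactoryAggregate and its aggregate() method are hypothetical names, partitions are modelled as plain in-memory lists, and nothing is actually serialized. It shows that when a function produces the zero value, each partition can start from a fresh instance, so no copy of the zero value itself has to be cloned.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;

final class ZeroFactoryAggregate {
    // Hypothetical local analogue of aggregate(): the zero value comes from
    // a factory function instead of being passed in as a data item.
    static <T, U> U aggregate(List<List<T>> partitions,
                              Supplier<U> zeroFactory,
                              BiFunction<U, T, U> seqOp,
                              BinaryOperator<U> combOp) {
        U result = zeroFactory.get();
        for (List<T> partition : partitions) {
            U partial = zeroFactory.get(); // fresh zero value for each partition
            for (T item : partition) {
                partial = seqOp.apply(partial, item);
            }
            result = combOp.apply(result, partial);
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<Integer>> partitions =
                Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4, 5));
        // Accumulator layout: index 0 holds the sum, index 1 holds the count.
        int[] sumAndCount = aggregate(
                partitions,
                () -> new int[2],
                (acc, x) -> { acc[0] += x; acc[1]++; return acc; },
                (a, b) -> { a[0] += b[0]; a[1] += b[1]; return a; });
        System.out.println(sumAndCount[0] + " " + sumAndCount[1]); // prints "15 5"
    }
}
```

In a distributed setting the factory would presumably travel with the other functions, so using the closure serializer for it would be unsurprising, while the values it produces would only ever pass through the data serializer.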