Yes, that's a bug and should be using the standard serializer.
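
For 1.3.0 that would presumably be a one-line change in RDD.aggregate, swapping
the closure serializer for the standard one, the same way aggregateByKey already
goes through SparkEnv.get.serializer. A rough, untested sketch (not the actual
patch):

    // Clone the zero value with the standard serializer (e.g. Kryo when
    // spark.serializer is set) rather than the closure serializer, so that
    // aggregate() matches aggregateByKey's behavior.
    var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())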

On Wed, Feb 18, 2015 at 2:58 PM, Sean Owen <so...@cloudera.com> wrote:

> That looks, at the least, inconsistent. As far as I know this should
> be changed so that the zero value is always cloned via the non-closure
> serializer. Any objection to that?
>
> On Wed, Feb 18, 2015 at 10:28 PM, Matt Cheah <mch...@palantir.com> wrote:
> > But RDD.aggregate() has this code:
> >
> >     // Clone the zero value since we will also be serializing it as part of tasks
> >     var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
> >
> > I do see the SparkEnv.get.serializer used in aggregateByKey, however. Perhaps
> > we just missed it and need to apply the change to aggregate()? It seems
> > appropriate to target a fix for 1.3.0.
> >
> > -Matt Cheah
> > From: Josh Rosen <rosenvi...@gmail.com>
> > Date: Wednesday, February 18, 2015 at 6:12 AM
> > To: Matt Cheah <mch...@palantir.com>
> > Cc: "dev@spark.apache.org" <dev@spark.apache.org>, Mingyu Kim
> > <m...@palantir.com>, Andrew Ash <a...@palantir.com>
> > Subject: Re: JavaRDD Aggregate initial value - Closure-serialized zero value
> > reasoning?
> >
> > It looks like this was fixed in
> > https://issues.apache.org/jira/browse/SPARK-4743 /
> > https://github.com/apache/spark/pull/3605.  Can you see whether that patch
> > fixes this issue for you?
> >
> >
> >
> > On Tue, Feb 17, 2015 at 8:31 PM, Matt Cheah <mch...@palantir.com> wrote:
> >>
> >> Hi everyone,
> >>
> >> I was using JavaPairRDD’s combineByKey() to compute all of my aggregations
> >> before, since I assumed that every aggregation required a key. However, I
> >> realized I could do my analysis using JavaRDD’s aggregate() instead and not
> >> use a key.
> >>
> >> I have set spark.serializer to use Kryo. As a result, JavaPairRDD’s
> >> combineByKey requires that a “createCombiner” function is provided, and the
> >> return value from that function must be serializable using Kryo. When I
> >> switched to using rdd.aggregate, I assumed that the zero value would also be
> >> strictly Kryo-serialized, since it is a data item and not part of a closure
> >> or the aggregation functions. However, I got a serialization exception
> >> because the closure serializer (for which the only valid serializer is the
> >> Java serializer) was used instead.
> >>
> >> I was wondering the following:
> >>
> >> 1. What is the rationale for making the zero value be serialized using the
> >> closure serializer? It isn’t part of the closure, but is an initial data
> >> item.
> >> 2. Would it make sense for us to write a version of rdd.aggregate() that
> >> takes, as a parameter, a function that generates the zero value? It would
> >> then be more intuitive for that function to be serialized using the closure
> >> serializer.
> >>
> >> I believe aggregateByKey is also affected.
> >>
> >> Thanks,
> >>
> >> -Matt Cheah
> >
> >
>
