Re: JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?

2015-02-18 Thread Reynold Xin
Yes, that's a bug and should be using the standard serializer.
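
A minimal sketch of the change being suggested, for illustration only (this
is Spark-internal code inside RDD.aggregate(), not the actual patch):

// Clone the zero value with the serializer configured via spark.serializer,
// mirroring what aggregateByKey already does, instead of the closure
// serializer:
var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())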

On Wed, Feb 18, 2015 at 2:58 PM, Sean Owen  wrote:

> That looks, at the least, inconsistent. As far as I know this should
> be changed so that the zero value is always cloned via the non-closure
> serializer. Any objection to that?
>
> On Wed, Feb 18, 2015 at 10:28 PM, Matt Cheah  wrote:
> > But RDD.aggregate() has this code:
> >
> > // Clone the zero value since we will also be serializing it as part of tasks
> > var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
> >
> > I do see the SparkEnv.get.serializer used in aggregateByKey however.
> > Perhaps we just missed it and need to apply the change to aggregate()? It
> > seems appropriate to target a fix for 1.3.0.
> >
> > -Matt Cheah
> > From: Josh Rosen 
> > Date: Wednesday, February 18, 2015 at 6:12 AM
> > To: Matt Cheah 
> > Cc: "dev@spark.apache.org" , Mingyu Kim
> > , Andrew Ash 
> > Subject: Re: JavaRDD Aggregate initial value - Closure-serialized zero
> > value reasoning?
> >
> > It looks like this was fixed in
> > https://issues.apache.org/jira/browse/SPARK-4743 /
> > https://github.com/apache/spark/pull/3605. Can you see whether that patch
> > fixes this issue for you?
> >
> >
> >
> > On Tue, Feb 17, 2015 at 8:31 PM, Matt Cheah  wrote:
> >>
> >> Hi everyone,
> >>
> >> I was using JavaPairRDD’s combineByKey() to compute all of my
> >> aggregations before, since I assumed that every aggregation required a
> >> key. However, I realized I could do my analysis using JavaRDD’s
> >> aggregate() instead and not use a key.
> >>
> >> I have set spark.serializer to use Kryo. As a result, JavaRDD’s
> >> combineByKey requires that a “createCombiner” function is provided, and
> >> the return value from that function must be serializable using Kryo.
> >> When I switched to using rdd.aggregate I assumed that the zero value
> >> would also be strictly Kryo serialized, as it is a data item and not
> >> part of a closure or the aggregation functions. However, I got a
> >> serialization exception as the closure serializer (only valid
> >> serializer is the Java serializer) was used instead.
> >>
> >> I was wondering the following:
> >>
> >> 1. What is the rationale for making the zero value be serialized using
> >> the closure serializer? This isn’t part of the closure, but is an
> >> initial data item.
> >> 2. Would it make sense for us to perhaps write a version of
> >> rdd.aggregate() that takes a function as a parameter, that generates
> >> the zero value? This would be more intuitive to be serialized using
> >> the closure serializer.
> >>
> >> I believe aggregateByKey is also affected.
> >>
> >> Thanks,
> >>
> >> -Matt Cheah
> >
> >
>


Re: JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?

2015-02-18 Thread Sean Owen
That looks, at the least, inconsistent. As far as I know this should
be changed so that the zero value is always cloned via the non-closure
serializer. Any objection to that?

On Wed, Feb 18, 2015 at 10:28 PM, Matt Cheah  wrote:
> But RDD.aggregate() has this code:
>
> // Clone the zero value since we will also be serializing it as part of tasks
> var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
>
> I do see the SparkEnv.get.serializer used in aggregateByKey however. Perhaps
> we just missed it and need to apply the change to aggregate()? It seems
> appropriate to target a fix for 1.3.0.
>
> -Matt Cheah
> From: Josh Rosen 
> Date: Wednesday, February 18, 2015 at 6:12 AM
> To: Matt Cheah 
> Cc: "dev@spark.apache.org" , Mingyu Kim
> , Andrew Ash 
> Subject: Re: JavaRDD Aggregate initial value - Closure-serialized zero value
> reasoning?
>
> It looks like this was fixed in
> https://issues.apache.org/jira/browse/SPARK-4743 /
> https://github.com/apache/spark/pull/3605.  Can you see whether that patch
> fixes this issue for you?
>
>
>
> On Tue, Feb 17, 2015 at 8:31 PM, Matt Cheah  wrote:
>>
>> Hi everyone,
>>
>> I was using JavaPairRDD’s combineByKey() to compute all of my aggregations
>> before, since I assumed that every aggregation required a key. However, I
>> realized I could do my analysis using JavaRDD’s aggregate() instead and not
>> use a key.
>>
>> I have set spark.serializer to use Kryo. As a result, JavaRDD’s
>> combineByKey requires that a “createCombiner” function is provided, and the
>> return value from that function must be serializable using Kryo. When I
>> switched to using rdd.aggregate I assumed that the zero value would also be
>> strictly Kryo serialized, as it is a data item and not part of a closure or
>> the aggregation functions. However, I got a serialization exception as the
>> closure serializer (only valid serializer is the Java serializer) was used
>> instead.
>>
>> I was wondering the following:
>>
>> 1. What is the rationale for making the zero value be serialized using the
>> closure serializer? This isn’t part of the closure, but is an initial data
>> item.
>> 2. Would it make sense for us to perhaps write a version of rdd.aggregate()
>> that takes a function as a parameter, that generates the zero value? This
>> would be more intuitive to be serialized using the closure serializer.
>>
>> I believe aggregateByKey is also affected.
>>
>> Thanks,
>>
>> -Matt Cheah
>
>




Re: JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?

2015-02-18 Thread Matt Cheah
But RDD.aggregate() has this code:

// Clone the zero value since we will also be serializing it as part of tasks
var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())

I do see the SparkEnv.get.serializer used in aggregateByKey however. Perhaps
we just missed it and need to apply the change to aggregate()? It seems
appropriate to target a fix for 1.3.0.

-Matt Cheah
From:  Josh Rosen 
Date:  Wednesday, February 18, 2015 at 6:12 AM
To:  Matt Cheah 
Cc:  "dev@spark.apache.org" , Mingyu Kim
, Andrew Ash 
Subject:  Re: JavaRDD Aggregate initial value - Closure-serialized zero
value reasoning?

It looks like this was fixed in
https://issues.apache.org/jira/browse/SPARK-4743 /
https://github.com/apache/spark/pull/3605. Can you see whether that patch
fixes this issue for you?



On Tue, Feb 17, 2015 at 8:31 PM, Matt Cheah  wrote:
> Hi everyone,
> 
> I was using JavaPairRDD’s combineByKey() to compute all of my aggregations
> before, since I assumed that every aggregation required a key. However, I
> realized I could do my analysis using JavaRDD’s aggregate() instead and not
> use a key.
> 
> I have set spark.serializer to use Kryo. As a result, JavaRDD’s combineByKey
> requires that a “createCombiner” function is provided, and the return value
> from that function must be serializable using Kryo. When I switched to using
> rdd.aggregate I assumed that the zero value would also be strictly Kryo
> serialized, as it is a data item and not part of a closure or the aggregation
> functions. However, I got a serialization exception as the closure serializer
> (only valid serializer is the Java serializer) was used instead.
> 
> I was wondering the following:
> 1. What is the rationale for making the zero value be serialized using the
> closure serializer? This isn’t part of the closure, but is an initial data
> item.
> 2. Would it make sense for us to perhaps write a version of rdd.aggregate()
> that takes a function as a parameter, that generates the zero value? This
> would be more intuitive to be serialized using the closure serializer.
> I believe aggregateByKey is also affected.
> 
> Thanks,
> 
> -Matt Cheah







Re: JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?

2015-02-18 Thread Josh Rosen
It looks like this was fixed in
https://issues.apache.org/jira/browse/SPARK-4743 /
https://github.com/apache/spark/pull/3605.  Can you see whether that patch
fixes this issue for you?



On Tue, Feb 17, 2015 at 8:31 PM, Matt Cheah  wrote:

> Hi everyone,
>
> I was using JavaPairRDD’s combineByKey() to compute all of my aggregations
> before, since I assumed that every aggregation required a key. However, I
> realized I could do my analysis using JavaRDD’s aggregate() instead and not
> use a key.
>
> I have set spark.serializer to use Kryo. As a result, JavaRDD’s
> combineByKey requires that a “createCombiner” function is provided, and the
> return value from that function must be serializable using Kryo. When I
> switched to using rdd.aggregate I assumed that the zero value would also be
> strictly Kryo serialized, as it is a data item and not part of a closure or
> the aggregation functions. However, I got a serialization exception as the
> closure serializer (only valid serializer is the Java serializer) was used
> instead.
>
> I was wondering the following:
>
>1. What is the rationale for making the zero value be serialized using
>the closure serializer? This isn’t part of the closure, but is an initial
>data item.
>2. Would it make sense for us to perhaps write a version of
>rdd.aggregate() that takes a function as a parameter, that generates the
>zero value? This would be more intuitive to be serialized using the closure
>serializer.
>
> I believe aggregateByKey is also affected.
>
> Thanks,
>
> -Matt Cheah
>


Re: JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?

2015-02-18 Thread Sean Owen
The serializer is created with

val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)

which is definitely not the closure serializer, so it should respect
what you are setting with spark.serializer.

Maybe you can do a quick bit of debugging to see where that assumption
breaks down. For example, are you sure spark.serializer is set everywhere?
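
As a rough sketch of that kind of check (assuming a live SparkContext named
sc; nothing here depends on the fix itself):

// Which serializer did this application actually end up with on the driver?
println(org.apache.spark.SparkEnv.get.serializer.getClass.getName)
// And was spark.serializer actually set in this context's configuration?
println(sc.getConf.get("spark.serializer", "<not set: defaults to JavaSerializer>"))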

On Wed, Feb 18, 2015 at 4:31 AM, Matt Cheah  wrote:
> Hi everyone,
>
> I was using JavaPairRDD’s combineByKey() to compute all of my aggregations
> before, since I assumed that every aggregation required a key. However, I
> realized I could do my analysis using JavaRDD’s aggregate() instead and not
> use a key.
>
> I have set spark.serializer to use Kryo. As a result, JavaRDD’s combineByKey
> requires that a “createCombiner” function is provided, and the return value
> from that function must be serializable using Kryo. When I switched to using
> rdd.aggregate I assumed that the zero value would also be strictly Kryo
> serialized, as it is a data item and not part of a closure or the
> aggregation functions. However, I got a serialization exception as the
> closure serializer (only valid serializer is the Java serializer) was used
> instead.
>
> I was wondering the following:
>
> 1. What is the rationale for making the zero value be serialized using the
> closure serializer? This isn’t part of the closure, but is an initial data
> item.
> 2. Would it make sense for us to perhaps write a version of rdd.aggregate()
> that takes a function as a parameter, that generates the zero value? This
> would be more intuitive to be serialized using the closure serializer.
>
> I believe aggregateByKey is also affected.
>
> Thanks,
>
> -Matt Cheah




JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?

2015-02-17 Thread Matt Cheah
Hi everyone,

I was using JavaPairRDD’s combineByKey() to compute all of my aggregations
before, since I assumed that every aggregation required a key. However, I
realized I could do my analysis using JavaRDD’s aggregate() instead and not
use a key.

I have set spark.serializer to use Kryo. As a result, JavaRDD’s combineByKey
requires that a “createCombiner” function is provided, and the return value
from that function must be serializable using Kryo. When I switched to using
rdd.aggregate I assumed that the zero value would also be strictly Kryo
serialized, as it is a data item and not part of a closure or the
aggregation functions. However, I got a serialization exception as the
closure serializer (only valid serializer is the Java serializer) was used
instead.
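
(For reference, this is the kind of configuration being described; the app
name below is just a placeholder.)

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("aggregation-example")  // placeholder
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)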

I was wondering the following:
1. What is the rationale for making the zero value be serialized using the
closure serializer? This isn’t part of the closure, but is an initial data
item.
2. Would it make sense for us to perhaps write a version of rdd.aggregate()
that takes a function as a parameter, that generates the zero value? This
would be more intuitive to be serialized using the closure serializer.
I believe aggregateByKey is also affected.

Thanks,

-Matt Cheah
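
Regarding question 2 above: one user-level workaround in that spirit is to
build the zero value inside a function that ships with the task closure, so
the zero value itself is never serialized at all. A rough sketch, with
makeZero, seqOp and combOp standing in for the real logic, and assuming
combOp treats the generated zero as an identity:

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// Hypothetical helper, not part of Spark: aggregate without ever shipping a
// serialized zero value. makeZero is captured in the task closure and run on
// the executors, so only a function is closure-serialized.
def aggregateWithZeroFn[T, U: ClassTag](rdd: RDD[T])(makeZero: () => U)(
    seqOp: (U, T) => U, combOp: (U, U) => U): U = {
  val partials = rdd.mapPartitions { iter =>
    var acc = makeZero()                    // fresh zero built per partition
    iter.foreach(x => acc = seqOp(acc, x))
    Iterator.single(acc)
  }
  partials.reduce(combOp)                   // merge the per-partition results
}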



