Re: JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?

2015-02-18 Thread Sean Owen
The serializer is created with

val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)

That is definitely not the closure serializer, so it should respect
what you are setting with spark.serializer.

Maybe you can do a quick bit of debugging to see where that assumption
breaks down? For example, are you sure spark.serializer is set everywhere?
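
For example, a quick check along these lines (just a sketch; sc here is
assumed to be your SparkContext) will show what the driver actually has
configured:

  // Defaults to the Java serializer when spark.serializer is not set.
  val configured = sc.getConf.get(
    "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
  println(s"spark.serializer = $configured")

If that prints the Kryo class on the driver, the Environment tab in the web
UI is a quick way to confirm the application is actually running with the
same setting.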

On Wed, Feb 18, 2015 at 4:31 AM, Matt Cheah mch...@palantir.com wrote:
 Hi everyone,

 I was using JavaPairRDD’s combineByKey() to compute all of my aggregations
 before, since I assumed that every aggregation required a key. However, I
 realized I could do my analysis using JavaRDD’s aggregate() instead and not
 use a key.

 I have set spark.serializer to use Kryo. As a result, JavaPairRDD’s combineByKey
 requires that a “createCombiner” function is provided, and the return value
 from that function must be serializable using Kryo. When I switched to using
 rdd.aggregate I assumed that the zero value would also be strictly Kryo
 serialized, as it is a data item and not part of a closure or the
 aggregation functions. However, I got a serialization exception because the
 closure serializer (which is always the Java serializer) was used instead.

 I was wondering the following:

 1. What is the rationale for making the zero value be serialized using the
 closure serializer? This isn’t part of the closure, but is an initial data
 item.
 2. Would it make sense for us to perhaps write a version of rdd.aggregate()
 that takes, as a parameter, a function that generates the zero value? A
 function like that would more naturally be serialized with the closure
 serializer.

 I believe aggregateByKey is also affected.

 Thanks,

 -Matt Cheah




Re: JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?

2015-02-18 Thread Matt Cheah
But RDD.aggregate() has this code:

// Clone the zero value since we will also be serializing it as part of tasks
var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())

I do see the SparkEnv.get.serializer used in aggregateByKey however. Perhaps
we just missed it and need to apply the change to aggregate()? It seems
appropriate to target a fix for 1.3.0.
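
To spell out what I mean, the minimal change I would expect in aggregate() is
roughly the following (just a sketch of the idea, not a tested patch; the
zero value shipped with the tasks would presumably need the same treatment,
the way aggregateByKey round-trips it through zeroBuffer):

  // Sketch: clone the zero value with the configured serializer (Kryo in my
  // case) instead of the closure serializer, mirroring aggregateByKey.
  var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())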

-Matt Cheah
From:  Josh Rosen rosenvi...@gmail.com
Date:  Wednesday, February 18, 2015 at 6:12 AM
To:  Matt Cheah mch...@palantir.com
Cc:  dev@spark.apache.org dev@spark.apache.org, Mingyu Kim
m...@palantir.com, Andrew Ash a...@palantir.com
Subject:  Re: JavaRDD Aggregate initial value - Closure-serialized zero
value reasoning?

It looks like this was fixed in
https://issues.apache.org/jira/browse/SPARK-4743 /
https://github.com/apache/spark/pull/3605.  Can you see whether that patch
fixes this issue for you?



On Tue, Feb 17, 2015 at 8:31 PM, Matt Cheah mch...@palantir.com wrote:
 Hi everyone,
 
 I was using JavaPairRDD’s combineByKey() to compute all of my aggregations
 before, since I assumed that every aggregation required a key. However, I
 realized I could do my analysis using JavaRDD’s aggregate() instead and not
 use a key.
 
 I have set spark.serializer to use Kryo. As a result, JavaPairRDD’s combineByKey
 requires that a “createCombiner” function is provided, and the return value
 from that function must be serializable using Kryo. When I switched to using
 rdd.aggregate I assumed that the zero value would also be strictly Kryo
 serialized, as it is a data item and not part of a closure or the aggregation
 functions. However, I got a serialization exception because the closure
 serializer (which is always the Java serializer) was used instead.
 
 I was wondering the following:
 1. What is the rationale for making the zero value be serialized using the
 closure serializer? This isn’t part of the closure, but is an initial data
 item.
 2. Would it make sense for us to perhaps write a version of rdd.aggregate()
 that takes, as a parameter, a function that generates the zero value? A
 function like that would more naturally be serialized with the closure
 serializer.
 I believe aggregateByKey is also affected.
 
 Thanks,
 
 -Matt Cheah







Re: JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?

2015-02-18 Thread Josh Rosen
It looks like this was fixed in
https://issues.apache.org/jira/browse/SPARK-4743 /
https://github.com/apache/spark/pull/3605.  Can you see whether that patch
fixes this issue for you?



On Tue, Feb 17, 2015 at 8:31 PM, Matt Cheah mch...@palantir.com wrote:

 Hi everyone,

 I was using JavaPairRDD’s combineByKey() to compute all of my aggregations
 before, since I assumed that every aggregation required a key. However, I
 realized I could do my analysis using JavaRDD’s aggregate() instead and not
 use a key.

 I have set spark.serializer to use Kryo. As a result, JavaPairRDD’s
 combineByKey requires that a “createCombiner” function is provided, and the
 return value from that function must be serializable using Kryo. When I
 switched to using rdd.aggregate I assumed that the zero value would also be
 strictly Kryo serialized, as it is a data item and not part of a closure or
 the aggregation functions. However, I got a serialization exception because
 the closure serializer (which is always the Java serializer) was used
 instead.

 I was wondering the following:

1. What is the rationale for making the zero value be serialized using
the closure serializer? This isn’t part of the closure, but is an initial
data item.
2. Would it make sense for us to perhaps write a version of
rdd.aggregate() that takes, as a parameter, a function that generates the
zero value? A function like that would more naturally be serialized with the
closure serializer.

 I believe aggregateByKey is also affected.

 Thanks,

 -Matt Cheah



Re: JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?

2015-02-18 Thread Sean Owen
That looks, at the least, inconsistent. As far as I know this should
be changed so that the zero value is always cloned via the non-closure
serializer. Any objection to that?

On Wed, Feb 18, 2015 at 10:28 PM, Matt Cheah mch...@palantir.com wrote:
 But RDD.aggregate() has this code:

 // Clone the zero value since we will also be serializing it as part of tasks
 var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())

 I do see the SparkEnv.get.serializer used in aggregateByKey however. Perhaps
 we just missed it and need to apply the change to aggregate()? It seems
 appropriate to target a fix for 1.3.0.

 -Matt Cheah
 From: Josh Rosen rosenvi...@gmail.com
 Date: Wednesday, February 18, 2015 at 6:12 AM
 To: Matt Cheah mch...@palantir.com
 Cc: dev@spark.apache.org dev@spark.apache.org, Mingyu Kim
 m...@palantir.com, Andrew Ash a...@palantir.com
 Subject: Re: JavaRDD Aggregate initial value - Closure-serialized zero value
 reasoning?

 It looks like this was fixed in
 https://issues.apache.org/jira/browse/SPARK-4743 /
 https://github.com/apache/spark/pull/3605.  Can you see whether that patch
 fixes this issue for you?



 On Tue, Feb 17, 2015 at 8:31 PM, Matt Cheah mch...@palantir.com wrote:

 Hi everyone,

 I was using JavaPairRDD’s combineByKey() to compute all of my aggregations
 before, since I assumed that every aggregation required a key. However, I
 realized I could do my analysis using JavaRDD’s aggregate() instead and not
 use a key.

 I have set spark.serializer to use Kryo. As a result, JavaPairRDD’s
 combineByKey requires that a “createCombiner” function is provided, and the
 return value from that function must be serializable using Kryo. When I
 switched to using rdd.aggregate I assumed that the zero value would also be
 strictly Kryo serialized, as it is a data item and not part of a closure or
 the aggregation functions. However, I got a serialization exception because
 the closure serializer (which is always the Java serializer) was used
 instead.

 I was wondering the following:

 1. What is the rationale for making the zero value be serialized using the
 closure serializer? This isn’t part of the closure, but is an initial data
 item.
 2. Would it make sense for us to perhaps write a version of rdd.aggregate()
 that takes, as a parameter, a function that generates the zero value? A
 function like that would more naturally be serialized with the closure
 serializer.

 I believe aggregateByKey is also affected.

 Thanks,

 -Matt Cheah






Re: JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?

2015-02-18 Thread Reynold Xin
Yes, that's a bug and should be using the standard serializer.

On Wed, Feb 18, 2015 at 2:58 PM, Sean Owen so...@cloudera.com wrote:

 That looks, at the least, inconsistent. As far as I know this should
 be changed so that the zero value is always cloned via the non-closure
 serializer. Any objection to that?

 On Wed, Feb 18, 2015 at 10:28 PM, Matt Cheah mch...@palantir.com wrote:
  But RDD.aggregate() has this code:
 
  // Clone the zero value since we will also be serializing it as part of tasks
  var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
 
  I do see the SparkEnv.get.serializer used in aggregateByKey however. Perhaps
  we just missed it and need to apply the change to aggregate()? It seems
  appropriate to target a fix for 1.3.0.
 
  -Matt Cheah
  From: Josh Rosen rosenvi...@gmail.com
  Date: Wednesday, February 18, 2015 at 6:12 AM
  To: Matt Cheah mch...@palantir.com
  Cc: dev@spark.apache.org dev@spark.apache.org, Mingyu Kim
  m...@palantir.com, Andrew Ash a...@palantir.com
  Subject: Re: JavaRDD Aggregate initial value - Closure-serialized zero
 value
  reasoning?
 
  It looks like this was fixed in
  https://issues.apache.org/jira/browse/SPARK-4743 /
  https://github.com/apache/spark/pull/3605.  Can you see whether that patch
  fixes this issue for you?
 
 
 
  On Tue, Feb 17, 2015 at 8:31 PM, Matt Cheah mch...@palantir.com wrote:
 
  Hi everyone,
 
  I was using JavaPairRDD’s combineByKey() to compute all of my aggregations
  before, since I assumed that every aggregation required a key. However, I
  realized I could do my analysis using JavaRDD’s aggregate() instead and not
  use a key.
 
  I have set spark.serializer to use Kryo. As a result, JavaPairRDD’s
  combineByKey requires that a “createCombiner” function is provided, and the
  return value from that function must be serializable using Kryo. When I
  switched to using rdd.aggregate I assumed that the zero value would also be
  strictly Kryo serialized, as it is a data item and not part of a closure or
  the aggregation functions. However, I got a serialization exception because
  the closure serializer (which is always the Java serializer) was used
  instead.
 
  I was wondering the following:
 
  1. What is the rationale for making the zero value be serialized using the
  closure serializer? This isn’t part of the closure, but is an initial data
  item.
  2. Would it make sense for us to perhaps write a version of rdd.aggregate()
  that takes, as a parameter, a function that generates the zero value? A
  function like that would more naturally be serialized with the closure
  serializer.
 
  I believe aggregateByKey is also affected.
 
  Thanks,
 
  -Matt Cheah
 
 





JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?

2015-02-17 Thread Matt Cheah
Hi everyone,

I was using JavaPairRDD’s combineByKey() to compute all of my aggregations
before, since I assumed that every aggregation required a key. However, I
realized I could do my analysis using JavaRDD’s aggregate() instead and not
use a key.

I have set spark.serializer to use Kryo. As a result, JavaPairRDD’s combineByKey
requires that a “createCombiner” function is provided, and the return value
from that function must be serializable using Kryo. When I switched to using
rdd.aggregate I assumed that the zero value would also be strictly Kryo
serialized, as it is a data item and not part of a closure or the
aggregation functions. However, I got a serialization exception because the
closure serializer (which is always the Java serializer) was used instead.
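
For illustration, the shape of the code that hits this is roughly the
following (a simplified sketch, not my actual code; class and variable names
are made up):

  import org.apache.spark.{SparkConf, SparkContext}

  // Hypothetical aggregation state: trivially serializable with Kryo, but
  // NOT java.io.Serializable, so the Java-based closure serializer rejects it.
  class SumState(var sum: Long, var count: Long)

  val conf = new SparkConf()
    .setAppName("zero-value-sketch")
    .setMaster("local[2]")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  val sc = new SparkContext(conf)
  val rdd = sc.parallelize(1L to 100L)

  // aggregate() clones the zero value with the closure (Java) serializer, so
  // this throws a serialization exception even though spark.serializer is
  // Kryo and the per-record data never needs Java serialization.
  val result = rdd.aggregate(new SumState(0L, 0L))(
    (acc, x) => { acc.sum += x; acc.count += 1; acc },
    (a, b) => { a.sum += b.sum; a.count += b.count; a })

  println(s"sum=${result.sum}, count=${result.count}")
  sc.stop()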

I was wondering the following:
1. What is the rationale for making the zero value be serialized using the
closure serializer? This isn’t part of the closure, but is an initial data
item.
2. Would it make sense for us to perhaps write a version of rdd.aggregate()
that takes, as a parameter, a function that generates the zero value? A
function like that would more naturally be serialized with the closure
serializer.
I believe aggregateByKey is also affected.

Thanks,

-Matt Cheah



