Re: JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?
Yes, that's a bug and should be using the standard serializer.

On Wed, Feb 18, 2015 at 2:58 PM, Sean Owen wrote:
> That looks, at the least, inconsistent. As far as I know this should
> be changed so that the zero value is always cloned via the non-closure
> serializer. Any objection to that?
>
> On Wed, Feb 18, 2015 at 10:28 PM, Matt Cheah wrote:
> > But RDD.aggregate() has this code:
> >
> >     // Clone the zero value since we will also be serializing it as part of tasks
> >     var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
> >
> > I do see SparkEnv.get.serializer used in aggregateByKey, however. Perhaps
> > we just missed it and need to apply the same change to aggregate()? It seems
> > appropriate to target a fix for 1.3.0.
> >
> > -Matt Cheah
> >
> > From: Josh Rosen
> > Date: Wednesday, February 18, 2015 at 6:12 AM
> > To: Matt Cheah
> > Cc: "dev@spark.apache.org", Mingyu Kim, Andrew Ash
> > Subject: Re: JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?
> >
> > It looks like this was fixed in
> > https://issues.apache.org/jira/browse/SPARK-4743 /
> > https://github.com/apache/spark/pull/3605. Can you see whether that
> > patch fixes this issue for you?
> >
> > On Tue, Feb 17, 2015 at 8:31 PM, Matt Cheah wrote:
> >> Hi everyone,
> >>
> >> I was using JavaPairRDD’s combineByKey() to compute all of my aggregations
> >> before, since I assumed that every aggregation required a key. However, I
> >> realized I could do my analysis with JavaRDD’s aggregate() instead and not
> >> use a key at all.
> >>
> >> I have set spark.serializer to use Kryo. As a result, JavaRDD’s
> >> combineByKey requires that a “createCombiner” function be provided, and
> >> the return value from that function must be serializable with Kryo. When
> >> I switched to rdd.aggregate(), I assumed the zero value would likewise be
> >> strictly Kryo-serialized, since it is a data item and not part of a
> >> closure or the aggregation functions. However, I got a serialization
> >> exception because the closure serializer (for which the only valid
> >> serializer is the Java serializer) was used instead.
> >>
> >> I was wondering the following:
> >>
> >> 1. What is the rationale for serializing the zero value with the closure
> >> serializer? It isn’t part of the closure; it is an initial data item.
> >> 2. Would it make sense to write a version of rdd.aggregate() that takes a
> >> zero-value-generating function as a parameter? A function, unlike a data
> >> item, would intuitively be serialized with the closure serializer.
> >>
> >> I believe aggregateByKey is also affected.
> >>
> >> Thanks,
> >>
> >> -Matt Cheah
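[For reference, a minimal sketch of the change agreed on here, assuming the Spark 1.x internals quoted in the thread; it needs a live SparkEnv, so it only runs inside a Spark application. cloneValue and cloneZero are hypothetical stand-ins, since Spark's own Utils.clone is private:

    import scala.reflect.ClassTag
    import org.apache.spark.SparkEnv
    import org.apache.spark.serializer.SerializerInstance

    // Hypothetical stand-in for Spark's private Utils.clone: round-trip a
    // value through a serializer instance to obtain a fresh copy.
    def cloneValue[T: ClassTag](value: T, ser: SerializerInstance): T =
      ser.deserialize[T](ser.serialize(value))

    // Before (as quoted above), the zero value is cloned with the closure
    // serializer, which is always Java serialization, so a Kryo-only zero
    // value throws NotSerializableException:
    //   var jobResult = cloneValue(zeroValue, sc.env.closureSerializer.newInstance())

    // The proposed change: clone with the standard serializer, which honors
    // spark.serializer (e.g. Kryo), matching what aggregateByKey does.
    def cloneZero[U: ClassTag](zeroValue: U): U =
      cloneValue(zeroValue, SparkEnv.get.serializer.newInstance())
]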
Re: JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?
That looks, at the least, inconsistent. As far as I know this should be
changed so that the zero value is always cloned via the non-closure
serializer. Any objection to that?

On Wed, Feb 18, 2015 at 10:28 PM, Matt Cheah wrote:
> But RDD.aggregate() has this code:
>
>     // Clone the zero value since we will also be serializing it as part of tasks
>     var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
>
> I do see SparkEnv.get.serializer used in aggregateByKey, however. Perhaps
> we just missed it and need to apply the same change to aggregate()? It seems
> appropriate to target a fix for 1.3.0.
>
> -Matt Cheah
>
> From: Josh Rosen
> Date: Wednesday, February 18, 2015 at 6:12 AM
> To: Matt Cheah
> Cc: "dev@spark.apache.org", Mingyu Kim, Andrew Ash
> Subject: Re: JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?
>
> It looks like this was fixed in
> https://issues.apache.org/jira/browse/SPARK-4743 /
> https://github.com/apache/spark/pull/3605. Can you see whether that patch
> fixes this issue for you?
>
> On Tue, Feb 17, 2015 at 8:31 PM, Matt Cheah wrote:
>> Hi everyone,
>>
>> I was using JavaPairRDD’s combineByKey() to compute all of my aggregations
>> before, since I assumed that every aggregation required a key. However, I
>> realized I could do my analysis with JavaRDD’s aggregate() instead and not
>> use a key at all.
>>
>> I have set spark.serializer to use Kryo. As a result, JavaRDD’s
>> combineByKey requires that a “createCombiner” function be provided, and
>> the return value from that function must be serializable with Kryo. When
>> I switched to rdd.aggregate(), I assumed the zero value would likewise be
>> strictly Kryo-serialized, since it is a data item and not part of a
>> closure or the aggregation functions. However, I got a serialization
>> exception because the closure serializer (for which the only valid
>> serializer is the Java serializer) was used instead.
>>
>> I was wondering the following:
>>
>> 1. What is the rationale for serializing the zero value with the closure
>> serializer? It isn’t part of the closure; it is an initial data item.
>> 2. Would it make sense to write a version of rdd.aggregate() that takes a
>> zero-value-generating function as a parameter? A function, unlike a data
>> item, would intuitively be serialized with the closure serializer.
>>
>> I believe aggregateByKey is also affected.
>>
>> Thanks,
>>
>> -Matt Cheah
Re: JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?
But RDD.aggregate() has this code:

    // Clone the zero value since we will also be serializing it as part of tasks
    var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())

I do see SparkEnv.get.serializer used in aggregateByKey, however. Perhaps we
just missed it and need to apply the same change to aggregate()? It seems
appropriate to target a fix for 1.3.0.

-Matt Cheah

From: Josh Rosen
Date: Wednesday, February 18, 2015 at 6:12 AM
To: Matt Cheah
Cc: "dev@spark.apache.org", Mingyu Kim, Andrew Ash
Subject: Re: JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?

It looks like this was fixed in
https://issues.apache.org/jira/browse/SPARK-4743 /
https://github.com/apache/spark/pull/3605. Can you see whether that patch
fixes this issue for you?

On Tue, Feb 17, 2015 at 8:31 PM, Matt Cheah wrote:
> Hi everyone,
>
> I was using JavaPairRDD’s combineByKey() to compute all of my aggregations
> before, since I assumed that every aggregation required a key. However, I
> realized I could do my analysis with JavaRDD’s aggregate() instead and not
> use a key at all.
>
> I have set spark.serializer to use Kryo. As a result, JavaRDD’s
> combineByKey requires that a “createCombiner” function be provided, and the
> return value from that function must be serializable with Kryo. When I
> switched to rdd.aggregate(), I assumed the zero value would likewise be
> strictly Kryo-serialized, since it is a data item and not part of a closure
> or the aggregation functions. However, I got a serialization exception
> because the closure serializer (for which the only valid serializer is the
> Java serializer) was used instead.
>
> I was wondering the following:
>
> 1. What is the rationale for serializing the zero value with the closure
> serializer? It isn’t part of the closure; it is an initial data item.
> 2. Would it make sense to write a version of rdd.aggregate() that takes a
> zero-value-generating function as a parameter? A function, unlike a data
> item, would intuitively be serialized with the closure serializer.
>
> I believe aggregateByKey is also affected.
>
> Thanks,
>
> -Matt Cheah
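[To make the failure mode concrete, a hedged repro sketch for Spark 1.2.x as described in this thread. SumCount is a hypothetical accumulator class that Kryo can handle but that deliberately does not implement java.io.Serializable, so the only step that can fail is the closure-serializer clone of the zero value:

    import org.apache.spark.{SparkConf, SparkContext}

    // Hypothetical aggregation buffer: fine for Kryo (no-arg constructor,
    // plain fields), but NOT java.io.Serializable.
    class SumCount(var sum: Double, var count: Long) {
      def this() = this(0.0, 0L)
    }

    object AggregateZeroValueRepro {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("aggregate-zero-value-repro")
          .setMaster("local[2]")
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .registerKryoClasses(Array(classOf[SumCount]))
        val sc = new SparkContext(conf)

        val rdd = sc.parallelize(1 to 100)

        // Per the thread, on 1.2.x this throws NotSerializableException:
        // the zero value is cloned with the closure serializer (Java
        // serialization), even though spark.serializer is set to Kryo.
        val result = rdd.aggregate(new SumCount())(
          (acc, x) => { acc.sum += x; acc.count += 1; acc },
          (a, b) => { a.sum += b.sum; a.count += b.count; a }
        )
        println(s"sum=${result.sum} count=${result.count}")
        sc.stop()
      }
    }
]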
Re: JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?
It looks like this was fixed in
https://issues.apache.org/jira/browse/SPARK-4743 /
https://github.com/apache/spark/pull/3605. Can you see whether that patch
fixes this issue for you?

On Tue, Feb 17, 2015 at 8:31 PM, Matt Cheah wrote:
> Hi everyone,
>
> I was using JavaPairRDD’s combineByKey() to compute all of my aggregations
> before, since I assumed that every aggregation required a key. However, I
> realized I could do my analysis with JavaRDD’s aggregate() instead and not
> use a key at all.
>
> I have set spark.serializer to use Kryo. As a result, JavaRDD’s
> combineByKey requires that a “createCombiner” function be provided, and the
> return value from that function must be serializable with Kryo. When I
> switched to rdd.aggregate(), I assumed the zero value would likewise be
> strictly Kryo-serialized, since it is a data item and not part of a closure
> or the aggregation functions. However, I got a serialization exception
> because the closure serializer (for which the only valid serializer is the
> Java serializer) was used instead.
>
> I was wondering the following:
>
> 1. What is the rationale for serializing the zero value with the closure
> serializer? It isn’t part of the closure; it is an initial data item.
> 2. Would it make sense to write a version of rdd.aggregate() that takes a
> zero-value-generating function as a parameter? A function, unlike a data
> item, would intuitively be serialized with the closure serializer.
>
> I believe aggregateByKey is also affected.
>
> Thanks,
>
> -Matt Cheah
Re: JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?
The serializer is created with

    val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)

which is definitely not the closure serializer, so it should respect what you
are setting with spark.serializer. Maybe you can do a quick bit of debugging
to see where that assumption breaks down? For example, are you sure
spark.serializer is set everywhere?

On Wed, Feb 18, 2015 at 4:31 AM, Matt Cheah wrote:
> Hi everyone,
>
> I was using JavaPairRDD’s combineByKey() to compute all of my aggregations
> before, since I assumed that every aggregation required a key. However, I
> realized I could do my analysis with JavaRDD’s aggregate() instead and not
> use a key at all.
>
> I have set spark.serializer to use Kryo. As a result, JavaRDD’s
> combineByKey requires that a “createCombiner” function be provided, and the
> return value from that function must be serializable with Kryo. When I
> switched to rdd.aggregate(), I assumed the zero value would likewise be
> strictly Kryo-serialized, since it is a data item and not part of a closure
> or the aggregation functions. However, I got a serialization exception
> because the closure serializer (for which the only valid serializer is the
> Java serializer) was used instead.
>
> I was wondering the following:
>
> 1. What is the rationale for serializing the zero value with the closure
> serializer? It isn’t part of the closure; it is an initial data item.
> 2. Would it make sense to write a version of rdd.aggregate() that takes a
> zero-value-generating function as a parameter? A function, unlike a data
> item, would intuitively be serialized with the closure serializer.
>
> I believe aggregateByKey is also affected.
>
> Thanks,
>
> -Matt Cheah
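[For context, the surrounding aggregateByKey logic in Spark 1.x looks roughly like the sketch below (paraphrased, so treat it as an approximation rather than the exact source): the zero value is serialized once on the driver with the standard serializer, and a fresh copy is deserialized each time a zero is needed on an executor:

    import java.nio.ByteBuffer
    import scala.reflect.ClassTag
    import org.apache.spark.SparkEnv

    // U is the aggregation buffer type.
    def makeZeroFactory[U: ClassTag](zeroValue: U): () => U = {
      // Serialize the zero once with the standard (spark.serializer)
      // serializer; only the resulting byte array travels in the closure.
      val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
      val zeroArray = new Array[Byte](zeroBuffer.limit)
      zeroBuffer.get(zeroArray)

      // Lazily create a serializer on the executor and deserialize a fresh,
      // independent zero for each call.
      lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
      () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))
    }
]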
JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?
Hi everyone,

I was using JavaPairRDD’s combineByKey() to compute all of my aggregations
before, since I assumed that every aggregation required a key. However, I
realized I could do my analysis with JavaRDD’s aggregate() instead and not
use a key at all.

I have set spark.serializer to use Kryo. As a result, JavaRDD’s combineByKey
requires that a “createCombiner” function be provided, and the return value
from that function must be serializable with Kryo. When I switched to
rdd.aggregate(), I assumed the zero value would likewise be strictly
Kryo-serialized, since it is a data item and not part of a closure or the
aggregation functions. However, I got a serialization exception because the
closure serializer (for which the only valid serializer is the Java
serializer) was used instead.

I was wondering the following:

1. What is the rationale for serializing the zero value with the closure
serializer? It isn’t part of the closure; it is an initial data item.
2. Would it make sense to write a version of rdd.aggregate() that takes a
zero-value-generating function as a parameter? A function, unlike a data
item, would intuitively be serialized with the closure serializer.

I believe aggregateByKey is also affected.

Thanks,

-Matt Cheah
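[A hedged sketch of the variant proposed in point 2, written against the public RDD API rather than as a change to Spark itself; aggregateWithZeroFactory is a hypothetical name:

    import scala.reflect.ClassTag
    import org.apache.spark.rdd.RDD

    // Hypothetical helper: take a zero-value factory instead of a zero value.
    // The factory is a closure, so closure serialization is the natural fit,
    // and each partition builds its own fresh zero locally on the executor.
    def aggregateWithZeroFactory[T, U: ClassTag](rdd: RDD[T])(zero: () => U)(
        seqOp: (U, T) => U, combOp: (U, U) => U): U =
      rdd.mapPartitions { iter =>
        // One aggregated value per partition; zero() runs on the executor.
        Iterator(iter.foldLeft(zero())(seqOp))
      }.reduce(combOp) // note: assumes the RDD has at least one partition

Called as, e.g., aggregateWithZeroFactory(rdd)(() => new SumCount())(seqOp, combOp), a Kryo-only zero type is never serialized as data at all; only the small factory closure goes through the closure serializer.]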