Re: JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?
The serializer is created with

    val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)

which is definitely not the closure serializer, and so it should respect what you are setting with spark.serializer. Maybe you can do a quick bit of debugging to see where that assumption breaks down? Like, are you sure spark.serializer is set everywhere?
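A quick way to confirm which serializers are actually in play (a minimal sketch, assuming Spark 1.x, where SparkEnv exposes both serializers on the driver):

    // Print the configured data serializer vs. the closure serializer.
    // With spark.serializer set to Kryo, the first line should name
    // KryoSerializer while the second stays JavaSerializer.
    val env = org.apache.spark.SparkEnv.get
    println("data serializer:    " + env.serializer.getClass.getName)
    println("closure serializer: " + env.closureSerializer.getClass.getName)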
Re: JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?
But RDD.aggregate() has this code:

    // Clone the zero value since we will also be serializing it as part of tasks
    var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())

I do see SparkEnv.get.serializer used in aggregateByKey, however. Perhaps we just missed it and need to apply the same change to aggregate()? It seems appropriate to target a fix for 1.3.0.

-Matt Cheah

From: Josh Rosen rosenvi...@gmail.com
Date: Wednesday, February 18, 2015 at 6:12 AM
To: Matt Cheah mch...@palantir.com
Cc: dev@spark.apache.org, Mingyu Kim m...@palantir.com, Andrew Ash a...@palantir.com
Subject: Re: JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?

It looks like this was fixed in https://issues.apache.org/jira/browse/SPARK-4743 / https://github.com/apache/spark/pull/3605. Can you see whether that patch fixes this issue for you?
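For concreteness, a sketch of what that change to aggregate() might look like (hypothetical, mirroring aggregateByKey rather than quoting any committed patch):

    // Clone the zero value with the configured data serializer (e.g. Kryo)
    // instead of the closure serializer, matching what aggregateByKey does.
    var jobResult = Utils.clone(zeroValue, SparkEnv.get.serializer.newInstance())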
Re: JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?
It looks like this was fixed in https://issues.apache.org/jira/browse/SPARK-4743 / https://github.com/apache/spark/pull/3605. Can you see whether that patch fixes this issue for you?
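For anyone on an unpatched build who wants to reproduce this, a minimal sketch (class and object names are hypothetical): spark.serializer is set to Kryo, the zero value is not java.io.Serializable, and aggregate() fails while cloning the zero value with the Java-based closure serializer.

    import org.apache.spark.{SparkConf, SparkContext}

    // Kryo can serialize this class, but plain Java serialization cannot,
    // because it does not extend java.io.Serializable.
    class KryoOnlyAccumulator(var total: Long)

    object AggregateZeroValueRepro {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("aggregate-zero-value-repro")
          .setMaster("local[2]")
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        val sc = new SparkContext(conf)

        val rdd = sc.parallelize(1 to 100)
        // On affected versions this throws a NotSerializableException while
        // cloning the zero value, even though spark.serializer is Kryo.
        val result = rdd.aggregate(new KryoOnlyAccumulator(0L))(
          (acc, x) => { acc.total += x; acc },
          (a, b) => { a.total += b.total; a })
        println(result.total)
        sc.stop()
      }
    }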
Re: JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?
That looks, at the least, inconsistent. As far as I know this should be changed so that the zero value is always cloned via the non-closure serializer. Any objection to that?
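If the change goes in, a regression test along these lines could pin the behavior down (a sketch only: it assumes a ScalaTest suite with a LocalSparkContext-style sc variable, and reuses the hypothetical KryoOnlyAccumulator from the reproduction sketch above):

    test("aggregate() serializes the zero value with spark.serializer") {
      val conf = new SparkConf()
        .setMaster("local")
        .setAppName("test")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      sc = new SparkContext(conf)
      // The zero value is not java.io.Serializable, so this passes only if
      // aggregate() clones it with the configured (Kryo) serializer.
      val result = sc.parallelize(1 to 10).aggregate(new KryoOnlyAccumulator(0L))(
        (acc, x) => { acc.total += x; acc },
        (a, b) => { a.total += b.total; a })
      assert(result.total === 55)
    }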
Re: JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?
Yes, that's a bug and should be using the standard serializer.
JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?
Hi everyone,

I was using JavaPairRDD's combineByKey() to compute all of my aggregations before, since I assumed that every aggregation required a key. However, I realized I could do my analysis using JavaRDD's aggregate() instead and not use a key.

I have set spark.serializer to use Kryo. As a result, JavaRDD's combineByKey requires that a "createCombiner" function is provided, and the return value from that function must be serializable using Kryo. When I switched to using rdd.aggregate, I assumed that the zero value would also be strictly Kryo serialized, as it is a data item and not part of a closure or the aggregation functions. However, I got a serialization exception because the closure serializer (for which the only valid serializer is the Java serializer) was used instead.

I was wondering the following:

1. What is the rationale for making the zero value be serialized using the closure serializer? This isn't part of the closure, but is an initial data item.

2. Would it make sense for us to perhaps write a version of rdd.aggregate() that takes a function that generates the zero value as a parameter? It would then be intuitive for that function to be serialized using the closure serializer (see the sketch after this message).

I believe aggregateByKey is also affected.

Thanks,

-Matt Cheah
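The generator-function idea in point 2 can even be emulated from user code today. A sketch (helper name hypothetical): build the zero value inside a closure on each partition, so only the functions are closure-serialized and the zero value itself is never serialized at all.

    import scala.reflect.ClassTag
    import org.apache.spark.rdd.RDD

    // Emulates rdd.aggregate(zero)(seqOp, combOp), except that the zero value
    // is produced by mkZero() inside each task rather than shipped from the
    // driver, so it never needs to be serialized.
    def aggregateWithZeroFn[T, U: ClassTag](rdd: RDD[T])(mkZero: () => U)(
        seqOp: (U, T) => U, combOp: (U, U) => U): U =
      rdd.mapPartitions(iter => Iterator(iter.foldLeft(mkZero())(seqOp)))
         .reduce(combOp)

One caveat: unlike aggregate(), this throws on an RDD with no partitions, and combOp is applied only across per-partition results rather than also against a driver-side zero value.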