Repository: spark
Updated Branches:
  refs/heads/master b3e6eca81 -> b2d8c0222

SPARK-6044 [CORE] RDD.aggregate() should not use the closure serializer on the zero value

Use configured serializer in RDD.aggregate, to match PairRDDFunctions.aggregateByKey, instead of closure serializer.

Compare with https://github.com/apache/spark/blob/e60ad2f4c47b011be7a3198689ac2b82ee317d96/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L127

Author: Sean Owen <so...@cloudera.com>

Closes #5028 from srowen/SPARK-6044 and squashes the following commits:

a4040a7 [Sean Owen] Use configured serializer in RDD.aggregate, to match PairRDDFunctions.aggregateByKey, instead of closure serializer

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b2d8c022
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b2d8c022
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b2d8c022

Branch: refs/heads/master
Commit: b2d8c02224892192b1aa314b4265fe50845932f9
Parents: b3e6eca
Author: Sean Owen <so...@cloudera.com>
Authored: Mon Mar 16 23:58:52 2015 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Mon Mar 16 23:58:52 2015 -0700

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/rdd/RDD.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/b2d8c022/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index cf04330..a139780 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -960,7 +960,7 @@ abstract class RDD[T: ClassTag](
    */
   def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = {
     // Clone the zero value since we will also be serializing it as part of tasks
-    var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
+    var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
     val cleanSeqOp = sc.clean(seqOp)
     val cleanCombOp = sc.clean(combOp)
     val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
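
The changed line clones the zero value through a serializer before using it as the driver-side accumulator. As a rough illustration of why that clone matters and why the choice of serializer is pluggable, here is a minimal sketch that does not depend on Spark. It assumes the clone is a serialize/deserialize round trip; `SimpleSerializer`, `JavaSimpleSerializer`, and `cloneValue` are hypothetical names invented for this example and are not Spark's `SerializerInstance` or `Utils.clone`.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a serializer instance (not Spark's API).
trait SimpleSerializer {
  def serialize(value: AnyRef): Array[Byte]
  def deserialize[T](bytes: Array[Byte]): T
}

// One possible implementation, backed by plain Java serialization.
object JavaSimpleSerializer extends SimpleSerializer {
  def serialize(value: AnyRef): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()
    buffer.toByteArray
  }

  def deserialize[T](bytes: Array[Byte]): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}

object CloneZeroValueSketch {
  // Deep-copies a value by round-tripping it through the given serializer.
  // The caller decides which serializer handles the data value.
  def cloneValue[T <: AnyRef](value: T, serializer: SimpleSerializer): T =
    serializer.deserialize[T](serializer.serialize(value))

  def main(args: Array[String]): Unit = {
    // A mutable zero value: merging into it in place would corrupt the original.
    val zeroValue = Array(0, 0)
    val jobResult = cloneValue(zeroValue, JavaSimpleSerializer)

    // Simulate merging a partition result into the driver-side accumulator.
    jobResult(0) += 1

    // The original zero value is untouched because jobResult is an independent copy.
    assert(zeroValue.sameElements(Array(0, 0)))
    assert(jobResult.sameElements(Array(1, 0)))
    println(s"zeroValue = ${zeroValue.mkString(",")}; jobResult = ${jobResult.mkString(",")}")
  }
}
```

Because `cloneValue` takes the serializer as a parameter, a data value can be copied with whichever serializer is configured for data rather than the one reserved for closures, which is the substitution this commit makes in `RDD.aggregate`.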