[GitHub] spark pull request #21971: [SPARK-24947] [Core] aggregateAsync and foldAsync
Github user attilapiros commented on a diff in the pull request: https://github.com/apache/spark/pull/21971#discussion_r207898987

--- Diff: core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala ---
@@ -61,6 +62,36 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
       (index, data) => results(index) = data, results.flatten.toSeq)
   }
+
+  /**
+   * Returns a future of an aggregation across the RDD.
+   *
+   * @see [[RDD.aggregate]] which is the synchronous version of this method.
+   */
+  def aggregateAsync[U](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): FutureAction[U] =
+    self.withScope {
--- End diff --

IMHO, one reason could be the rule of failing fast and early. Cloning uses serialization/deserialization, which is needed anyway to send the neutral element to the executors. This way, serializability of the neutral element is checked when the operator is specified, not during lazy execution in the middle of a long chain of operations.

---

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
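The fail-fast idea can be illustrated outside of Spark: eagerly round-trip the zero value through Java serialization at the point where the operation is defined, so a non-serializable neutral element fails immediately rather than deep inside a lazy pipeline. This is a minimal sketch in the spirit of the cloning being discussed, not Spark's actual implementation:

```scala
import java.io._
import scala.reflect.ClassTag

// Minimal stand-in for cloning a zero value via serialization. A zero value
// that is not Serializable fails here, at definition time, not at job time.
def cloneZero[U: ClassTag](zeroValue: U): U = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  out.writeObject(zeroValue) // throws NotSerializableException eagerly
  out.close()
  val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
  in.readObject().asInstanceOf[U]
}
```

With this in place, passing a zero value that does not implement `Serializable` (e.g. `cloneZero(new Object)`) throws `NotSerializableException` right away, which is the early failure attilapiros describes.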
[GitHub] spark pull request #21971: [SPARK-24947] [Core] aggregateAsync and foldAsync
Github user ceedubs commented on a diff in the pull request: https://github.com/apache/spark/pull/21971#discussion_r207246356

--- Diff: core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala ---
@@ -61,6 +62,36 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
       (index, data) => results(index) = data, results.flatten.toSeq)
   }
+
+  /**
+   * Returns a future of an aggregation across the RDD.
+   *
+   * @see [[RDD.aggregate]] which is the synchronous version of this method.
+   */
+  def aggregateAsync[U](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): FutureAction[U] =
+    self.withScope {
--- End diff --

In the synchronous version of `aggregate`, the `zeroValue` is cloned, which requires adding an implicit `ClassTag[U]` argument. I didn't really understand the motivation for that, so I didn't do it here, but I was hoping that someone who understood the cloning could let me know here.

---
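One plausible reason the `ClassTag[U]` appears in the synchronous signature (an assumption based on the cloning being done through a serializer, not a statement about the PR): a serializer's typed `deserialize[T]` needs runtime class information, so any generic clone helper must demand a `ClassTag`, and that requirement bubbles up to `aggregate`'s signature. A cut-down sketch of that dependency, with `MiniSerializer` and `cloneValue` as hypothetical names:

```scala
import java.nio.ByteBuffer
import scala.reflect.ClassTag

// Hypothetical mirror of a typed serializer API: deserialize needs a
// ClassTag so the serializer can instantiate the right runtime class.
trait MiniSerializer {
  def serialize[T: ClassTag](t: T): ByteBuffer
  def deserialize[T: ClassTag](bytes: ByteBuffer): T
}

// A generic helper that round-trips a value through such a serializer must
// itself demand a ClassTag; callers like aggregate then need one too,
// i.e. def aggregate[U: ClassTag](zeroValue: U)(...).
def cloneValue[U: ClassTag](value: U, ser: MiniSerializer): U =
  ser.deserialize[U](ser.serialize(value))
```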
[GitHub] spark pull request #21971: [SPARK-24947] [Core] aggregateAsync and foldAsync
Github user ceedubs commented on a diff in the pull request: https://github.com/apache/spark/pull/21971#discussion_r207246593

--- Diff: core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala ---
@@ -61,6 +62,36 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
       (index, data) => results(index) = data, results.flatten.toSeq)
   }
+
+  /**
+   * Returns a future of an aggregation across the RDD.
+   *
+   * @see [[RDD.aggregate]] which is the synchronous version of this method.
+   */
+  def aggregateAsync[U](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): FutureAction[U] =
+    self.withScope {
+      val cleanSeqOp = self.context.clean(seqOp)
+      val cleanCombOp = self.context.clean(combOp)
+      val combBinOp = new BinaryOperator[U] {
--- End diff --

Is there a cleaner way to integrate with `BinaryOperator` before Scala 2.12?

---
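For context on the question: before Scala 2.12, a Scala function literal is not automatically converted to a Java SAM interface, so the anonymous-class boilerplate is hard to avoid entirely; the most common workaround is to hide it behind a small adapter. A sketch, assuming `java.util.function.BinaryOperator` is the interface in play:

```scala
import java.util.function.BinaryOperator

// Pre-2.12 there is no automatic (U, U) => U to BinaryOperator[U]
// conversion, so wrap the boilerplate once in a helper.
def toBinaryOperator[U](f: (U, U) => U): BinaryOperator[U] =
  new BinaryOperator[U] {
    override def apply(a: U, b: U): U = f(a, b)
  }

// On Scala 2.12+, SAM conversion makes the adapter unnecessary:
// val op: BinaryOperator[Int] = (a, b) => a + b
```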
[GitHub] spark pull request #21971: [SPARK-24947] [Core] aggregateAsync and foldAsync
GitHub user ceedubs opened a pull request: https://github.com/apache/spark/pull/21971

[SPARK-24947] [Core] aggregateAsync and foldAsync

See the description in the [Jira ticket](https://issues.apache.org/jira/browse/SPARK-24947). This contribution is my original work (inspired by similar methods in the Spark code) and I license this work to Spark under the Apache License 2.0.

## What changes were proposed in this pull request?

Add `aggregateAsync` and `foldAsync` methods to `AsyncRDDActions`.

## How was this patch tested?

Unit tests (included in PR).

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ceedubs/spark async-aggregate

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21971.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21971

commit b03434a3595bc3ce5faca7dd2218ef80a71bab0b
Author: Cody Allen
Date: 2018-08-01T15:03:35Z

    [SPARK-24947] [Core] aggregateAsync and foldAsync

    See the description in the [Jira ticket](https://issues.apache.org/jira/browse/SPARK-24947). This contribution is my original work (inspired by similar methods in the Spark code) and I license this work to Spark under the Apache License 2.0.

---
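Assuming the signatures shown in the review diff above, the proposed methods would be called like their synchronous counterparts but return a `FutureAction` instead of blocking the driver. A hypothetical usage sketch (not code from the PR; assumes an active `SparkContext` named `sc`):

```scala
import org.apache.spark.FutureAction
import scala.concurrent.ExecutionContext.Implicits.global

// Same shape as RDD.aggregate, but non-blocking: the driver gets a
// FutureAction[Int] and can register a callback instead of waiting.
val rdd = sc.parallelize(1 to 100)
val sumFuture: FutureAction[Int] =
  rdd.aggregateAsync(0)(_ + _, _ + _)
sumFuture.foreach(total => println(s"sum = $total"))
```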