[GitHub] spark pull request #21971: [SPARK-24947] [Core] aggregateAsync and foldAsync

2018-08-06 Thread attilapiros
Github user attilapiros commented on a diff in the pull request:

https://github.com/apache/spark/pull/21971#discussion_r207898987
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala ---
@@ -61,6 +62,36 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
   (index, data) => results(index) = data, results.flatten.toSeq)
   }
 
+
+  /**
+   * Returns a future of an aggregation across the RDD.
+   *
+   * @see [[RDD.aggregate]] which is the synchronous version of this method.
+   */
+  def aggregateAsync[U](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): FutureAction[U] =
+    self.withScope {
--- End diff --

IMHO one reason could be the fail-fast rule. Cloning uses
serialisation/deserialisation, which is needed anyway to send the neutral
element to the executors. This way, serialisation of the neutral element is
tested when the operator is specified, not during the lazy execution in the
middle of a long chain of operations.
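
A minimal sketch of that fail-fast idea, assuming plain Java serialisation
rather than whatever serializer Spark is configured with. The names
`FailFastClone`, `cloneViaSerialization` and `Counter` are hypothetical and
only serve the illustration; a non-serialisable neutral element is rejected
as soon as the clone is attempted:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.util.Try

object FailFastClone {
  // Hypothetical helper: clone a value with a Java serialisation round trip.
  // This is an assumption for illustration, not Spark's own code path.
  def cloneViaSerialization[A](value: A): A = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    // writeObject throws NotSerializableException for non-serialisable values.
    try out.writeObject(value.asInstanceOf[AnyRef]) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }

  // Deliberately does not extend Serializable.
  final class Counter(var n: Int)

  def main(args: Array[String]): Unit = {
    // A serialisable neutral element survives the round trip.
    println(cloneViaSerialization("zero"))
    // A non-serialisable one fails here, before any job would be submitted.
    println(Try(cloneViaSerialization(new Counter(0))).isFailure)
  }
}
```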


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21971: [SPARK-24947] [Core] aggregateAsync and foldAsync

2018-08-02 Thread ceedubs
Github user ceedubs commented on a diff in the pull request:

https://github.com/apache/spark/pull/21971#discussion_r207246356
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala ---
@@ -61,6 +62,36 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
   (index, data) => results(index) = data, results.flatten.toSeq)
   }
 
+
+  /**
+   * Returns a future of an aggregation across the RDD.
+   *
+   * @see [[RDD.aggregate]] which is the synchronous version of this method.
+   */
+  def aggregateAsync[U](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): FutureAction[U] =
+    self.withScope {
--- End diff --

In the synchronous version of `aggregate`, the `zeroValue` is cloned, which 
requires adding an implicit `ClassTag[U]` argument. I didn't really understand 
the motivation for that, so I didn't do it here, but I was hoping that someone 
who understood the cloning could let me know here.
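
To make the `ClassTag[U]` point concrete, here is a rough sketch of how the
requirement could propagate. It assumes the clone goes through a serializer
whose methods take a `ClassTag`; `TaggedSerializer`, `JavaTaggedSerializer`,
`cloneValue` and `aggregateLocally` are hypothetical stand-ins, not Spark
APIs:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.reflect.ClassTag

object ClassTagPropagation {
  // Hypothetical serializer interface: both methods demand a ClassTag.
  trait TaggedSerializer {
    def serialize[A: ClassTag](value: A): Array[Byte]
    def deserialize[A: ClassTag](bytes: Array[Byte]): A
  }

  // Toy implementation backed by Java serialisation (an assumption).
  object JavaTaggedSerializer extends TaggedSerializer {
    def serialize[A: ClassTag](value: A): Array[Byte] = {
      val buffer = new ByteArrayOutputStream()
      val out = new ObjectOutputStream(buffer)
      try out.writeObject(value.asInstanceOf[AnyRef]) finally out.close()
      buffer.toByteArray
    }

    def deserialize[A: ClassTag](bytes: Array[Byte]): A = {
      val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
      try in.readObject().asInstanceOf[A] finally in.close()
    }
  }

  // Calling the serializer forces the context bound onto the clone helper...
  def cloneValue[U: ClassTag](value: U, serializer: TaggedSerializer): U =
    serializer.deserialize[U](serializer.serialize(value))

  // ...and from there onto any aggregate-like method that clones its zero value.
  def aggregateLocally[T, U: ClassTag](items: Seq[T], zeroValue: U)(seqOp: (U, T) => U): U = {
    val zero = cloneValue(zeroValue, JavaTaggedSerializer)
    items.foldLeft(zero)(seqOp)
  }
}
```

Dropping the clone, as this PR does, is what lets `aggregateAsync` keep the
plain `[U]` type parameter.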


---




[GitHub] spark pull request #21971: [SPARK-24947] [Core] aggregateAsync and foldAsync

2018-08-02 Thread ceedubs
Github user ceedubs commented on a diff in the pull request:

https://github.com/apache/spark/pull/21971#discussion_r207246593
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala ---
@@ -61,6 +62,36 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
   (index, data) => results(index) = data, results.flatten.toSeq)
   }
 
+
+  /**
+   * Returns a future of an aggregation across the RDD.
+   *
+   * @see [[RDD.aggregate]] which is the synchronous version of this method.
+   */
+  def aggregateAsync[U](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): FutureAction[U] =
+    self.withScope {
+      val cleanSeqOp = self.context.clean(seqOp)
+      val cleanCombOp = self.context.clean(combOp)
+      val combBinOp = new BinaryOperator[U] {
--- End diff --

Is there a cleaner way to integrate with `BinaryOperator` before Scala 2.12?
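
One option that works on any Scala version is to hide the anonymous class
behind a small helper. A sketch, where `BinaryOperatorInterop` and
`asBinaryOperator` are hypothetical names that are not part of this PR:

```scala
import java.util.function.BinaryOperator

object BinaryOperatorInterop {
  // Hypothetical helper: wrap a Scala function in a Java BinaryOperator.
  // It uses an explicit anonymous class, so it also compiles before Scala 2.12.
  def asBinaryOperator[U](f: (U, U) => U): BinaryOperator[U] =
    new BinaryOperator[U] {
      override def apply(left: U, right: U): U = f(left, right)
    }

  def main(args: Array[String]): Unit = {
    val add = asBinaryOperator[Int](_ + _)
    println(add.apply(2, 3))
  }
}
```

From Scala 2.12 onward, a function literal can be passed directly where a
`BinaryOperator[U]` is expected, so the helper becomes unnecessary there.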


---




[GitHub] spark pull request #21971: [SPARK-24947] [Core] aggregateAsync and foldAsync

2018-08-02 Thread ceedubs
GitHub user ceedubs opened a pull request:

https://github.com/apache/spark/pull/21971

[SPARK-24947] [Core] aggregateAsync and foldAsync

See the description in the [Jira ticket](https://issues.apache.org/jira/browse/SPARK-24947).

This contribution is my original work (inspired by similar methods in
the Spark code) and I license this work to Spark under the Apache
License 2.0.

## What changes were proposed in this pull request?

Add `aggregateAsync` and `foldAsync` methods to `AsyncRDDActions`.

## How was this patch tested?

Unit tests (included in PR).


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ceedubs/spark async-aggregate

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21971.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21971


commit b03434a3595bc3ce5faca7dd2218ef80a71bab0b
Author: Cody Allen 
Date:   2018-08-01T15:03:35Z

[SPARK-24947] [Core] aggregateAsync and foldAsync

See the description in the [Jira ticket](https://issues.apache.org/jira/browse/SPARK-24947).

This contribution is my original work (inspired by similar methods in
the Spark code) and I license this work to Spark under the Apache
License 2.0.




---
