[GitHub] spark pull request #19317: [SPARK-22098][CORE] Add new method aggregateByKey...

2017-10-17 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19317#discussion_r145294297
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala ---
@@ -180,6 +180,56 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * as in scala.TraversableOnce. The former operation is used for merging values within a
    * partition, and the latter is used for merging values between partitions. To avoid memory
    * allocation, both of these functions are allowed to modify and return their first argument
+   * instead of creating a new U. This method is different from the ordinary "aggregateByKey"
+   * method, it directly returns a map to the driver, rather than a rdd. This will also perform
+   * the merging locally on each mapper before sending results to a reducer, similarly to a
--- End diff --

thanks for the advice, I'll close it and try `mapPartitions(...).collect` in `NaiveBayes`.
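
A minimal sketch of that `mapPartitions(...).collect` pattern, assuming NaiveBayes-like `(label, features)` input; the name `sumByLabel` and the merge logic here are illustrative, not the actual MLlib code:

```scala
import scala.collection.mutable
import org.apache.spark.rdd.RDD

// Fold each partition into a small local map of label -> (count, feature sums),
// collect the per-partition maps, and merge them on the driver -- no shuffle.
def sumByLabel(data: RDD[(Double, Array[Double])]): Map[Double, (Long, Array[Double])] = {
  def merge(a: (Long, Array[Double]), b: (Long, Array[Double])): (Long, Array[Double]) =
    (a._1 + b._1, a._2.zip(b._2).map { case (x, y) => x + y })

  val perPartition = data.mapPartitions { iter =>
    val acc = mutable.HashMap.empty[Double, (Long, Array[Double])]
    iter.foreach { case (label, features) =>
      val zero = (0L, Array.fill(features.length)(0.0))
      acc(label) = merge(acc.getOrElse(label, zero), (1L, features))
    }
    Iterator(acc.toMap)                         // one small map per task
  }.collect()

  perPartition.foldLeft(Map.empty[Double, (Long, Array[Double])]) { (m, part) =>
    part.foldLeft(m) { case (acc, (label, partial)) =>
      acc.updated(label, acc.get(label).map(merge(_, partial)).getOrElse(partial))
    }
  }
}
```

The per-partition maps are small (one entry per label), so only those, not the raw records, travel to the driver.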


---




[GitHub] spark pull request #19317: [SPARK-22098][CORE] Add new method aggregateByKey...

2017-10-17 Thread ConeyLiu
Github user ConeyLiu closed the pull request at:

https://github.com/apache/spark/pull/19317


---




[GitHub] spark pull request #19317: [SPARK-22098][CORE] Add new method aggregateByKey...

2017-10-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19317#discussion_r144767175
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala ---
@@ -180,6 +180,56 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * as in scala.TraversableOnce. The former operation is used for merging values within a
    * partition, and the latter is used for merging values between partitions. To avoid memory
    * allocation, both of these functions are allowed to modify and return their first argument
+   * instead of creating a new U. This method is different from the ordinary "aggregateByKey"
+   * method, it directly returns a map to the driver, rather than a rdd. This will also perform
+   * the merging locally on each mapper before sending results to a reducer, similarly to a
--- End diff --

BTW `collect all the map directly to driver` may easily OOM the driver, while shuffling to multiple reducers reduces the memory pressure. Even if we shuffle to only one reducer, it is still better, as executors usually have more memory than the driver.


---




[GitHub] spark pull request #19317: [SPARK-22098][CORE] Add new method aggregateByKey...

2017-10-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19317#discussion_r144766776
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala ---
@@ -180,6 +180,56 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * as in scala.TraversableOnce. The former operation is used for merging values within a
    * partition, and the latter is used for merging values between partitions. To avoid memory
    * allocation, both of these functions are allowed to modify and return their first argument
+   * instead of creating a new U. This method is different from the ordinary "aggregateByKey"
+   * method, it directly returns a map to the driver, rather than a rdd. This will also perform
+   * the merging locally on each mapper before sending results to a reducer, similarly to a
--- End diff --

> collect all the `map` directly to driver

technically it's a shuffle too, and generally `aggregateByKey` is better for the following reasons:
1. The final combine is executed on multiple reducers, which has better parallelism than doing it on the driver.
2. We should always prefer doing computation on executors instead of the driver, because the driver is responsible for scheduling and has a high cost of failure recovery.
3. Using the Spark shuffle is better for fault tolerance. If one reducer fails, you don't need to rerun all the mappers.

So I'm -1 on this API. If there are special cases where we want to aggregate locally, just call `RDD.map.collect` and do the local aggregation manually.
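
A minimal sketch of that fallback, assuming the mapped data set is small enough to fit in driver memory (the example data and `local[2]` master are illustrative only):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc  = new SparkContext(new SparkConf().setAppName("local-aggregate-sketch").setMaster("local[2]"))
val rdd = sc.parallelize(Seq(("a", 1L), ("b", 2L), ("a", 3L)))

// Pull all mapped pairs to the driver, then aggregate them locally.
val aggregated: Map[String, Long] =
  rdd.map { case (k, v) => (k, v) }       // any per-record transformation goes here
    .collect()
    .groupBy(_._1)
    .map { case (k, vs) => k -> vs.map(_._2).sum }

// aggregated == Map("a" -> 4L, "b" -> 2L)
```

This keeps every raw record on the driver, so it only makes sense when the collected data is tiny; otherwise the shuffle-based `aggregateByKey` route is safer.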


---




[GitHub] spark pull request #19317: [SPARK-22098][CORE] Add new method aggregateByKey...

2017-10-16 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19317#discussion_r144760426
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala ---
@@ -180,6 +180,56 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * as in scala.TraversableOnce. The former operation is used for merging values within a
    * partition, and the latter is used for merging values between partitions. To avoid memory
    * allocation, both of these functions are allowed to modify and return their first argument
+   * instead of creating a new U. This method is different from the ordinary "aggregateByKey"
+   * method, it directly returns a map to the driver, rather than a rdd. This will also perform
+   * the merging locally on each mapper before sending results to a reducer, similarly to a
--- End diff --

`aggregateByKey(...).toLocalIterator` needs a shuffle for the `aggregateByKey` and then collects the `RDD` to the driver as an iterator. But `aggregateByKeyLocally` is like `aggregateByKey` except that there isn't a shuffle: it computes the combines in each task, then collects all the `map`s directly to the driver and does the final combine on the driver.
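
For comparison, a minimal sketch of the shuffle-based route mentioned above; the example data and summing functions are placeholders:

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc  = new SparkContext(new SparkConf().setAppName("shuffle-route-sketch").setMaster("local[2]"))
val rdd = sc.parallelize(Seq(("a", 1L), ("b", 2L), ("a", 3L)))

// Map-side combine, shuffle the partials to reducers, then stream the reduced
// RDD back to the driver one partition at a time.
val viaShuffle: Iterator[(String, Long)] =
  rdd.aggregateByKey(0L)(_ + _, _ + _).toLocalIterator
```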


---




[GitHub] spark pull request #19317: [SPARK-22098][CORE] Add new method aggregateByKey...

2017-10-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19317#discussion_r144757881
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala ---
@@ -180,6 +180,56 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * as in scala.TraversableOnce. The former operation is used for merging values within a
    * partition, and the latter is used for merging values between partitions. To avoid memory
    * allocation, both of these functions are allowed to modify and return their first argument
+   * instead of creating a new U. This method is different from the ordinary "aggregateByKey"
+   * method, it directly returns a map to the driver, rather than a rdd. This will also perform
+   * the merging locally on each mapper before sending results to a reducer, similarly to a
--- End diff --

what's the difference between `aggregateByKeyLocally` and `aggregateByKey(...).toLocalIterator`?


---




[GitHub] spark pull request #19317: [SPARK-22098][CORE] Add new method aggregateByKey...

2017-10-15 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19317#discussion_r144755666
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala ---
@@ -180,6 +180,56 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * as in scala.TraversableOnce. The former operation is used for merging values within a
    * partition, and the latter is used for merging values between partitions. To avoid memory
    * allocation, both of these functions are allowed to modify and return their first argument
+   * instead of creating a new U. This method is different from the ordinary "aggregateByKey"
+   * method, it directly returns a map to the driver, rather than a rdd. This will also perform
+   * the merging locally on each mapper before sending results to a reducer, similarly to a
--- End diff --

Yeah, it will. Here the 'difference' means it directly returns a map to the driver rather than an RDD.


---




[GitHub] spark pull request #19317: [SPARK-22098][CORE] Add new method aggregateByKey...

2017-10-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19317#discussion_r144753719
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala ---
@@ -180,6 +180,56 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * as in scala.TraversableOnce. The former operation is used for merging values within a
    * partition, and the latter is used for merging values between partitions. To avoid memory
    * allocation, both of these functions are allowed to modify and return their first argument
+   * instead of creating a new U. This method is different from the ordinary "aggregateByKey"
+   * method, it directly returns a map to the driver, rather than a rdd. This will also perform
+   * the merging locally on each mapper before sending results to a reducer, similarly to a
--- End diff --

doesn't `aggregateByKey` perform map side combine?


---




[GitHub] spark pull request #19317: [SPARK-22098][CORE] Add new method aggregateByKey...

2017-09-21 Thread ConeyLiu
GitHub user ConeyLiu opened a pull request:

https://github.com/apache/spark/pull/19317

[SPARK-22098][CORE] Add new method aggregateByKeyLocally in RDD

## What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-22096

NaiveBayes currently takes `aggregateByKey` followed by a `collect` to calculate the frequency for each feature/label. We can implement a new function `aggregateByKeyLocally` in RDD that merges locally on each mapper before sending results to a reducer, to save one stage. We tested on NaiveBayes and saw a ~20% performance gain with these changes.

This is a subtask of our improvement.
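
A hedged sketch of one plausible shape for such a method, written here as a free function since the PR's actual member signature is not reproduced in this thread (it also assumes `seqOp` does not mutate the shared `zeroValue`):

```scala
import scala.collection.mutable
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// Combine within each partition, collect the small per-partition maps, and
// finish the combine on the driver, so no shuffle stage is scheduled.
def aggregateByKeyLocally[K, V, U: ClassTag](rdd: RDD[(K, V)], zeroValue: U)
    (seqOp: (U, V) => U, combOp: (U, U) => U): Map[K, U] = {
  val partials = rdd.mapPartitions { iter =>
    val acc = mutable.HashMap.empty[K, U]
    iter.foreach { case (k, v) => acc(k) = seqOp(acc.getOrElse(k, zeroValue), v) }
    Iterator(acc.toMap)
  }.collect()

  partials.foldLeft(Map.empty[K, U]) { (merged, part) =>
    part.foldLeft(merged) { case (m, (k, u)) =>
      m.updated(k, m.get(k).map(combOp(_, u)).getOrElse(u))
    }
  }
}
```

For example, `aggregateByKeyLocally(rdd, 0L)(_ + _, _ + _)` would return a plain driver-side `Map` with the per-key sums.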

## How was this patch tested?

New UT.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ConeyLiu/spark aggregatebykeylocally

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19317.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19317


commit 73a85dc5963ac46f181a9499deabb18da4ccc308
Author: Xianyang Liu 
Date:   2017-08-31T05:16:09Z

add new method 'aggregateByKeyLocally'




---
