[GitHub] spark pull request #19317: [SPARK-22098][CORE] Add new method aggregateByKey...
Github user ConeyLiu commented on a diff in the pull request:
https://github.com/apache/spark/pull/19317#discussion_r145294297

--- Diff: core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala ---
@@ -180,6 +180,56 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     * as in scala.TraversableOnce. The former operation is used for merging values within a
     * partition, and the latter is used for merging values between partitions. To avoid memory
     * allocation, both of these functions are allowed to modify and return their first argument
+    * instead of creating a new U. This method is different from the ordinary "aggregateByKey"
+    * method, it directly returns a map to the driver, rather than a rdd. This will also perform
+    * the merging locally on each mapper before sending results to a reducer, similarly to a
--- End diff --

Thanks for the advice, I'll close it and try `mapPartitions(...).collect` in `NaiveBayes`.
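[Editor's note] A minimal sketch of the `mapPartitions(...).collect` pattern mentioned above, assuming a NaiveBayes-like job that sums a per-label feature vector inside each partition and finishes the combine on the driver. The object name, sample data, and accumulator layout are illustrative only, not the actual MLlib code:

```scala
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

// Illustrative only: aggregate (label, featureVector) pairs within each partition,
// collect the small per-partition maps, and finish the combine on the driver.
object NaiveBayesStyleLocalAggregate {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("local-aggregate-sketch").setMaster("local[*]"))

    // Hypothetical training data: (label, feature vector).
    val data = sc.parallelize(Seq(
      (0.0, Array(1.0, 0.0)),
      (1.0, Array(0.0, 2.0)),
      (0.0, Array(3.0, 1.0))))

    // One small map per partition: label -> summed feature vector.
    val perPartition = data.mapPartitions { iter =>
      val sums = mutable.Map.empty[Double, Array[Double]]
      iter.foreach { case (label, features) =>
        val acc = sums.getOrElseUpdate(label, new Array[Double](features.length))
        var i = 0
        while (i < features.length) { acc(i) += features(i); i += 1 }
      }
      Iterator(sums.toMap)
    }

    // Final combine on the driver; only the tiny per-partition maps cross the network.
    val merged = perPartition.collect().foldLeft(Map.empty[Double, Array[Double]]) { (m, part) =>
      part.foldLeft(m) { case (acc, (label, vec)) =>
        val combined = acc.get(label)
          .map(prev => prev.zip(vec).map { case (a, b) => a + b })
          .getOrElse(vec)
        acc.updated(label, combined)
      }
    }

    merged.foreach { case (label, vec) => println(s"$label -> ${vec.mkString(", ")}") }
    sc.stop()
  }
}
```

This only stays cheap while the number of distinct labels, and therefore each per-partition map, is small; otherwise the shuffle-based `aggregateByKey` discussed in this thread is the safer choice.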
[GitHub] spark pull request #19317: [SPARK-22098][CORE] Add new method aggregateByKey...
Github user ConeyLiu closed the pull request at: https://github.com/apache/spark/pull/19317
[GitHub] spark pull request #19317: [SPARK-22098][CORE] Add new method aggregateByKey...
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/19317#discussion_r144767175

--- Diff: core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala ---
@@ -180,6 +180,56 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     * as in scala.TraversableOnce. The former operation is used for merging values within a
     * partition, and the latter is used for merging values between partitions. To avoid memory
     * allocation, both of these functions are allowed to modify and return their first argument
+    * instead of creating a new U. This method is different from the ordinary "aggregateByKey"
+    * method, it directly returns a map to the driver, rather than a rdd. This will also perform
+    * the merging locally on each mapper before sending results to a reducer, similarly to a
--- End diff --

BTW, `collect all the map directly to driver` may easily OOM the driver, while shuffling to multiple reducers reduces the memory pressure. Even if we only shuffle to one reducer, it is still better, as executors usually have more memory than the driver.
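[Editor's note] To make the "shuffle to a small number of reducers" point concrete, a minimal sketch assuming a plain per-key sum; the two-partition reducer count and the sample data are arbitrary choices for illustration:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object FewReducersSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("few-reducers-sketch").setMaster("local[*]"))

    val pairs = sc.parallelize(Seq(("a", 1L), ("b", 2L), ("a", 3L)), numSlices = 4)

    // Map-side combine runs on every mapper; the final combine runs on just two
    // reducer tasks on executors, so the driver never holds the whole intermediate map.
    val aggregated = pairs.aggregateByKey(0L, numPartitions = 2)(_ + _, _ + _)

    aggregated.collect().foreach(println)
    sc.stop()
  }
}
```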
[GitHub] spark pull request #19317: [SPARK-22098][CORE] Add new method aggregateByKey...
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/19317#discussion_r144766776

--- Diff: core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala ---
@@ -180,6 +180,56 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     * as in scala.TraversableOnce. The former operation is used for merging values within a
     * partition, and the latter is used for merging values between partitions. To avoid memory
     * allocation, both of these functions are allowed to modify and return their first argument
+    * instead of creating a new U. This method is different from the ordinary "aggregateByKey"
+    * method, it directly returns a map to the driver, rather than a rdd. This will also perform
+    * the merging locally on each mapper before sending results to a reducer, similarly to a
--- End diff --

> collect all the `map` directly to driver

Technically that is a shuffle too, and generally `aggregateByKey` is better for the following reasons:

1. The final combine is executed on multiple reducers, which gives better parallelism than doing it on the driver.
2. We should always prefer doing computation on executors instead of the driver, because the driver is responsible for scheduling and has a high cost of failure recovery.
3. Using the Spark shuffle is better for fault tolerance: if one reducer fails, you don't need to rerun all the mappers.

So I'm -1 on this API. If there are special cases where we want to aggregate locally, just call `RDD.map.collect` and do the local aggregation manually.
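[Editor's note] As an illustration of the `RDD.map.collect` fallback suggested above, a minimal sketch assuming the mapped data is small enough to fit comfortably on the driver; the object name and data are made up:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CollectThenAggregateSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("collect-then-aggregate").setMaster("local[*]"))

    val records = sc.parallelize(Seq("a" -> 1, "b" -> 2, "a" -> 3))

    // Bring the (already mapped, hopefully small) pairs to the driver,
    // then aggregate them locally with plain Scala collections.
    val local: Array[(String, Long)] = records.map { case (k, v) => (k, v.toLong) }.collect()
    val aggregated: Map[String, Long] =
      local.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).sum }

    println(aggregated)
    sc.stop()
  }
}
```

This keeps everything in one stage, but the driver-memory and parallelism trade-offs listed in the comment above still apply.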
[GitHub] spark pull request #19317: [SPARK-22098][CORE] Add new method aggregateByKey...
Github user ConeyLiu commented on a diff in the pull request:
https://github.com/apache/spark/pull/19317#discussion_r144760426

--- Diff: core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala ---
@@ -180,6 +180,56 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     * as in scala.TraversableOnce. The former operation is used for merging values within a
     * partition, and the latter is used for merging values between partitions. To avoid memory
     * allocation, both of these functions are allowed to modify and return their first argument
+    * instead of creating a new U. This method is different from the ordinary "aggregateByKey"
+    * method, it directly returns a map to the driver, rather than a rdd. This will also perform
+    * the merging locally on each mapper before sending results to a reducer, similarly to a
--- End diff --

`aggregateByKey(...).toLocalIterator` needs a shuffle for `aggregateByKey` and then collects the `RDD` to the driver as an iterator. But `aggregateByKeyLocally` is similar to `aggregateByKey`, except that there is no shuffle. It computes the partial combines in each task, then collects all the `map`s directly to the driver and does the final combine on the driver.
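[Editor's note] For comparison, the shuffle-based variant being discussed, as a minimal sketch with a made-up per-key sum; `toLocalIterator` pulls the reduced partitions to the driver one partition at a time rather than all at once:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ToLocalIteratorSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("to-local-iterator-sketch").setMaster("local[*]"))

    val pairs = sc.parallelize(Seq(("a", 1L), ("b", 2L), ("a", 3L)))

    // Shuffle-based aggregation (map-side combine + reducers), then stream the
    // reduced partitions to the driver one at a time.
    val it = pairs.aggregateByKey(0L)(_ + _, _ + _).toLocalIterator
    it.foreach(println)

    sc.stop()
  }
}
```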
[GitHub] spark pull request #19317: [SPARK-22098][CORE] Add new method aggregateByKey...
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/19317#discussion_r144757881

--- Diff: core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala ---
@@ -180,6 +180,56 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     * as in scala.TraversableOnce. The former operation is used for merging values within a
     * partition, and the latter is used for merging values between partitions. To avoid memory
     * allocation, both of these functions are allowed to modify and return their first argument
+    * instead of creating a new U. This method is different from the ordinary "aggregateByKey"
+    * method, it directly returns a map to the driver, rather than a rdd. This will also perform
+    * the merging locally on each mapper before sending results to a reducer, similarly to a
--- End diff --

what's the difference between `aggregateByKeyLocally` and `aggregateByKey(...).toLocalIterator`?
[GitHub] spark pull request #19317: [SPARK-22098][CORE] Add new method aggregateByKey...
Github user ConeyLiu commented on a diff in the pull request:
https://github.com/apache/spark/pull/19317#discussion_r144755666

--- Diff: core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala ---
@@ -180,6 +180,56 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     * as in scala.TraversableOnce. The former operation is used for merging values within a
     * partition, and the latter is used for merging values between partitions. To avoid memory
     * allocation, both of these functions are allowed to modify and return their first argument
+    * instead of creating a new U. This method is different from the ordinary "aggregateByKey"
+    * method, it directly returns a map to the driver, rather than a rdd. This will also perform
+    * the merging locally on each mapper before sending results to a reducer, similarly to a
--- End diff --

Yeah, it will. Here the 'difference' means it directly returns a map to the driver rather than an RDD.
[GitHub] spark pull request #19317: [SPARK-22098][CORE] Add new method aggregateByKey...
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/19317#discussion_r144753719

--- Diff: core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala ---
@@ -180,6 +180,56 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     * as in scala.TraversableOnce. The former operation is used for merging values within a
     * partition, and the latter is used for merging values between partitions. To avoid memory
     * allocation, both of these functions are allowed to modify and return their first argument
+    * instead of creating a new U. This method is different from the ordinary "aggregateByKey"
+    * method, it directly returns a map to the driver, rather than a rdd. This will also perform
+    * the merging locally on each mapper before sending results to a reducer, similarly to a
--- End diff --

doesn't `aggregateByKey` perform map side combine?
[GitHub] spark pull request #19317: [SPARK-22098][CORE] Add new method aggregateByKey...
GitHub user ConeyLiu opened a pull request:
https://github.com/apache/spark/pull/19317

[SPARK-22098][CORE] Add new method aggregateByKeyLocally in RDD

## What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-22096

NaiveBayes currently takes aggregateByKey followed by a collect to calculate the frequency for each feature/label. We can implement a new function 'aggregateByKeyLocally' in RDD that merges locally on each mapper before sending results to a reducer, to save one stage. We tested on NaiveBayes and saw a ~20% performance gain with these changes. This is a subtask of our improvement.

## How was this patch tested?

New UT.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ConeyLiu/spark aggregatebykeylocally

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19317.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #19317

commit 73a85dc5963ac46f181a9499deabb18da4ccc308
Author: Xianyang Liu
Date: 2017-08-31T05:16:09Z

    add new method 'aggregateByKeyLocally'
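[Editor's note] For readers landing on this thread later, a rough standalone sketch of the data flow the PR describes, written as an external helper rather than the actual PairRDDFunctions patch. The signature is an assumption for illustration (in particular the zero-value factory, used so each key gets a fresh, safely mutable zero), not the code from this PR:

```scala
import org.apache.spark.rdd.RDD

import scala.collection.mutable

// Hypothetical helper approximating the proposed behaviour: combine values per key
// inside each partition, collect one small map per partition, and finish on the driver.
object AggregateByKeyLocallySketch {
  def aggregateByKeyLocally[K, V, U](rdd: RDD[(K, V)])(zeroValue: () => U)(
      seqOp: (U, V) => U,
      combOp: (U, U) => U): Map[K, U] = {
    rdd
      .mapPartitions { iter =>
        val acc = mutable.HashMap.empty[K, U]
        iter.foreach { case (k, v) =>
          acc.update(k, seqOp(acc.getOrElse(k, zeroValue()), v))
        }
        Iterator(acc.toMap)
      }
      .collect()                      // one map per partition arrives at the driver
      .foldLeft(Map.empty[K, U]) { (merged, part) =>
        part.foldLeft(merged) { case (m, (k, u)) =>
          m.updated(k, m.get(k).map(combOp(_, u)).getOrElse(u))
        }
      }
  }
}
```

A call would look like `aggregateByKeyLocallySketch.aggregateByKeyLocally(pairs)(() => 0L)(_ + _, _ + _)`, returning a plain `Map` on the driver instead of an RDD, with no shuffle stage.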