[GitHub] spark pull request #14140: [SPARK-16426][MLlib] Fix bug that caused NaNs in ...

2016-07-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/14140


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14140: [SPARK-16426][MLlib] Fix bug that caused NaNs in ...

2016-07-14 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14140#discussion_r70819198
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala 
---
@@ -408,9 +409,11 @@ class IsotonicRegression private (private var 
isotonic: Boolean) extends Seriali
*/
   private def parallelPoolAdjacentViolators(
   input: RDD[(Double, Double, Double)]): Array[(Double, Double, 
Double)] = {
-val parallelStepResult = input
-  .sortBy(x => (x._2, x._1))
-  .glom()
+val keyedInput = input.keyBy(_._2)
+val parallelStepResult = keyedInput
+  .partitionBy(new RangePartitioner(keyedInput.getNumPartitions, 
keyedInput))
+  .values
+  .mapPartitions( p => Iterator(p.toSeq.sortBy(x => (x._2, 
x._1)).toArray))
--- End diff --

Ah I'm reading badly this morning. Of course right about `keyedInput` and 
yes the `Iterator` wraps the array, right. But I think that your last change is 
spot on, at least one conversion could be avoided. LGTM


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14140: [SPARK-16426][MLlib] Fix bug that caused NaNs in ...

2016-07-14 Thread neggert
Github user neggert commented on a diff in the pull request:

https://github.com/apache/spark/pull/14140#discussion_r70818220
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala 
---
@@ -408,9 +409,11 @@ class IsotonicRegression private (private var 
isotonic: Boolean) extends Seriali
*/
   private def parallelPoolAdjacentViolators(
   input: RDD[(Double, Double, Double)]): Array[(Double, Double, 
Double)] = {
-val parallelStepResult = input
-  .sortBy(x => (x._2, x._1))
-  .glom()
+val keyedInput = input.keyBy(_._2)
+val parallelStepResult = keyedInput
+  .partitionBy(new RangePartitioner(keyedInput.getNumPartitions, 
keyedInput))
+  .values
+  .mapPartitions( p => Iterator(p.toSeq.sortBy(x => (x._2, 
x._1)).toArray))
--- End diff --

Actually, the `Iterator` is necessary. Since we removed the `glom`, we need 
to return an `RDD[Array[T]]`. So the function passed to `mapPartitions` needs 
to be `Iterator[T] => Iterator[Array[T]]`. Removed the extraneous `toSeq`, 
though.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14140: [SPARK-16426][MLlib] Fix bug that caused NaNs in ...

2016-07-14 Thread neggert
Github user neggert commented on a diff in the pull request:

https://github.com/apache/spark/pull/14140#discussion_r70816925
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala 
---
@@ -408,9 +409,11 @@ class IsotonicRegression private (private var 
isotonic: Boolean) extends Seriali
*/
   private def parallelPoolAdjacentViolators(
   input: RDD[(Double, Double, Double)]): Array[(Double, Double, 
Double)] = {
-val parallelStepResult = input
-  .sortBy(x => (x._2, x._1))
-  .glom()
+val keyedInput = input.keyBy(_._2)
+val parallelStepResult = keyedInput
+  .partitionBy(new RangePartitioner(keyedInput.getNumPartitions, 
keyedInput))
+  .values
+  .mapPartitions( p => Iterator(p.toSeq.sortBy(x => (x._2, 
x._1)).toArray))
--- End diff --

`keyedInput` is separated out because it needs to be used several times. 
Will clean up the sorting.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14140: [SPARK-16426][MLlib] Fix bug that caused NaNs in ...

2016-07-14 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14140#discussion_r70764029
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala 
---
@@ -408,9 +409,11 @@ class IsotonicRegression private (private var 
isotonic: Boolean) extends Seriali
*/
   private def parallelPoolAdjacentViolators(
   input: RDD[(Double, Double, Double)]): Array[(Double, Double, 
Double)] = {
-val parallelStepResult = input
-  .sortBy(x => (x._2, x._1))
-  .glom()
+val keyedInput = input.keyBy(_._2)
+val parallelStepResult = keyedInput
+  .partitionBy(new RangePartitioner(keyedInput.getNumPartitions, 
keyedInput))
+  .values
+  .mapPartitions( p => Iterator(p.toSeq.sortBy(x => (x._2, 
x._1)).toArray))
--- End diff --

One last question here -- there's no need to make a Seq into an Iterator 
and then into an Array? the Iterator can go away. For that matter, if you call 
`toArray` to begin with, I think you sort the array and return it possibly 
without copying. (Also, remove the space before p =>). You could also consider 
not separating out `keyedInput`, to match the original, but that's a question 
of taste.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14140: [SPARK-16426][MLlib] Fix bug that caused NaNs in ...

2016-07-12 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14140#discussion_r70453024
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala 
---
@@ -408,8 +409,12 @@ class IsotonicRegression private (private var 
isotonic: Boolean) extends Seriali
*/
   private def parallelPoolAdjacentViolators(
   input: RDD[(Double, Double, Double)]): Array[(Double, Double, 
Double)] = {
-val parallelStepResult = input
-  .sortBy(x => (x._2, x._1))
+val keyedInput = input
--- End diff --

oh good point. Well, if you're following your current approach, I think you 
could get rid of the glom at least and just hand back an `Iterator(...toArray)`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14140: [SPARK-16426][MLlib] Fix bug that caused NaNs in ...

2016-07-12 Thread neggert
Github user neggert commented on a diff in the pull request:

https://github.com/apache/spark/pull/14140#discussion_r70451628
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala 
---
@@ -408,8 +409,12 @@ class IsotonicRegression private (private var 
isotonic: Boolean) extends Seriali
*/
   private def parallelPoolAdjacentViolators(
   input: RDD[(Double, Double, Double)]): Array[(Double, Double, 
Double)] = {
-val parallelStepResult = input
-  .sortBy(x => (x._2, x._1))
+val keyedInput = input
--- End diff --

Also, doesn't the `glom` load the whole partition into memory anyway?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14140: [SPARK-16426][MLlib] Fix bug that caused NaNs in ...

2016-07-12 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14140#discussion_r70451380
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala 
---
@@ -408,8 +409,12 @@ class IsotonicRegression private (private var 
isotonic: Boolean) extends Seriali
*/
   private def parallelPoolAdjacentViolators(
   input: RDD[(Double, Double, Double)]): Array[(Double, Double, 
Double)] = {
-val parallelStepResult = input
-  .sortBy(x => (x._2, x._1))
+val keyedInput = input
--- End diff --

I think you can supply your own ordering and partitioner separately? the 
ordering is defined implicitly, which is sort of awkward to override. But then 
you should be able to partition differently from that ordering.

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala#L50


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14140: [SPARK-16426][MLlib] Fix bug that caused NaNs in ...

2016-07-12 Thread neggert
Github user neggert commented on a diff in the pull request:

https://github.com/apache/spark/pull/14140#discussion_r70450795
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala 
---
@@ -408,8 +409,12 @@ class IsotonicRegression private (private var 
isotonic: Boolean) extends Seriali
*/
   private def parallelPoolAdjacentViolators(
   input: RDD[(Double, Double, Double)]): Array[(Double, Double, 
Double)] = {
-val parallelStepResult = input
-  .sortBy(x => (x._2, x._1))
+val keyedInput = input
--- End diff --

`repartitionAndSortWithinPartitions` requires the partition key and the 
sortBy key to be the same. We want to partition by feature, then sort by 
feature *and* label. So it would still require a second step to sort the 
partitions, although they'd be mostly sorted already. Maybe there could be a 
speed-up by using an insertion or Timsort on the mostly-sorted partition...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14140: [SPARK-16426][MLlib] Fix bug that caused NaNs in ...

2016-07-12 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14140#discussion_r70429952
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala 
---
@@ -408,8 +409,12 @@ class IsotonicRegression private (private var 
isotonic: Boolean) extends Seriali
*/
   private def parallelPoolAdjacentViolators(
   input: RDD[(Double, Double, Double)]): Array[(Double, Double, 
Double)] = {
-val parallelStepResult = input
-  .sortBy(x => (x._2, x._1))
+val keyedInput = input
--- End diff --

I think there may be shorter ways to write this with `groupBy`, but, this 
and other approaches like that have the big drawback of reading lots of data 
into memory. Here you have to sort the whole partition in memory (!).

How about `repartitionAndSortWithinPartitions`? oddly specific method, but, 
likely just what you need here, to both partition according to some criteria 
but then end up with sorted partitions. It's more scalable.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org