[GitHub] spark pull request #14140: [SPARK-16426][MLlib] Fix bug that caused NaNs in ...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/14140 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14140: [SPARK-16426][MLlib] Fix bug that caused NaNs in ...
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14140#discussion_r70819198

    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala ---
    @@ -408,9 +409,11 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
        */
       private def parallelPoolAdjacentViolators(
           input: RDD[(Double, Double, Double)]): Array[(Double, Double, Double)] = {
    -    val parallelStepResult = input
    -      .sortBy(x => (x._2, x._1))
    -      .glom()
    +    val keyedInput = input.keyBy(_._2)
    +    val parallelStepResult = keyedInput
    +      .partitionBy(new RangePartitioner(keyedInput.getNumPartitions, keyedInput))
    +      .values
    +      .mapPartitions( p => Iterator(p.toSeq.sortBy(x => (x._2, x._1)).toArray))
    --- End diff --

    Ah, I'm reading badly this morning. Of course you're right about `keyedInput`, and yes, the `Iterator` wraps the array. But I think your last change is spot on; at least one conversion could be avoided. LGTM
[GitHub] spark pull request #14140: [SPARK-16426][MLlib] Fix bug that caused NaNs in ...
Github user neggert commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14140#discussion_r70818220

    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala ---
    @@ -408,9 +409,11 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
        */
       private def parallelPoolAdjacentViolators(
           input: RDD[(Double, Double, Double)]): Array[(Double, Double, Double)] = {
    -    val parallelStepResult = input
    -      .sortBy(x => (x._2, x._1))
    -      .glom()
    +    val keyedInput = input.keyBy(_._2)
    +    val parallelStepResult = keyedInput
    +      .partitionBy(new RangePartitioner(keyedInput.getNumPartitions, keyedInput))
    +      .values
    +      .mapPartitions( p => Iterator(p.toSeq.sortBy(x => (x._2, x._1)).toArray))
    --- End diff --

    Actually, the `Iterator` is necessary. Since we removed the `glom`, we need to return an `RDD[Array[T]]`. So the function passed to `mapPartitions` needs to be `Iterator[T] => Iterator[Array[T]]`. Removed the extraneous `toSeq`, though.
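The type argument in this comment can be illustrated without a cluster. The sketch below is not the PR's code: it uses plain Scala collections as a stand-in for RDD partitions, and `mapPartitionsLocal` is a hypothetical helper that only mimics the shape of `mapPartitions`. The tuple layout (label, feature, weight) is assumed from the sort key in the diff.

```scala
object MapPartitionsShape {
  // Assumed layout: (label, feature, weight).
  type Point = (Double, Double, Double)

  // Same shape as the function handed to mapPartitions in the diff:
  // Iterator[Point] => Iterator[Array[Point]], yielding one array per partition.
  def sortPartition(p: Iterator[Point]): Iterator[Array[Point]] =
    Iterator(p.toArray.sortBy(x => (x._2, x._1)))

  // Hypothetical local stand-in for mapPartitions: apply f to each partition
  // and collect whatever the returned iterators yield.
  def mapPartitionsLocal[T, U](parts: Seq[Seq[T]])(f: Iterator[T] => Iterator[U]): Seq[U] =
    parts.flatMap(p => f(p.iterator))
}
```

Because `sortPartition` wraps its array in a single-element `Iterator`, the result holds exactly one `Array` per input partition, which is the role `glom` played before it was removed.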
[GitHub] spark pull request #14140: [SPARK-16426][MLlib] Fix bug that caused NaNs in ...
Github user neggert commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14140#discussion_r70816925

    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala ---
    @@ -408,9 +409,11 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
        */
       private def parallelPoolAdjacentViolators(
           input: RDD[(Double, Double, Double)]): Array[(Double, Double, Double)] = {
    -    val parallelStepResult = input
    -      .sortBy(x => (x._2, x._1))
    -      .glom()
    +    val keyedInput = input.keyBy(_._2)
    +    val parallelStepResult = keyedInput
    +      .partitionBy(new RangePartitioner(keyedInput.getNumPartitions, keyedInput))
    +      .values
    +      .mapPartitions( p => Iterator(p.toSeq.sortBy(x => (x._2, x._1)).toArray))
    --- End diff --

    `keyedInput` is separated out because it needs to be used several times. Will clean up the sorting.
[GitHub] spark pull request #14140: [SPARK-16426][MLlib] Fix bug that caused NaNs in ...
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14140#discussion_r70764029

    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala ---
    @@ -408,9 +409,11 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
        */
       private def parallelPoolAdjacentViolators(
           input: RDD[(Double, Double, Double)]): Array[(Double, Double, Double)] = {
    -    val parallelStepResult = input
    -      .sortBy(x => (x._2, x._1))
    -      .glom()
    +    val keyedInput = input.keyBy(_._2)
    +    val parallelStepResult = keyedInput
    +      .partitionBy(new RangePartitioner(keyedInput.getNumPartitions, keyedInput))
    +      .values
    +      .mapPartitions( p => Iterator(p.toSeq.sortBy(x => (x._2, x._1)).toArray))
    --- End diff --

    One last question here: is there any need to make a `Seq` into an `Iterator` and then into an `Array`? The `Iterator` can go away. For that matter, if you call `toArray` to begin with, I think you can sort the array and return it, possibly without copying. (Also, remove the space before `p =>`.) You could also consider not separating out `keyedInput`, to match the original, but that's a question of taste.
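One way to read the "call `toArray` to begin with" suggestion is to materialize the partition once and sort that array in place. The sketch below is an illustration under that reading, not the change that was merged: `sortedPartition` and `byFeatureThenLabel` are hypothetical names, and the tuple layout (label, feature, weight) is assumed from the sort key in the diff. `Array.sortBy` returns a new array, whereas `scala.util.Sorting.quickSort` reorders the array it is given.

```scala
import scala.util.Sorting

object InPlaceSort {
  // Assumed layout: (label, feature, weight).
  type Point = (Double, Double, Double)

  // Order by feature (_2), then label (_1), matching the sort key in the diff.
  val byFeatureThenLabel: Ordering[Point] =
    Ordering.by((x: Point) => (x._2, x._1))

  def sortedPartition(p: Iterator[Point]): Array[Point] = {
    val arr = p.toArray                        // the only materialization of the partition
    Sorting.quickSort(arr)(byFeatureThenLabel) // sorts arr in place and returns Unit
    arr
  }
}
```

`Sorting.quickSort` is not a stable sort; that should not matter here as long as ties on (feature, label) are interchangeable.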
[GitHub] spark pull request #14140: [SPARK-16426][MLlib] Fix bug that caused NaNs in ...
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14140#discussion_r70453024

    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala ---
    @@ -408,8 +409,12 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
        */
       private def parallelPoolAdjacentViolators(
           input: RDD[(Double, Double, Double)]): Array[(Double, Double, Double)] = {
    -    val parallelStepResult = input
    -      .sortBy(x => (x._2, x._1))
    +    val keyedInput = input
    --- End diff --

    Oh, good point. Well, if you're following your current approach, I think you could at least get rid of the `glom` and just hand back an `Iterator(...toArray)`.
[GitHub] spark pull request #14140: [SPARK-16426][MLlib] Fix bug that caused NaNs in ...
Github user neggert commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14140#discussion_r70451628

    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala ---
    @@ -408,8 +409,12 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
        */
       private def parallelPoolAdjacentViolators(
           input: RDD[(Double, Double, Double)]): Array[(Double, Double, Double)] = {
    -    val parallelStepResult = input
    -      .sortBy(x => (x._2, x._1))
    +    val keyedInput = input
    --- End diff --

    Also, doesn't the `glom` load the whole partition into memory anyway?
[GitHub] spark pull request #14140: [SPARK-16426][MLlib] Fix bug that caused NaNs in ...
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14140#discussion_r70451380

    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala ---
    @@ -408,8 +409,12 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
        */
       private def parallelPoolAdjacentViolators(
           input: RDD[(Double, Double, Double)]): Array[(Double, Double, Double)] = {
    -    val parallelStepResult = input
    -      .sortBy(x => (x._2, x._1))
    +    val keyedInput = input
    --- End diff --

    I think you can supply your own ordering and partitioner separately? The ordering is defined implicitly, which is sort of awkward to override. But then you should be able to partition differently from that ordering.

    https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala#L50
[GitHub] spark pull request #14140: [SPARK-16426][MLlib] Fix bug that caused NaNs in ...
Github user neggert commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14140#discussion_r70450795

    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala ---
    @@ -408,8 +409,12 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
        */
       private def parallelPoolAdjacentViolators(
           input: RDD[(Double, Double, Double)]): Array[(Double, Double, Double)] = {
    -    val parallelStepResult = input
    -      .sortBy(x => (x._2, x._1))
    +    val keyedInput = input
    --- End diff --

    `repartitionAndSortWithinPartitions` requires the partition key and the sort key to be the same. We want to partition by feature, then sort by feature *and* label. So it would still require a second step to sort the partitions, although they'd be mostly sorted already. Maybe there could be a speed-up from using an insertion sort or Timsort on the mostly-sorted partition...
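The two-step idea described here (partition on the feature alone, then sort each partition by feature and label) can be sketched locally. This is a rough model under stated assumptions, not Spark code: `partitionIndex` is a hypothetical range partitioner driven by hand-picked upper bounds, plain `Seq`s stand in for RDD partitions, and the tuple layout (label, feature, weight) is assumed from the sort key in the diff.

```scala
object PartitionThenSort {
  // Assumed layout: (label, feature, weight).
  type Point = (Double, Double, Double)

  // Hypothetical range partitioner: bounds are ascending upper limits on the
  // feature; anything above the last bound goes to the final partition.
  def partitionIndex(bounds: Seq[Double])(feature: Double): Int = {
    val i = bounds.indexWhere(feature <= _)
    if (i >= 0) i else bounds.length
  }

  // Step 1: partition on the feature only, so equal features are never split.
  // Step 2: sort each partition by (feature, label).
  def partitionAndSort(input: Seq[Point], bounds: Seq[Double]): Seq[Array[Point]] = {
    val grouped = input.groupBy(p => partitionIndex(bounds)(p._2))
    (0 to bounds.length).map { i =>
      grouped.getOrElse(i, Seq.empty[Point]).toArray.sortBy(x => (x._2, x._1))
    }
  }
}
```

In this model, every point sharing a feature value lands in the same partition, and concatenating the partitions in index order gives the same sequence as a global sort on (feature, label).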
[GitHub] spark pull request #14140: [SPARK-16426][MLlib] Fix bug that caused NaNs in ...
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14140#discussion_r70429952

    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala ---
    @@ -408,8 +409,12 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
        */
       private def parallelPoolAdjacentViolators(
           input: RDD[(Double, Double, Double)]): Array[(Double, Double, Double)] = {
    -    val parallelStepResult = input
    -      .sortBy(x => (x._2, x._1))
    +    val keyedInput = input
    --- End diff --

    I think there may be shorter ways to write this with `groupBy`, but this and other approaches like it have the big drawback of reading lots of data into memory. Here you have to sort the whole partition in memory (!). How about `repartitionAndSortWithinPartitions`? It's an oddly specific method, but likely just what you need here: it partitions according to some criterion and leaves you with sorted partitions. It's more scalable.