Repository: spark Updated Branches: refs/heads/master 183242382 -> 71ad945bb
[SPARK-16426][MLLIB] Fix bug that caused NaNs in IsotonicRegression ## What changes were proposed in this pull request? Fixed a bug that caused `NaN`s in `IsotonicRegression`. The problem occurs when training rows with the same feature value but different labels end up on different partitions. This can happen because the previous `sortBy(x => (x._2, x._1))` range-partitions on the (feature, label) pair, so a partition boundary can fall between rows that share a feature value. This patch replaces that `sortBy` call with a `partitionBy(RangePartitioner)` keyed on the feature value alone, followed by a `mapPartitions` that sorts each partition locally, to ensure that all rows with the same feature value end up on the same partition. ## How was this patch tested? Added a unit test. Author: z001qdp <nicholas.egg...@target.com> Closes #14140 from neggert/SPARK-16426-isotonic-nan. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/71ad945b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/71ad945b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/71ad945b Branch: refs/heads/master Commit: 71ad945bbbdd154eae852cd7f841e98f7a83e8d4 Parents: 1832423 Author: z001qdp <nicholas.egg...@target.com> Authored: Fri Jul 15 12:30:22 2016 +0100 Committer: Sean Owen <so...@cloudera.com> Committed: Fri Jul 15 12:30:22 2016 +0100 ---------------------------------------------------------------------- .../spark/mllib/regression/IsotonicRegression.scala | 9 ++++++--- .../spark/mllib/regression/IsotonicRegressionSuite.scala | 11 +++++++++++ 2 files changed, 17 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/71ad945b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala index 1cd6f2a..377326f 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala +++ 
b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala @@ -35,6 +35,7 @@ import org.apache.spark.mllib.linalg.{Vector, Vectors} import org.apache.spark.mllib.util.{Loader, Saveable} import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession +import org.apache.spark.RangePartitioner /** * Regression model for isotonic regression. @@ -408,9 +409,11 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali */ private def parallelPoolAdjacentViolators( input: RDD[(Double, Double, Double)]): Array[(Double, Double, Double)] = { - val parallelStepResult = input - .sortBy(x => (x._2, x._1)) - .glom() + val keyedInput = input.keyBy(_._2) + val parallelStepResult = keyedInput + .partitionBy(new RangePartitioner(keyedInput.getNumPartitions, keyedInput)) + .values + .mapPartitions(p => Iterator(p.toArray.sortBy(x => (x._2, x._1)))) .flatMap(poolAdjacentViolators) .collect() .sortBy(x => (x._2, x._1)) // Sort again because collect() doesn't promise ordering. 
http://git-wip-us.apache.org/repos/asf/spark/blob/71ad945b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala index ea4f286..94da626 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala @@ -176,6 +176,17 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext w assert(model.predictions === Array(1, 2, 2)) } + test("SPARK-16426 isotonic regression with duplicate features that produce NaNs") { + val trainRDD = sc.parallelize(Seq[(Double, Double, Double)]((2, 1, 1), (1, 1, 1), (0, 2, 1), + (1, 2, 1), (0.5, 3, 1), (0, 3, 1)), + 2) + + val model = new IsotonicRegression().run(trainRDD) + + assert(model.boundaries === Array(1.0, 3.0)) + assert(model.predictions === Array(0.75, 0.75)) + } + test("isotonic regression prediction") { val model = runIsotonicRegression(Seq(1, 2, 7, 1, 2), true) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org