This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f92c827acab [SPARK-41008][MLLIB] Follow-up isotonic regression 
features deduplica…
f92c827acab is described below

commit f92c827acabccf547d5a1dff4f7ec371bc370230
Author: Ahmed Mahran <ahmed.mah...@mashin.io>
AuthorDate: Sun Dec 11 15:01:15 2022 -0600

    [SPARK-41008][MLLIB] Follow-up isotonic regression features deduplica…
    
    ### What changes were proposed in this pull request?
    
    A follow-up on https://github.com/apache/spark/pull/38966 to update 
relevant documentation and remove redundant sort key.
    
    ### Why are the changes needed?
    
    For isotonic regression, another method for breaking ties of repeated 
features was introduced in https://github.com/apache/spark/pull/38966. This 
will aggregate points having the same feature value by computing the weighted 
average of the labels.
    - This only requires points to be sorted by features instead of features 
and labels. So, we should remove label as a secondary sorting key.
    - Isotonic regression documentation needs to be updated to reflect the new 
behavior.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Isotonic regression documentation update. The documentation described the 
behavior of the algorithm when there are points in the input with repeated 
features. Since this behavior has changed, documentation needs to describe the 
new behavior.
    
    ### How was this patch tested?
    
    Existing tests passed. No need to add new tests since existing tests are 
already comprehensive.
    
    srowen
    
    Closes #38996 from ahmed-mahran/ml-isotonic-reg-dups-follow-up.
    
    Authored-by: Ahmed Mahran <ahmed.mah...@mashin.io>
    Signed-off-by: Sean Owen <sro...@gmail.com>
---
 docs/mllib-isotonic-regression.md                  | 18 ++---
 .../mllib/regression/IsotonicRegression.scala      | 82 ++++++++++------------
 .../mllib/regression/IsotonicRegressionSuite.scala | 29 +++++---
 3 files changed, 67 insertions(+), 62 deletions(-)

diff --git a/docs/mllib-isotonic-regression.md 
b/docs/mllib-isotonic-regression.md
index 95be32a819e..711e828bd80 100644
--- a/docs/mllib-isotonic-regression.md
+++ b/docs/mllib-isotonic-regression.md
@@ -43,7 +43,14 @@ best fitting the original data points.
 which uses an approach to
 [parallelizing isotonic 
regression](https://doi.org/10.1007/978-3-642-99789-1_10).
 The training input is an RDD of tuples of three double values that represent
-label, feature and weight in this order. Additionally, IsotonicRegression 
algorithm has one
+label, feature and weight in this order. In case there are multiple tuples with
+the same feature then these tuples are aggregated into a single tuple as 
follows:
+
+* Aggregated label is the weighted average of all labels.
+* Aggregated feature is the unique feature value.
+* Aggregated weight is the sum of all weights.
+
+Additionally, IsotonicRegression algorithm has one
 optional parameter called $isotonic$ defaulting to true.
 This argument specifies if the isotonic regression is
 isotonic (monotonically increasing) or antitonic (monotonically decreasing).
@@ -53,17 +60,12 @@ labels for both known and unknown features. The result of 
isotonic regression
 is treated as piecewise linear function. The rules for prediction therefore 
are:
 
 * If the prediction input exactly matches a training feature
-  then associated prediction is returned. In case there are multiple 
predictions with the same
-  feature then one of them is returned. Which one is undefined
-  (same as java.util.Arrays.binarySearch).
+  then associated prediction is returned.
 * If the prediction input is lower or higher than all training features
   then prediction with lowest or highest feature is returned respectively.
-  In case there are multiple predictions with the same feature
-  then the lowest or highest is returned respectively.
 * If the prediction input falls between two training features then prediction 
is treated
   as piecewise linear function and interpolated value is calculated from the
-  predictions of the two closest features. In case there are multiple values
-  with the same feature then the same rules as in previous point are used.
+  predictions of the two closest features.
 
 ### Examples
 
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
 
b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
index 0b2bf147501..fbf0dc9c357 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
@@ -23,7 +23,6 @@ import java.util.Arrays.binarySearch
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.commons.math3.util.Precision
 import org.json4s._
 import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods._
@@ -272,8 +271,8 @@ class IsotonicRegression private (private var isotonic: 
Boolean) extends Seriali
    * @param input RDD of tuples (label, feature, weight) where label is 
dependent variable
    *              for which we calculate isotonic regression, feature is 
independent variable
    *              and weight represents number of measures with default 1.
-   *              If multiple labels share the same feature value then they 
are ordered before
-   *              the algorithm is executed.
+   *              If multiple labels share the same feature value then they 
are aggregated using
+   *              the weighted average before the algorithm is executed.
    * @return Isotonic regression model.
    */
   @Since("1.3.0")
@@ -298,8 +297,8 @@ class IsotonicRegression private (private var isotonic: 
Boolean) extends Seriali
    * @param input JavaRDD of tuples (label, feature, weight) where label is 
dependent variable
    *              for which we calculate isotonic regression, feature is 
independent variable
    *              and weight represents number of measures with default 1.
-   *              If multiple labels share the same feature value then they 
are ordered before
-   *              the algorithm is executed.
+   *              If multiple labels share the same feature value then they 
are aggregated using
+   *              the weighted average before the algorithm is executed.
    * @return Isotonic regression model.
    */
   @Since("1.3.0")
@@ -310,21 +309,14 @@ class IsotonicRegression private (private var isotonic: 
Boolean) extends Seriali
   /**
    * Aggregates points of duplicate feature values into a single point using 
as label the weighted
    * average of the labels of the points with duplicate feature values. All 
points for a unique
-   * feature values are aggregated as:
+   * feature value are aggregated as:
    *
-   *   - Aggregated label is the weighted average of all labels
-   *   - Aggregated feature is the weighted average of all equal features[1]
-   *   - Aggregated weight is the sum of all weights
+   *   - Aggregated label is the weighted average of all labels.
+   *   - Aggregated feature is the unique feature value.
+   *   - Aggregated weight is the sum of all weights.
    *
-   * [1] Note: It is possible that feature values to be equal up to a 
resolution due to
-   * representation errors, since we cannot know which feature value to use in 
that case, we
-   * compute the weighted average of the features. Ideally, all feature values 
will be equal and
-   * the weighted average is just the value at any point.
-   *
-   * @param input
-   *   Input data of tuples (label, feature, weight). Weights must be 
non-negative.
-   * @return
-   *   Points with unique feature values.
+   * @param input Input data of tuples (label, feature, weight). Weights must 
be non-negative.
+   * @return Points with unique feature values.
    */
   private[regression] def makeUnique(
       input: Array[(Double, Double, Double)]): Array[(Double, Double, Double)] 
= {
@@ -339,28 +331,28 @@ class IsotonicRegression private (private var isotonic: 
Boolean) extends Seriali
     if (cleanInput.length <= 1) {
       cleanInput
     } else {
-      // whether or not two double features are equal up to a precision
-      @inline def areEqual(a: Double, b: Double): Boolean = 
Precision.equals(a, b)
-
       val pointsAccumulator = new IsotonicRegression.PointsAccumulator
-      var (_, prevFeature, _) = cleanInput.head
-
-      // Go through input points, merging all points with approximately equal 
feature values into
-      // a single point. Equality of features is defined by areEqual method. 
The label of the
-      // accumulated points is the weighted average of the labels of all 
points of equal feature
-      // value. It is possible that feature values to be equal up to a 
resolution due to
-      // representation errors, since we cannot know which feature value to 
use in that case,
-      // we compute the weighted average of the features.
-      cleanInput.foreach { case point @ (_, feature, _) =>
-        if (areEqual(feature, prevFeature)) {
+
+      // Go through input points, merging all points with equal feature values 
into a single point.
+      // Equality of features is defined by shouldAccumulate method. The label 
of the accumulated
+      // points is the weighted average of the labels of all points of equal 
feature value.
+
+      // Initialize with first point
+      pointsAccumulator := cleanInput.head
+      // Accumulate the rest
+      cleanInput.tail.foreach { case point @ (_, feature, _) =>
+        if (pointsAccumulator.shouldAccumulate(feature)) {
+          // Still on a duplicate feature, accumulate
           pointsAccumulator += point
         } else {
+          // A new unique feature encountered:
+          // - append the last accumulated point to unique features output
           pointsAccumulator.appendToOutput()
+          // - and reset
           pointsAccumulator := point
         }
-        prevFeature = feature
       }
-      // Append the last accumulated point
+      // Append the last accumulated point to unique features output
       pointsAccumulator.appendToOutput()
       pointsAccumulator.getOutput
     }
@@ -488,14 +480,14 @@ class IsotonicRegression private (private var isotonic: 
Boolean) extends Seriali
       // Points with same or adjacent features must collocate within the same 
partition.
       .partitionBy(new RangePartitioner(keyedInput.getNumPartitions, 
keyedInput))
       .values
-      // Lexicographically sort points by features then labels.
-      .mapPartitions(p => Iterator(p.toArray.sortBy(x => (x._2, x._1))))
+      // Lexicographically sort points by features.
+      .mapPartitions(p => Iterator(p.toArray.sortBy(_._2)))
       // Aggregate points with equal features into a single point.
       .map(makeUnique)
       .flatMap(poolAdjacentViolators)
       .collect()
       // Sort again because collect() doesn't promise ordering.
-      .sortBy(x => (x._2, x._1))
+      .sortBy(_._2)
     poolAdjacentViolators(parallelStepResult)
   }
 }
@@ -511,30 +503,32 @@ object IsotonicRegression {
     private var (currentLabel: Double, currentFeature: Double, currentWeight: 
Double) =
       (0d, 0d, 0d)
 
+    /** Whether or not this feature exactly equals the current accumulated 
feature. */
+    @inline def shouldAccumulate(feature: Double): Boolean = currentFeature == 
feature
+
     /** Resets the current value of the point accumulator using the provided 
point. */
-    def :=(point: (Double, Double, Double)): Unit = {
+    @inline def :=(point: (Double, Double, Double)): Unit = {
       val (label, feature, weight) = point
       currentLabel = label * weight
-      currentFeature = feature * weight
+      currentFeature = feature
       currentWeight = weight
     }
 
     /** Accumulates the provided point into the current value of the point 
accumulator. */
-    def +=(point: (Double, Double, Double)): Unit = {
-      val (label, feature, weight) = point
+    @inline def +=(point: (Double, Double, Double)): Unit = {
+      val (label, _, weight) = point
       currentLabel += label * weight
-      currentFeature += feature * weight
       currentWeight += weight
     }
 
     /** Appends the current value of the point accumulator to the output. */
-    def appendToOutput(): Unit =
+    @inline def appendToOutput(): Unit =
       output += ((
         currentLabel / currentWeight,
-        currentFeature / currentWeight,
+        currentFeature,
         currentWeight))
 
     /** Returns all accumulated points so far. */
-    def getOutput: Array[(Double, Double, Double)] = output.toArray
+    @inline def getOutput: Array[(Double, Double, Double)] = output.toArray
   }
 }
diff --git 
a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
 
b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
index b59d16be6cd..a206e922e5f 100644
--- 
a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
+++ 
b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.mllib.regression
 
-import org.apache.commons.math3.util.Precision
 import org.scalatest.matchers.must.Matchers
 
 import org.apache.spark.{SparkException, SparkFunSuite}
@@ -225,12 +224,18 @@ class IsotonicRegressionSuite extends SparkFunSuite with 
MLlibTestSparkContext w
 
   test("SPARK-41008 isotonic regression with duplicate features differs from 
sklearn") {
     val model = runIsotonicRegressionOnInput(
-      Seq((2, 1, 1), (1, 1, 1), (0, 2, 1), (1, 2, 1), (0.5, 3, 1), (0, 3, 1)),
+      Seq((1, 0.6, 1), (0, 0.6, 1),
+        (0, 1.0 / 3, 1), (1, 1.0 / 3, 1), (0, 1.0 / 3, 1),
+        (1, 0.2, 1), (0, 0.2, 1), (0, 0.2, 1), (0, 0.2, 1)),
       true,
       2)
 
-    assert(model.boundaries === Array(1.0, 3.0))
-    assert(model.predictions === Array(0.75, 0.75))
+    assert(model.boundaries === Array(0.2, 1.0 / 3, 0.6))
+    assert(model.predictions === Array(0.25, 1.0 / 3, 0.5))
+
+    assert(model.predict(0.6) === 0.5)
+    assert(model.predict(1.0 / 3) === 1.0 / 3)
+    assert(model.predict(0.2) === 0.25)
   }
 
   test("isotonic regression prediction") {
@@ -327,9 +332,8 @@ class IsotonicRegressionSuite extends SparkFunSuite with 
MLlibTestSparkContext w
   test("makeUnique: handle duplicate features") {
     val regressor = new IsotonicRegression()
     import regressor.makeUnique
-    import Precision.EPSILON
 
-    // Note: input must be lexicographically sorted by (feature, label)
+    // Note: input must be lexicographically sorted by feature
 
     // empty
     assert(makeUnique(Array.empty) === Array.empty)
@@ -373,9 +377,14 @@ class IsotonicRegressionSuite extends SparkFunSuite with 
MLlibTestSparkContext w
         (10.0 * 0.3 + 20.0 * 0.3 + 30.0 * 0.4, 2.0, 1.0),
         (10.0, 3.0, 1.0)))
 
-    // duplicate up to resolution error
-    assert(
-      makeUnique(Array((1.0, 1.0, 1.0), (1.0, 1.0 - EPSILON, 1.0), (1.0, 1.0 + 
EPSILON, 1.0))) ===
-        Array((1.0, 1.0, 3.0)))
+    // don't handle tiny representation errors
+    // e.g. infinitely adjacent doubles are already unique
+    val adjacentDoubles = {
+      // i-th next representable double to 1.0 is 
java.lang.Double.longBitsToDouble(base + i)
+      val base = java.lang.Double.doubleToRawLongBits(1.0)
+      (0 until 10).map(i => java.lang.Double.longBitsToDouble(base + i))
+        .map((1.0, _, 1.0)).toArray
+    }
+    assert(makeUnique(adjacentDoubles) === adjacentDoubles)
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to