This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d05c7e037e [SPARK-41008][MLLIB] Dedup isotonic regression duplicate features
3d05c7e037e is described below

commit 3d05c7e037eff79de8ef9f6231aca8340bcc65ef
Author: Ahmed Mahran <ahmed.mah...@mashin.io>
AuthorDate: Thu Dec 8 08:28:48 2022 -0600

    [SPARK-41008][MLLIB] Dedup isotonic regression duplicate features
    
    ### What changes were proposed in this pull request?
    
    Adding a pre-processing step to isotonic regression in mllib to handle duplicate features, matching the `sklearn` implementation. Input points with duplicate feature values are aggregated into a single point whose label is the weighted average of their labels. All points for a unique feature value are aggregated as follows (a short sketch follows the list):
     - Aggregated label is the weighted average of all labels
     - Aggregated feature is the weighted average of all equal features. Feature values may be equal only up to a resolution due to representation errors; since we cannot know which feature value to use in that case, we compute the weighted average of the features. Ideally, all feature values will be equal and the weighted average is just the value at any point.
     - Aggregated weight is the sum of all weights
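    A minimal standalone sketch of this aggregation rule (illustration only, not the patch's code; the real implementation is `makeUnique` in the diff below, which additionally applies a floating-point tolerance when comparing features):

    ```scala
    // Aggregate points sharing a feature value: weighted-average the labels,
    // sum the weights. Assumes exact feature equality for simplicity.
    def dedup(points: Seq[(Double, Double, Double)]): Seq[(Double, Double, Double)] =
      points
        .groupBy(_._2) // (label, feature, weight) grouped by feature
        .toSeq.sortBy(_._1)
        .map { case (feature, pts) =>
          val w = pts.map(_._3).sum
          val label = pts.map { case (y, _, wi) => y * wi }.sum / w
          (label, feature, w)
        }

    // dedup(Seq((1.0, 0.2, 1.0), (0.0, 0.2, 1.0))) == Seq((0.5, 0.2, 2.0))
    ```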
    
    ### Why are the changes needed?
    
    As per the discussion on ticket [[SPARK-41008]](https://issues.apache.org/jira/browse/SPARK-41008), this is a bug and results should match `sklearn`.
    
    ### Does this PR introduce _any_ user-facing change?
    
    There are no changes to the API, documentation, or error messages. However, users should expect results to change.
    
    ### How was this patch tested?
    
    Existing test cases for duplicate features failed and were adjusted accordingly; new tests were also added.
    
    Here is a Python snippet that can be used to verify the results:
    
    ```python
    from sklearn.isotonic import IsotonicRegression
    
    def test(x, y, x_test, isotonic=True):
        ir = IsotonicRegression(out_of_bounds='clip', increasing=isotonic).fit(x, y)
        y_test = ir.predict(x_test)
    
        def print_array(label, a):
            print(f"{label}: [{', '.join([str(i) for i in a])}]")
    
        print_array("boundaries", ir.X_thresholds_)
        print_array("predictions", ir.y_thresholds_)
        print_array("y_test", y_test)
    
    test(
        x = [0.6, 0.6, 0.333, 0.333, 0.333, 0.20, 0.20, 0.20, 0.20],
        y = [1, 0, 0, 1, 0, 1, 0, 0, 0],
        x_test = [0.6, 0.6, 0.333, 0.333, 0.333, 0.20, 0.20, 0.20, 0.20]
    )
    ```
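    Working the snippet's example by hand under the new rule (all weights equal; hand-computed for illustration, not captured program output): the four points at feature 0.20 average to 0.25, the three at 0.333 to 1/3, and the two at 0.6 to 0.5. The sequence is already increasing, so PAV leaves it unchanged, and the expected output is:

    ```
    boundaries:  [0.2, 0.333, 0.6]
    predictions: [0.25, 1/3, 0.5]
    y_test:      [0.5, 0.5, 1/3, 1/3, 1/3, 0.25, 0.25, 0.25, 0.25]
    ```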
    
    srowen zapletal-martin
    
    Closes #38966 from ahmed-mahran/ml-isotonic-reg-dups.
    
    Authored-by: Ahmed Mahran <ahmed.mah...@mashin.io>
    Signed-off-by: Sean Owen <sro...@gmail.com>
---
 .../mllib/regression/IsotonicRegression.scala      | 141 +++++++++++++---
 .../mllib/regression/IsotonicRegressionSuite.scala | 180 ++++++++++++++++-----
 2 files changed, 262 insertions(+), 59 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
index 649f9816e6a..0b2bf147501 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.spark.mllib.regression
 
 import java.io.Serializable
@@ -24,6 +23,7 @@ import java.util.Arrays.binarySearch
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.commons.math3.util.Precision
 import org.json4s._
 import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods._
@@ -307,6 +307,65 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
     run(input.rdd.retag.asInstanceOf[RDD[(Double, Double, Double)]])
   }
 
+  /**
+   * Aggregates points of duplicate feature values into a single point, using as label the
+   * weighted average of the labels of the points with duplicate feature values. All points
+   * for a unique feature value are aggregated as:
+   *
+   *   - Aggregated label is the weighted average of all labels
+   *   - Aggregated feature is the weighted average of all equal features[1]
+   *   - Aggregated weight is the sum of all weights
+   *
+   * [1] Note: Feature values may be equal only up to a resolution due to representation
+   * errors. Since we cannot know which feature value to use in that case, we compute the
+   * weighted average of the features. Ideally, all feature values will be equal and the
+   * weighted average is just the value at any point.
+   *
+   * @param input
+   *   Input data of tuples (label, feature, weight). Weights must be non-negative.
+   * @return
+   *   Points with unique feature values.
+   */
+  private[regression] def makeUnique(
+      input: Array[(Double, Double, Double)]): Array[(Double, Double, Double)] = {
+
+    val cleanInput = input.filter { case (y, x, weight) =>
+      require(
+        weight >= 0.0,
+        s"Negative weight at point ($y, $x, $weight). Weights must be 
non-negative")
+      weight > 0
+    }
+
+    if (cleanInput.length <= 1) {
+      cleanInput
+    } else {
+      // whether or not two double features are equal up to a precision
+      @inline def areEqual(a: Double, b: Double): Boolean = Precision.equals(a, b)
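+      // With no explicit tolerance argument, Precision.equals treats doubles within 1 ulp as equal.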
+
+      val pointsAccumulator = new IsotonicRegression.PointsAccumulator
+      var (_, prevFeature, _) = cleanInput.head
+
+      // Go through input points, merging all points with approximately equal feature values
+      // into a single point. Equality of features is defined by the areEqual method. The
+      // label of the accumulated point is the weighted average of the labels of all points
+      // of equal feature value. Feature values may be equal only up to a resolution due to
+      // representation errors; since we cannot know which feature value to use in that case,
+      // we compute the weighted average of the features.
+      cleanInput.foreach { case point @ (_, feature, _) =>
+        if (areEqual(feature, prevFeature)) {
+          pointsAccumulator += point
+        } else {
+          pointsAccumulator.appendToOutput()
+          pointsAccumulator := point
+        }
+        prevFeature = feature
+      }
+      // Append the last accumulated point
+      pointsAccumulator.appendToOutput()
+      pointsAccumulator.getOutput
+    }
+  }
+
   /**
   * Performs a pool adjacent violators algorithm (PAV). Implements the algorithm originally
   * described in [1], using the formulation from [2, 3]. Uses an array to keep track of start
@@ -322,35 +381,27 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
   * functions subject to simple chain constraints." SIAM Journal on Optimization 10.3 (2000):
    * 658-672.
    *
-   * @param input Input data of tuples (label, feature, weight). Weights must
-                  be non-negative.
+   * @param cleanUniqueInput Input data of tuples (label, feature, weight). Features must be
+   *                         unique and weights must be non-negative.
    * @return Result tuples (label, feature, weight) where labels were updated
    *         to form a monotone sequence as per isotonic regression definition.
    */
   private def poolAdjacentViolators(
-      input: Array[(Double, Double, Double)]): Array[(Double, Double, Double)] = {
+      cleanUniqueInput: Array[(Double, Double, Double)]): Array[(Double, Double, Double)] = {
 
-    val cleanInput = input.filter{ case (y, x, weight) =>
-      require(
-        weight >= 0.0,
-        s"Negative weight at point ($y, $x, $weight). Weights must be 
non-negative"
-      )
-      weight > 0
-    }
-
-    if (cleanInput.isEmpty) {
+    if (cleanUniqueInput.isEmpty) {
       return Array.empty
     }
 
    // Keeps track of the start and end indices of the blocks. if [i, j] is a valid block from
    // cleanInput(i) to cleanInput(j) (inclusive), then blockBounds(i) = j and blockBounds(j) = i
     // Initially, each data point is its own block.
-    val blockBounds = Array.range(0, cleanInput.length)
+    val blockBounds = Array.range(0, cleanUniqueInput.length)
 
    // Keep track of the sum of weights and sum of weight * y for each block. weights(start)
    // gives the values for the block. Entries that are not at the start of a block
     // are meaningless.
-    val weights: Array[(Double, Double)] = cleanInput.map { case (y, _, weight) =>
+    val weights: Array[(Double, Double)] = cleanUniqueInput.map { case (y, _, weight) =>
       (weight, weight * y)
     }
 
@@ -392,10 +443,10 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
    // Merge on >= instead of > because it eliminates adjacent blocks with the same average, and we
    // want to compress our output as much as possible. Both give correct results.
     var i = 0
-    while (nextBlock(i) < cleanInput.length) {
+    while (nextBlock(i) < cleanUniqueInput.length) {
       if (average(i) >= average(nextBlock(i))) {
         merge(i, nextBlock(i))
-        while((i > 0) && (average(prevBlock(i)) >= average(i))) {
+        while ((i > 0) && (average(prevBlock(i)) >= average(i))) {
           i = merge(prevBlock(i), i)
         }
       } else {
@@ -406,15 +457,15 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
     // construct the output by walking through the blocks in order
     val output = ArrayBuffer.empty[(Double, Double, Double)]
     i = 0
-    while (i < cleanInput.length) {
+    while (i < cleanUniqueInput.length) {
       // If block size is > 1, a point at the start and end of the block,
       // each receiving half the weight. Otherwise, a single point with
       // all the weight.
-      if (cleanInput(blockEnd(i))._2 > cleanInput(i)._2) {
-        output += ((average(i), cleanInput(i)._2, weights(i)._1 / 2))
-        output += ((average(i), cleanInput(blockEnd(i))._2, weights(i)._1 / 2))
+      if (cleanUniqueInput(blockEnd(i))._2 > cleanUniqueInput(i)._2) {
+        output += ((average(i), cleanUniqueInput(i)._2, weights(i)._1 / 2))
+        output += ((average(i), cleanUniqueInput(blockEnd(i))._2, weights(i)._1 / 2))
       } else {
-        output += ((average(i), cleanInput(i)._2, weights(i)._1))
+        output += ((average(i), cleanUniqueInput(i)._2, weights(i)._1))
       }
       i = nextBlock(i)
     }
@@ -434,12 +485,56 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
      input: RDD[(Double, Double, Double)]): Array[(Double, Double, Double)] = {
     val keyedInput = input.keyBy(_._2)
     val parallelStepResult = keyedInput
+      // Points with the same or adjacent features must be collocated within the same partition.
      .partitionBy(new RangePartitioner(keyedInput.getNumPartitions, keyedInput))
       .values
+      // Lexicographically sort points by features then labels.
       .mapPartitions(p => Iterator(p.toArray.sortBy(x => (x._2, x._1))))
+      // Aggregate points with equal features into a single point.
+      .map(makeUnique)
       .flatMap(poolAdjacentViolators)
       .collect()
-      .sortBy(x => (x._2, x._1)) // Sort again because collect() doesn't promise ordering.
+      // Sort again because collect() doesn't promise ordering.
+      .sortBy(x => (x._2, x._1))
     poolAdjacentViolators(parallelStepResult)
   }
 }
+
+object IsotonicRegression {
+  /**
+   * Utility class, holds a buffer of all points with unique features so far, and performs
+   * weighted sum accumulation of points. Hides these details for better readability of the
+   * main algorithm.
+   */
+  class PointsAccumulator {
+    private val output = ArrayBuffer[(Double, Double, Double)]()
+    private var (currentLabel: Double, currentFeature: Double, currentWeight: Double) =
+      (0d, 0d, 0d)
+
+    /** Resets the current value of the point accumulator using the provided point. */
+    def :=(point: (Double, Double, Double)): Unit = {
+      val (label, feature, weight) = point
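+      // Note: stores weighted sums; appendToOutput() divides them by the accumulated weight.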
+      currentLabel = label * weight
+      currentFeature = feature * weight
+      currentWeight = weight
+    }
+
+    /** Accumulates the provided point into the current value of the point accumulator. */
+    def +=(point: (Double, Double, Double)): Unit = {
+      val (label, feature, weight) = point
+      currentLabel += label * weight
+      currentFeature += feature * weight
+      currentWeight += weight
+    }
+
+    /** Appends the current value of the point accumulator to the output. */
+    def appendToOutput(): Unit =
+      output += ((
+        currentLabel / currentWeight,
+        currentFeature / currentWeight,
+        currentWeight))
+
+    /** Returns all accumulated points so far. */
+    def getOutput: Array[(Double, Double, Double)] = output.toArray
+  }
+}
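A quick, slightly simplified sketch of how `makeUnique` drives `PointsAccumulator` (illustration only, not part of the patch; assumes the input is already sorted by feature and uses exact feature equality):

```scala
val acc = new IsotonicRegression.PointsAccumulator
val sorted = Array((1.0, 1.0, 1.0), (0.0, 1.0, 1.0), (1.0, 2.0, 1.0))

acc := sorted.head                                // seed with the first point
var prevFeature = sorted.head._2
sorted.tail.foreach { case point @ (_, feature, _) =>
  if (feature == prevFeature) acc += point        // same feature: accumulate
  else { acc.appendToOutput(); acc := point }     // new feature: flush, reset
  prevFeature = feature
}
acc.appendToOutput()                              // flush the last run
// acc.getOutput: Array((0.5, 1.0, 2.0), (1.0, 2.0, 1.0))
```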
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
index 8066900dfa0..b59d16be6cd 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.mllib.regression
 
+import org.apache.commons.math3.util.Precision
 import org.scalatest.matchers.must.Matchers
 
 import org.apache.spark.{SparkException, SparkFunSuite}
@@ -24,6 +25,24 @@ import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.mllib.util.TestingUtils._
 import org.apache.spark.util.Utils
 
+/**
+ * Tests can be verified through the following python snippet:
+ *
+ * {{{
+ *   from sklearn.isotonic import IsotonicRegression
+ *
+ *   def test(x, y, x_test, isotonic=True):
+ *   ir = IsotonicRegression(out_of_bounds='clip', increasing=isotonic).fit(x, y)
+ *       y_test = ir.predict(x_test)
+ *
+ *       def print_array(label, a):
+ *           print(f"{label}: [{', '.join([str(i) for i in a])}]")
+ *
+ *       print_array("boundaries", ir.X_thresholds_)
+ *       print_array("predictions", ir.y_thresholds_)
+ *       print_array("y_test", y_test)
+ * }}}
+ */
class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext with Matchers {
 
   private def round(d: Double) = {
@@ -44,8 +63,7 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext w
       labels: Seq[Double],
       weights: Seq[Double],
       isotonic: Boolean): IsotonicRegressionModel = {
-    val trainRDD = sc.parallelize(generateIsotonicInput(labels, weights)).cache()
-    new IsotonicRegression().setIsotonic(isotonic).run(trainRDD)
+    runIsotonicRegressionOnInput(generateIsotonicInput(labels, weights), isotonic)
   }
 
   private def runIsotonicRegression(
@@ -54,17 +72,37 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext w
     runIsotonicRegression(labels, Array.fill(labels.size)(1d), isotonic)
   }
 
+  private def runIsotonicRegression(
+      labels: Seq[Double],
+      features: Seq[Double],
+      weights: Seq[Double],
+      isotonic: Boolean): IsotonicRegressionModel = {
+    runIsotonicRegressionOnInput(
+      labels.indices.map(i => (labels(i), features(i), weights(i))),
+      isotonic)
+  }
+
+  private def runIsotonicRegressionOnInput(
+      input: Seq[(Double, Double, Double)],
+      isotonic: Boolean,
+      slices: Int = sc.defaultParallelism): IsotonicRegressionModel = {
+    val trainRDD = sc.parallelize(input, slices).cache()
+    new IsotonicRegression().setIsotonic(isotonic).run(trainRDD)
+  }
+
   test("increasing isotonic regression") {
     /*
      The following result could be re-produced with sklearn.
 
-     > from sklearn.isotonic import IsotonicRegression
-     > x = range(9)
-     > y = [1, 2, 3, 1, 6, 17, 16, 17, 18]
-     > ir = IsotonicRegression(x, y)
-     > print ir.predict(x)
+     > test(
+     >   x = range(9),
+     >   y = [1, 2, 3, 1, 6, 17, 16, 17, 18],
+     >   x_test = range(9)
+     > )
 
-     array([  1. ,   2. ,   2. ,   2. ,   6. ,  16.5,  16.5,  17. ,  18. ])
+      boundaries: [0.0, 1.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]
+      predictions: [1.0, 2.0, 2.0, 6.0, 16.5, 16.5, 17.0, 18.0]
+      y_test: [1.0, 2.0, 2.0, 2.0, 6.0, 16.5, 16.5, 17.0, 18.0]
      */
     val model = runIsotonicRegression(Seq(1, 2, 3, 1, 6, 17, 16, 17, 18), true)
 
@@ -142,9 +180,9 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext w
   }
 
   test("isotonic regression with unordered input") {
-    val trainRDD = sc.parallelize(generateIsotonicInput(Seq(1, 2, 3, 4, 5)).reverse, 2).cache()
+    val model =
+      runIsotonicRegressionOnInput(generateIsotonicInput(Seq(1, 2, 3, 4, 5)).reverse, true, 2)
 
-    val model = new IsotonicRegression().run(trainRDD)
     assert(model.predictions === Array(1, 2, 3, 4, 5))
   }
 
@@ -159,7 +197,7 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext w
     val model = runIsotonicRegression(Seq(1, 2, 3, 2, 1), Seq(1, 1, 1, 0.1, 0.1), true)
 
     assert(model.boundaries === Array(0, 1, 2, 4))
-    assert(model.predictions.map(round) === Array(1, 2, 3.3/1.2, 3.3/1.2))
+    assert(model.predictions.map(round) === Array(1, 2, 3.3 / 1.2, 3.3 / 1.2))
   }
 
   test("weighted isotonic regression with negative weights") {
@@ -176,11 +214,20 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext w
   }
 
   test("SPARK-16426 isotonic regression with duplicate features that produce 
NaNs") {
-    val trainRDD = sc.parallelize(Seq[(Double, Double, Double)]((2, 1, 1), (1, 
1, 1), (0, 2, 1),
-                                                                (1, 2, 1), 
(0.5, 3, 1), (0, 3, 1)),
-                                  2)
+    val model = runIsotonicRegressionOnInput(
+      Seq((2, 1, 1), (1, 1, 1), (0, 2, 1), (1, 2, 1), (0.5, 3, 1), (0, 3, 1)),
+      true,
+      2)
+
+    assert(model.boundaries === Array(1.0, 3.0))
+    assert(model.predictions === Array(0.75, 0.75))
+  }
 
-    val model = new IsotonicRegression().run(trainRDD)
+  test("SPARK-41008 isotonic regression with duplicate features differs from 
sklearn") {
+    val model = runIsotonicRegressionOnInput(
+      Seq((2, 1, 1), (1, 1, 1), (0, 2, 1), (1, 2, 1), (0.5, 3, 1), (0, 3, 1)),
+      true,
+      2)
 
     assert(model.boundaries === Array(1.0, 3.0))
     assert(model.predictions === Array(0.75, 0.75))
@@ -194,32 +241,38 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext w
     assert(model.predict(0.5) === 1.5)
     assert(model.predict(0.75) === 1.75)
     assert(model.predict(1) === 2)
-    assert(model.predict(2) === 10d/3)
-    assert(model.predict(9) === 10d/3)
+    assert(model.predict(2) === 10.0 / 3)
+    assert(model.predict(9) === 10.0 / 3)
   }
 
   test("isotonic regression prediction with duplicate features") {
-    val trainRDD = sc.parallelize(
-      Seq[(Double, Double, Double)](
-        (2, 1, 1), (1, 1, 1), (4, 2, 1), (2, 2, 1), (6, 3, 1), (5, 3, 1)), 2).cache()
-    val model = new IsotonicRegression().run(trainRDD)
-
-    assert(model.predict(0) === 1)
-    assert(model.predict(1.5) === 2)
-    assert(model.predict(2.5) === 4.5)
-    assert(model.predict(4) === 6)
+    val model = runIsotonicRegressionOnInput(
+      Seq((2, 1, 1), (1, 1, 1), (4, 2, 1), (2, 2, 1), (6, 3, 1), (5, 3, 1)),
+      true,
+      2)
+
+    assert(model.boundaries === Array(1.0, 2.0, 3.0))
+    assert(model.predictions === Array(1.5, 3.0, 5.5))
+
+    assert(model.predict(0) === 1.5)
+    assert(model.predict(1.5) === 2.25)
+    assert(model.predict(2.5) === 4.25)
+    assert(model.predict(4) === 5.5)
   }
 
   test("antitonic regression prediction with duplicate features") {
-    val trainRDD = sc.parallelize(
-      Seq[(Double, Double, Double)](
-        (5, 1, 1), (6, 1, 1), (2, 2, 1), (4, 2, 1), (1, 3, 1), (2, 3, 1)), 2).cache()
-    val model = new IsotonicRegression().setIsotonic(false).run(trainRDD)
-
-    assert(model.predict(0) === 6)
-    assert(model.predict(1.5) === 4.5)
-    assert(model.predict(2.5) === 2)
-    assert(model.predict(4) === 1)
+    val model = runIsotonicRegressionOnInput(
+      Seq((5, 1, 1), (6, 1, 1), (2, 2, 1), (4, 2, 1), (1, 3, 1), (2, 3, 1)),
+      false,
+      2)
+
+    assert(model.boundaries === Array(1.0, 2.0, 3.0))
+    assert(model.predictions === Array(5.5, 3.0, 1.5))
+
+    assert(model.predict(0) === 5.5)
+    assert(model.predict(1.5) === 4.25)
+    assert(model.predict(2.5) === 2.25)
+    assert(model.predict(4) === 1.5)
   }
 
   test("isotonic regression RDD prediction") {
@@ -227,7 +280,7 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext w
 
    val testRDD = sc.parallelize(List(-2.0, -1.0, 0.5, 0.75, 1.0, 2.0, 9.0), 2).cache()
    val predictions = testRDD.map(x => (x, model.predict(x))).collect().sortBy(_._1).map(_._2)
-    assert(predictions === Array(1, 1, 1.5, 1.75, 2, 10.0/3, 10.0/3))
+    assert(predictions === Array(1, 1, 1.5, 1.75, 2, 10.0 / 3, 10.0 / 3))
   }
 
   test("antitonic regression prediction") {
@@ -270,4 +323,59 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext w
      new IsotonicRegressionModel(Array(0.0, 1.0), Array(1.0, 2.0), isotonic = false)
     }
   }
+
+  test("makeUnique: handle duplicate features") {
+    val regressor = new IsotonicRegression()
+    import regressor.makeUnique
+    import Precision.EPSILON
+
+    // Note: input must be lexicographically sorted by (feature, label)
+
+    // empty
+    assert(makeUnique(Array.empty) === Array.empty)
+
+    // single
+    assert(makeUnique(Array((1.0, 1.0, 1.0))) === Array((1.0, 1.0, 1.0)))
+
+    // two and duplicate
+    assert(makeUnique(Array((1.0, 1.0, 1.0), (1.0, 1.0, 1.0))) === Array((1.0, 1.0, 2.0)))
+
+    // two and unique
+    assert(
+      makeUnique(Array((1.0, 1.0, 1.0), (1.0, 2.0, 1.0))) ===
+        Array((1.0, 1.0, 1.0), (1.0, 2.0, 1.0)))
+
+    // generic with duplicates
+    assert(
+      makeUnique(
+        Array(
+          (10.0, 1.0, 1.0), (20.0, 1.0, 1.0),
+          (10.0, 2.0, 1.0), (20.0, 2.0, 1.0), (30.0, 2.0, 1.0),
+          (10.0, 3.0, 1.0)
+        )) === Array((15.0, 1.0, 2.0), (20.0, 2.0, 3.0), (10.0, 3.0, 1.0)))
+
+    // generic unique
+    assert(
+      makeUnique(Array((10.0, 1.0, 1.0), (10.0, 2.0, 1.0), (10.0, 3.0, 1.0))) === Array(
+        (10.0, 1.0, 1.0),
+        (10.0, 2.0, 1.0),
+        (10.0, 3.0, 1.0)))
+
+    // generic with duplicates and non-uniform weights
+    assert(
+      makeUnique(
+        Array(
+          (10.0, 1.0, 0.3), (20.0, 1.0, 0.7),
+          (10.0, 2.0, 0.3), (20.0, 2.0, 0.3), (30.0, 2.0, 0.4),
+          (10.0, 3.0, 1.0)
+        )) === Array(
+        (10.0 * 0.3 + 20.0 * 0.7, 1.0, 1.0),
+        (10.0 * 0.3 + 20.0 * 0.3 + 30.0 * 0.4, 2.0, 1.0),
+        (10.0, 3.0, 1.0)))
+
+    // duplicate up to resolution error
+    assert(
+      makeUnique(Array((1.0, 1.0, 1.0), (1.0, 1.0 - EPSILON, 1.0), (1.0, 1.0 + EPSILON, 1.0))) ===
+        Array((1.0, 1.0, 3.0)))
+  }
 }
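For reference, a minimal end-to-end sketch mirroring the duplicate-feature tests above (assumes a local `SparkContext` named `sc`, e.g. in `spark-shell`; the expected values are the ones asserted in the suite):

```scala
import org.apache.spark.mllib.regression.IsotonicRegression

// Duplicate features are now averaged before PAV, so this input (which used
// to produce NaNs, see SPARK-16426) yields deterministic results.
val input = Seq[(Double, Double, Double)](
  (2, 1, 1), (1, 1, 1), (0, 2, 1), (1, 2, 1), (0.5, 3, 1), (0, 3, 1))
val model = new IsotonicRegression()
  .setIsotonic(true)
  .run(sc.parallelize(input, 2).cache())

assert(model.boundaries sameElements Array(1.0, 3.0))
assert(model.predictions sameElements Array(0.75, 0.75))
```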

