Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15018#discussion_r95914526
  
    --- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala 
---
    @@ -312,90 +313,120 @@ class IsotonicRegression private (private var 
isotonic: Boolean) extends Seriali
       }
     
       /**
    -   * Performs a pool adjacent violators algorithm (PAV).
    -   * Uses approach with single processing of data where violators
    -   * in previously processed data created by pooling are fixed immediately.
    -   * Uses optimization of discovering monotonicity violating sequences 
(blocks).
    +   * Performs a pool adjacent violators algorithm (PAV). Implements the 
algorithm originally
    +   * described in [1], using the formulation from [2, 3]. Uses an array to 
keep track of start
    +   * and end indices of blocks.
        *
    -   * @param input Input data of tuples (label, feature, weight).
    +   * [1] Grotzinger, S. J., and C. Witzgall. "Projections onto order 
simplexes." Applied
    +   * mathematics and Optimization 12.1 (1984): 247-270.
    +   *
    +   * [2] Best, Michael J., and Nilotpal Chakravarti. "Active set 
algorithms for isotonic
    +   * regression; a unifying framework." Mathematical Programming 47.1-3 
(1990): 425-439.
    +   *
    +   * [3] Best, Michael J., Nilotpal Chakravarti, and Vasant A. Ubhaya. 
"Minimizing separable convex
    +   * functions subject to simple chain constraints." SIAM Journal on 
Optimization 10.3 (2000):
    +   * 658-672.
    +   *
    +   * @param input Input data of tuples (label, feature, weight). Weights 
must
    +                  be non-negative.
        * @return Result tuples (label, feature, weight) where labels were 
updated
        *         to form a monotone sequence as per isotonic regression 
definition.
        */
       private def poolAdjacentViolators(
           input: Array[(Double, Double, Double)]): Array[(Double, Double, 
Double)] = {
     
    -    if (input.isEmpty) {
    -      return Array.empty
    +    val cleanInput = input.flatMap{ case (y, x, weight) =>
    +      require(weight >= 0.0, "weights must be non-negative")
    +      if (weight == 0.0) {
    +        logWarning(s"Dropping zero-weight point ($y, $x, $weight)")
    +        Array[(Double, Double, Double)]()
    +      } else {
    +        Array((y, x, weight))
    +      }
         }
     
    -    // Pools sub array within given bounds assigning weighted average 
value to all elements.
    -    def pool(input: Array[(Double, Double, Double)], start: Int, end: 
Int): Unit = {
    -      val poolSubArray = input.slice(start, end + 1)
    +    if (cleanInput.isEmpty) {
    +      return Array.empty
    +    }
     
    -      val weightedSum = poolSubArray.map(lp => lp._1 * lp._3).sum
    -      val weight = poolSubArray.map(_._3).sum
    +    // Keeps track of the start and end indices of the blocks. if [i, j] 
is a valid block from
    +    // cleanInput(i) to cleanInput(j) (inclusive), then blockBounds(i) = j 
and blockBounds(j) = i
    +    // Initially, each data point is its own block.
    +    val blockBounds = Array.range(0, cleanInput.length)
     
    -      var i = start
    -      while (i <= end) {
    -        input(i) = (weightedSum / weight, input(i)._2, input(i)._3)
    -        i = i + 1
    -      }
    +    // Keep track of the sum of weights and sum of weight * y for each 
block. weights(start)
    +    // gives the values for the block. Entries that are not at the start 
of a block
    +    // are meaningless.
    +    val weights: Array[(Double, Double)] = cleanInput.map { case (y, _, 
weight) =>
    +      (weight, weight * y)
         }
     
    -    var i = 0
    -    val len = input.length
    -    while (i < len) {
    -      var j = i
    +    // a few convenience functions to make the code more readable
     
    -      // Find monotonicity violating sequence, if any.
    -      while (j < len - 1 && input(j)._1 > input(j + 1)._1) {
    -        j = j + 1
    -      }
    +    // blockStart and blockEnd have identical implementations. We create 
two different
    +    // functions to make the code more expressive
    +    def blockEnd(start: Int): Int = blockBounds(start)
    +    def blockStart(end: Int): Int = blockBounds(end)
     
    -      // If monotonicity was not violated, move to next data point.
    -      if (i == j) {
    -        i = i + 1
    -      } else {
    -        // Otherwise pool the violating sequence
    -        // and check if pooling caused monotonicity violation in 
previously processed points.
    -        while (i >= 0 && input(i)._1 > input(i + 1)._1) {
    -          pool(input, i, j)
    -          i = i - 1
    -        }
    +    // the next block starts at the index after the end of this block
    +    def nextBlock(start: Int): Int = blockEnd(start) + 1
     
    -        i = j
    -      }
    +    // the previous block ends at the index before the start of this block
    +    // we then use blockStart to find the start
    +    def prevBlock(start: Int): Int = blockStart(start - 1)
    +
    +    // Merge two adjacent blocks, updating blockBounds and weights to 
reflect the merge
    +    // Return the start index of the merged block
    +    def merge(block1: Int, block2: Int): Int = {
    +      assert(
    +        blockEnd(block1) + 1 == block2,
    +        s"Attempting to merge non-consecutive blocks [${block1}, 
${blockEnd(block1)}]" +
    +        s" and [${block2}, ${blockEnd(block2)}. This is likely a bug in 
the isotonic regression" +
    --- End diff --
    
    The assertion message is missing the closing "]" after "${blockEnd(block2)}" in the second block's range.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to