Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/15018#discussion_r92920879 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala --- @@ -328,74 +336,68 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali return Array.empty } - // Pools sub array within given bounds assigning weighted average value to all elements. - def pool(input: Array[(Double, Double, Double)], start: Int, end: Int): Unit = { - val poolSubArray = input.slice(start, end + 1) - - val weightedSum = poolSubArray.map(lp => lp._1 * lp._3).sum - val weight = poolSubArray.map(_._3).sum - - var i = start - while (i <= end) { - input(i) = (weightedSum / weight, input(i)._2, input(i)._3) - i = i + 1 - } + /* + Keeps track of the start and end indices of the blocks. blockBounds(start) gives the + index of the end of the block and blockBounds(end) gives the index of the start of the + block. Entries that are not the start or end of the block are meaningless. + */ + val blockBounds = Array.range(0, input.length) // Initially, each data point is its own block + + /* + Keep track of the sum of weights and sum of weight * y for each block. weights(start) + gives the values for the block. Entries that are not at the start of a block + are meaningless. + */ + val weights: Array[(Double, Double)] = input.map(x => (x._3, x._3 * x._1)) // (weight, weight * y) + + // a few convenience functions to make the code more readable + def blockEnd(start: Int) = blockBounds(start) + def blockStart(end: Int) = blockBounds(end) + def nextBlock(start: Int) = blockEnd(start) + 1 + def prevBlock(start: Int) = blockStart(start - 1) + def merge(block1: Int, block2: Int): Int = { + assert(blockEnd(block1) + 1 == block2, "attempting to merge non-consecutive blocks") + blockBounds(block1) = blockEnd(block2) + blockBounds(blockEnd(block2)) = block1 + val w1 = weights(block1) + val w2 = weights(block2) + weights(block1) = (w1._1 + w2._1, w1._2 + w2._2) + block1 } + def average(start: Int) = weights(start)._2 / weights(start)._1 + /* + Implement Algorithm PAV from [3]. + Merge on >= instead of > because it elimnate adjacent blocks with the same average, and we want + to compress our output as much as possible. Both give correct results. + */ var i = 0 - val len = input.length - while (i < len) { - var j = i - - // Find monotonicity violating sequence, if any. - while (j < len - 1 && input(j)._1 > input(j + 1)._1) { - j = j + 1 - } - - // If monotonicity was not violated, move to next data point. - if (i == j) { - i = i + 1 - } else { - // Otherwise pool the violating sequence - // and check if pooling caused monotonicity violation in previously processed points. - while (i >= 0 && input(i)._1 > input(i + 1)._1) { - pool(input, i, j) - i = i - 1 + while (nextBlock(i) < input.length) { + if (average(i) >= average(nextBlock(i))) { + merge(i, nextBlock(i)) + while((i > 0) && (average(prevBlock(i)) >= average(i))) { --- End diff -- Super nit: space before `while` here and below. You can probably nix the extra parens around each of the two terms, but no big deal
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org