Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/15018#discussion_r95914526 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala --- @@ -312,90 +313,120 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali } /** - * Performs a pool adjacent violators algorithm (PAV). - * Uses approach with single processing of data where violators - * in previously processed data created by pooling are fixed immediately. - * Uses optimization of discovering monotonicity violating sequences (blocks). + * Performs a pool adjacent violators algorithm (PAV). Implements the algorithm originally + * described in [1], using the formulation from [2, 3]. Uses an array to keep track of start + * and end indices of blocks. * - * @param input Input data of tuples (label, feature, weight). + * [1] Grotzinger, S. J., and C. Witzgall. "Projections onto order simplexes." Applied + * mathematics and Optimization 12.1 (1984): 247-270. + * + * [2] Best, Michael J., and Nilotpal Chakravarti. "Active set algorithms for isotonic + * regression; a unifying framework." Mathematical Programming 47.1-3 (1990): 425-439. + * + * [3] Best, Michael J., Nilotpal Chakravarti, and Vasant A. Ubhaya. "Minimizing separable convex + * functions subject to simple chain constraints." SIAM Journal on Optimization 10.3 (2000): + * 658-672. + * + * @param input Input data of tuples (label, feature, weight). Weights must + be non-negative. * @return Result tuples (label, feature, weight) where labels were updated * to form a monotone sequence as per isotonic regression definition. 
*/ private def poolAdjacentViolators( input: Array[(Double, Double, Double)]): Array[(Double, Double, Double)] = { - if (input.isEmpty) { - return Array.empty + val cleanInput = input.flatMap{ case (y, x, weight) => + require(weight >= 0.0, "weights must be non-negative") + if (weight == 0.0) { + logWarning(s"Dropping zero-weight point ($y, $x, $weight)") + Array[(Double, Double, Double)]() + } else { + Array((y, x, weight)) + } } - // Pools sub array within given bounds assigning weighted average value to all elements. - def pool(input: Array[(Double, Double, Double)], start: Int, end: Int): Unit = { - val poolSubArray = input.slice(start, end + 1) + if (cleanInput.isEmpty) { + return Array.empty + } - val weightedSum = poolSubArray.map(lp => lp._1 * lp._3).sum - val weight = poolSubArray.map(_._3).sum + // Keeps track of the start and end indices of the blocks. if [i, j] is a valid block from + // cleanInput(i) to cleanInput(j) (inclusive), then blockBounds(i) = j and blockBounds(j) = i + // Initially, each data point is its own block. + val blockBounds = Array.range(0, cleanInput.length) - var i = start - while (i <= end) { - input(i) = (weightedSum / weight, input(i)._2, input(i)._3) - i = i + 1 - } + // Keep track of the sum of weights and sum of weight * y for each block. weights(start) + // gives the values for the block. Entries that are not at the start of a block + // are meaningless. + val weights: Array[(Double, Double)] = cleanInput.map { case (y, _, weight) => + (weight, weight * y) } - var i = 0 - val len = input.length - while (i < len) { - var j = i + // a few convenience functions to make the code more readable - // Find monotonicity violating sequence, if any. - while (j < len - 1 && input(j)._1 > input(j + 1)._1) { - j = j + 1 - } + // blockStart and blockEnd have identical implementations. 
We create two different + // functions to make the code more expressive + def blockEnd(start: Int): Int = blockBounds(start) + def blockStart(end: Int): Int = blockBounds(end) - // If monotonicity was not violated, move to next data point. - if (i == j) { - i = i + 1 - } else { - // Otherwise pool the violating sequence - // and check if pooling caused monotonicity violation in previously processed points. - while (i >= 0 && input(i)._1 > input(i + 1)._1) { - pool(input, i, j) - i = i - 1 - } + // the next block starts at the index after the end of this block + def nextBlock(start: Int): Int = blockEnd(start) + 1 - i = j - } + // the previous block ends at the index before the start of this block + // we then use blockStart to find the start + def prevBlock(start: Int): Int = blockStart(start - 1) + + // Merge two adjacent blocks, updating blockBounds and weights to reflect the merge + // Return the start index of the merged block + def merge(block1: Int, block2: Int): Int = { + assert( + blockEnd(block1) + 1 == block2, + s"Attempting to merge non-consecutive blocks [${block1}, ${blockEnd(block1)}]" + + s" and [${block2}, ${blockEnd(block2)}. This is likely a bug in the isotonic regression" + --- End diff -- The assertion message is missing the closing "]" after ${blockEnd(block2)}; the second interval should read "[${block2}, ${blockEnd(block2)}]".
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org