This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 701deac [SPARK-31603][ML] AFT uses common functions in RDDLossFunction 701deac is described below commit 701deac88d09690ddf9d28b9c79814aecfd3277d Author: zhengruifeng <ruife...@foxmail.com> AuthorDate: Tue May 5 08:35:20 2020 -0500 [SPARK-31603][ML] AFT uses common functions in RDDLossFunction ### What changes were proposed in this pull request? 1, make AFT reuse common functions in `ml.optim`, rather than making its own impl. ### Why are the changes needed? The logic in optimizing AFT is quite similar to other algorithms like other algs based on `RDDLossFunction`, We should reuse the common functions. ### Does this PR introduce any user-facing change? No ### How was this patch tested? existing testsuites Closes #28404 from zhengruifeng/mv_aft_optim. Authored-by: zhengruifeng <ruife...@foxmail.com> Signed-off-by: Sean Owen <sro...@gmail.com> --- .../spark/ml/optim/aggregator/AFTAggregator.scala | 162 +++++++++++++++ .../aggregator/DifferentiableLossAggregator.scala | 9 +- .../ml/regression/AFTSurvivalRegression.scala | 228 +-------------------- 3 files changed, 173 insertions(+), 226 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/AFTAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/AFTAggregator.scala new file mode 100644 index 0000000..6482c61 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/AFTAggregator.scala @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.optim.aggregator + +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.ml.linalg._ +import org.apache.spark.ml.regression.AFTPoint + +/** + * AFTAggregator computes the gradient and loss for a AFT loss function, + * as used in AFT survival regression for samples in sparse or dense vector in an online fashion. + * + * The loss function and likelihood function under the AFT model based on: + * Lawless, J. F., Statistical Models and Methods for Lifetime Data, + * New York: John Wiley & Sons, Inc. 2003. + * + * Two AFTAggregator can be merged together to have a summary of loss and gradient of + * the corresponding joint dataset. + * + * Given the values of the covariates $x^{'}$, for random lifetime $t_{i}$ of subjects i = 1,..,n, + * with possible right-censoring, the likelihood function under the AFT model is given as + * + * <blockquote> + * $$ + * L(\beta,\sigma)=\prod_{i=1}^n[\frac{1}{\sigma}f_{0} + * (\frac{\log{t_{i}}-x^{'}\beta}{\sigma})]^{\delta_{i}}S_{0} + * (\frac{\log{t_{i}}-x^{'}\beta}{\sigma})^{1-\delta_{i}} + * $$ + * </blockquote> + * + * Where $\delta_{i}$ is the indicator of the event has occurred i.e. uncensored or not. + * Using $\epsilon_{i}=\frac{\log{t_{i}}-x^{'}\beta}{\sigma}$, the log-likelihood function + * assumes the form + * + * <blockquote> + * $$ + * \iota(\beta,\sigma)=\sum_{i=1}^{n}[-\delta_{i}\log\sigma+ + * \delta_{i}\log{f_{0}}(\epsilon_{i})+(1-\delta_{i})\log{S_{0}(\epsilon_{i})}] + * $$ + * </blockquote> + * Where $S_{0}(\epsilon_{i})$ is the baseline survivor function, + * and $f_{0}(\epsilon_{i})$ is corresponding density function. + * + * The most commonly used log-linear survival regression method is based on the Weibull + * distribution of the survival time. The Weibull distribution for lifetime corresponding + * to extreme value distribution for log of the lifetime, + * and the $S_{0}(\epsilon)$ function is + * + * <blockquote> + * $$ + * S_{0}(\epsilon_{i})=\exp(-e^{\epsilon_{i}}) + * $$ + * </blockquote> + * + * and the $f_{0}(\epsilon_{i})$ function is + * + * <blockquote> + * $$ + * f_{0}(\epsilon_{i})=e^{\epsilon_{i}}\exp(-e^{\epsilon_{i}}) + * $$ + * </blockquote> + * + * The log-likelihood function for Weibull distribution of lifetime is + * + * <blockquote> + * $$ + * \iota(\beta,\sigma)= + * -\sum_{i=1}^n[\delta_{i}\log\sigma-\delta_{i}\epsilon_{i}+e^{\epsilon_{i}}] + * $$ + * </blockquote> + * + * Due to minimizing the negative log-likelihood equivalent to maximum a posteriori probability, + * the loss function we use to optimize is $-\iota(\beta,\sigma)$. + * The gradient functions for $\beta$ and $\log\sigma$ respectively are + * + * <blockquote> + * $$ + * \frac{\partial (-\iota)}{\partial \beta}= + * \sum_{1=1}^{n}[\delta_{i}-e^{\epsilon_{i}}]\frac{x_{i}}{\sigma} \\ + * + * \frac{\partial (-\iota)}{\partial (\log\sigma)}= + * \sum_{i=1}^{n}[\delta_{i}+(\delta_{i}-e^{\epsilon_{i}})\epsilon_{i}] + * $$ + * </blockquote> + * + * @param bcCoefficients The broadcasted value includes three part: 1, regression coefficients + * corresponding to the features; 2, the intercept; 3, the log of scale + * parameter. + * @param fitIntercept Whether to fit an intercept term. + * @param bcFeaturesStd The broadcast standard deviation values of the features. + */ + +private[ml] class AFTAggregator( + bcFeaturesStd: Broadcast[Array[Double]], + fitIntercept: Boolean)(bcCoefficients: Broadcast[Vector]) + extends DifferentiableLossAggregator[AFTPoint, AFTAggregator] { + + protected override val dim: Int = bcCoefficients.value.size + + /** + * Add a new training data to this AFTAggregator, and update the loss and gradient + * of the objective function. + * + * @param data The AFTPoint representation for one data point to be added into this aggregator. + * @return This AFTAggregator object. + */ + def add(data: AFTPoint): this.type = { + val coefficients = bcCoefficients.value.toArray + val intercept = coefficients(dim - 2) + // sigma is the scale parameter of the AFT model + val sigma = math.exp(coefficients(dim - 1)) + + val xi = data.features + val ti = data.label + val delta = data.censor + + require(ti > 0.0, "The lifetime or label should be greater than 0.") + + val localFeaturesStd = bcFeaturesStd.value + + val margin = { + var sum = 0.0 + xi.foreachNonZero { (index, value) => + if (localFeaturesStd(index) != 0.0) { + sum += coefficients(index) * (value / localFeaturesStd(index)) + } + } + sum + intercept + } + val epsilon = (math.log(ti) - margin) / sigma + + lossSum += delta * math.log(sigma) - delta * epsilon + math.exp(epsilon) + + val multiplier = (delta - math.exp(epsilon)) / sigma + + xi.foreachNonZero { (index, value) => + if (localFeaturesStd(index) != 0.0) { + gradientSumArray(index) += multiplier * (value / localFeaturesStd(index)) + } + } + gradientSumArray(dim - 2) += { if (fitIntercept) multiplier else 0.0 } + gradientSumArray(dim - 1) += delta + multiplier * sigma * epsilon + + weightSum += 1.0 + this + } +} diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/DifferentiableLossAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/DifferentiableLossAggregator.scala index 403c28f..b247723 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/DifferentiableLossAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/DifferentiableLossAggregator.scala @@ -53,14 +53,7 @@ private[ml] trait DifferentiableLossAggregator[ if (other.weightSum != 0) { weightSum += other.weightSum lossSum += other.lossSum - - var i = 0 - val localThisGradientSumArray = this.gradientSumArray - val localOtherGradientSumArray = other.gradientSumArray - while (i < dim) { - localThisGradientSumArray(i) += localOtherGradientSumArray(i) - i += 1 - } + BLAS.getBLAS(dim).daxpy(dim, 1.0, other.gradientSumArray, 1, this.gradientSumArray, 1) } this } diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala index 2da65a0..8cc5f86 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala @@ -20,15 +20,16 @@ package org.apache.spark.ml.regression import scala.collection.mutable import breeze.linalg.{DenseVector => BDV} -import breeze.optimize.{CachedDiffFunction, DiffFunction, LBFGS => BreezeLBFGS} +import breeze.optimize.{CachedDiffFunction, LBFGS => BreezeLBFGS} import org.apache.hadoop.fs.Path import org.apache.spark.SparkException import org.apache.spark.annotation.Since -import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.Logging import org.apache.spark.ml.PredictorParams import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors, VectorUDT} +import org.apache.spark.ml.optim.aggregator.AFTAggregator +import org.apache.spark.ml.optim.loss.RDDLossFunction import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ import org.apache.spark.ml.stat._ @@ -208,7 +209,7 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S ) val featuresStd = featuresSummarizer.std.toArray - val numFeatures = featuresStd.size + val numFeatures = featuresStd.length instr.logPipelineStage(this) instr.logDataset(dataset) @@ -226,9 +227,9 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S } val bcFeaturesStd = instances.context.broadcast(featuresStd) - - val costFun = new AFTCostFun(instances, $(fitIntercept), bcFeaturesStd, $(aggregationDepth)) + val getAggregatorFunc = new AFTAggregator(bcFeaturesStd, $(fitIntercept))(_) val optimizer = new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol)) + val costFun = new RDDLossFunction(instances, getAggregatorFunc, None, $(aggregationDepth)) /* The parameters vector has three parts: @@ -258,15 +259,15 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S bcFeaturesStd.destroy() if (handlePersistence) instances.unpersist() - val rawCoefficients = parameters.slice(2, parameters.length) + val rawCoefficients = parameters.take(numFeatures) var i = 0 while (i < numFeatures) { rawCoefficients(i) *= { if (featuresStd(i) != 0.0) 1.0 / featuresStd(i) else 0.0 } i += 1 } val coefficients = Vectors.dense(rawCoefficients) - val intercept = parameters(1) - val scale = math.exp(parameters(0)) + val intercept = parameters(numFeatures) + val scale = math.exp(parameters(numFeatures + 1)) new AFTSurvivalRegressionModel(uid, coefficients, intercept, scale) } @@ -434,215 +435,6 @@ object AFTSurvivalRegressionModel extends MLReadable[AFTSurvivalRegressionModel] } /** - * AFTAggregator computes the gradient and loss for a AFT loss function, - * as used in AFT survival regression for samples in sparse or dense vector in an online fashion. - * - * The loss function and likelihood function under the AFT model based on: - * Lawless, J. F., Statistical Models and Methods for Lifetime Data, - * New York: John Wiley & Sons, Inc. 2003. - * - * Two AFTAggregator can be merged together to have a summary of loss and gradient of - * the corresponding joint dataset. - * - * Given the values of the covariates $x^{'}$, for random lifetime $t_{i}$ of subjects i = 1,..,n, - * with possible right-censoring, the likelihood function under the AFT model is given as - * - * <blockquote> - * $$ - * L(\beta,\sigma)=\prod_{i=1}^n[\frac{1}{\sigma}f_{0} - * (\frac{\log{t_{i}}-x^{'}\beta}{\sigma})]^{\delta_{i}}S_{0} - * (\frac{\log{t_{i}}-x^{'}\beta}{\sigma})^{1-\delta_{i}} - * $$ - * </blockquote> - * - * Where $\delta_{i}$ is the indicator of the event has occurred i.e. uncensored or not. - * Using $\epsilon_{i}=\frac{\log{t_{i}}-x^{'}\beta}{\sigma}$, the log-likelihood function - * assumes the form - * - * <blockquote> - * $$ - * \iota(\beta,\sigma)=\sum_{i=1}^{n}[-\delta_{i}\log\sigma+ - * \delta_{i}\log{f_{0}}(\epsilon_{i})+(1-\delta_{i})\log{S_{0}(\epsilon_{i})}] - * $$ - * </blockquote> - * Where $S_{0}(\epsilon_{i})$ is the baseline survivor function, - * and $f_{0}(\epsilon_{i})$ is corresponding density function. - * - * The most commonly used log-linear survival regression method is based on the Weibull - * distribution of the survival time. The Weibull distribution for lifetime corresponding - * to extreme value distribution for log of the lifetime, - * and the $S_{0}(\epsilon)$ function is - * - * <blockquote> - * $$ - * S_{0}(\epsilon_{i})=\exp(-e^{\epsilon_{i}}) - * $$ - * </blockquote> - * - * and the $f_{0}(\epsilon_{i})$ function is - * - * <blockquote> - * $$ - * f_{0}(\epsilon_{i})=e^{\epsilon_{i}}\exp(-e^{\epsilon_{i}}) - * $$ - * </blockquote> - * - * The log-likelihood function for Weibull distribution of lifetime is - * - * <blockquote> - * $$ - * \iota(\beta,\sigma)= - * -\sum_{i=1}^n[\delta_{i}\log\sigma-\delta_{i}\epsilon_{i}+e^{\epsilon_{i}}] - * $$ - * </blockquote> - * - * Due to minimizing the negative log-likelihood equivalent to maximum a posteriori probability, - * the loss function we use to optimize is $-\iota(\beta,\sigma)$. - * The gradient functions for $\beta$ and $\log\sigma$ respectively are - * - * <blockquote> - * $$ - * \frac{\partial (-\iota)}{\partial \beta}= - * \sum_{1=1}^{n}[\delta_{i}-e^{\epsilon_{i}}]\frac{x_{i}}{\sigma} \\ - * - * \frac{\partial (-\iota)}{\partial (\log\sigma)}= - * \sum_{i=1}^{n}[\delta_{i}+(\delta_{i}-e^{\epsilon_{i}})\epsilon_{i}] - * $$ - * </blockquote> - * - * @param bcParameters The broadcasted value includes three part: The log of scale parameter, - * the intercept and regression coefficients corresponding to the features. - * @param fitIntercept Whether to fit an intercept term. - * @param bcFeaturesStd The broadcast standard deviation values of the features. - */ -private class AFTAggregator( - bcParameters: Broadcast[BDV[Double]], - fitIntercept: Boolean, - bcFeaturesStd: Broadcast[Array[Double]]) extends Serializable { - - private val length = bcParameters.value.length - // make transient so we do not serialize between aggregation stages - @transient private lazy val parameters = bcParameters.value - // the regression coefficients to the covariates - @transient private lazy val coefficients = parameters.slice(2, length) - @transient private lazy val intercept = parameters(1) - // sigma is the scale parameter of the AFT model - @transient private lazy val sigma = math.exp(parameters(0)) - - private var totalCnt: Long = 0L - private var lossSum = 0.0 - // Here we optimize loss function over log(sigma), intercept and coefficients - private lazy val gradientSumArray = Array.ofDim[Double](length) - - def count: Long = totalCnt - def loss: Double = { - require(totalCnt > 0.0, s"The number of instances should be " + - s"greater than 0.0, but got $totalCnt.") - lossSum / totalCnt - } - def gradient: BDV[Double] = { - require(totalCnt > 0.0, s"The number of instances should be " + - s"greater than 0.0, but got $totalCnt.") - new BDV(gradientSumArray.map(_ / totalCnt.toDouble)) - } - - - /** - * Add a new training data to this AFTAggregator, and update the loss and gradient - * of the objective function. - * - * @param data The AFTPoint representation for one data point to be added into this aggregator. - * @return This AFTAggregator object. - */ - def add(data: AFTPoint): this.type = { - val xi = data.features - val ti = data.label - val delta = data.censor - - require(ti > 0.0, "The lifetime or label should be greater than 0.") - - val localFeaturesStd = bcFeaturesStd.value - - val margin = { - var sum = 0.0 - xi.foreachNonZero { (index, value) => - if (localFeaturesStd(index) != 0.0) { - sum += coefficients(index) * (value / localFeaturesStd(index)) - } - } - sum + intercept - } - val epsilon = (math.log(ti) - margin) / sigma - - lossSum += delta * math.log(sigma) - delta * epsilon + math.exp(epsilon) - - val multiplier = (delta - math.exp(epsilon)) / sigma - - gradientSumArray(0) += delta + multiplier * sigma * epsilon - gradientSumArray(1) += { if (fitIntercept) multiplier else 0.0 } - xi.foreachNonZero { (index, value) => - if (localFeaturesStd(index) != 0.0) { - gradientSumArray(index + 2) += multiplier * (value / localFeaturesStd(index)) - } - } - - totalCnt += 1 - this - } - - /** - * Merge another AFTAggregator, and update the loss and gradient - * of the objective function. - * (Note that it's in place merging; as a result, `this` object will be modified.) - * - * @param other The other AFTAggregator to be merged. - * @return This AFTAggregator object. - */ - def merge(other: AFTAggregator): this.type = { - if (other.count != 0) { - totalCnt += other.totalCnt - lossSum += other.lossSum - - var i = 0 - while (i < length) { - this.gradientSumArray(i) += other.gradientSumArray(i) - i += 1 - } - } - this - } -} - -/** - * AFTCostFun implements Breeze's DiffFunction[T] for AFT cost. - * It returns the loss and gradient at a particular point (parameters). - * It's used in Breeze's convex optimization routines. - */ -private class AFTCostFun( - data: RDD[AFTPoint], - fitIntercept: Boolean, - bcFeaturesStd: Broadcast[Array[Double]], - aggregationDepth: Int) extends DiffFunction[BDV[Double]] { - - override def calculate(parameters: BDV[Double]): (Double, BDV[Double]) = { - - val bcParameters = data.context.broadcast(parameters) - - val aftAggregator = data.treeAggregate( - new AFTAggregator(bcParameters, fitIntercept, bcFeaturesStd))( - seqOp = (c, v) => (c, v) match { - case (aggregator, instance) => aggregator.add(instance) - }, - combOp = (c1, c2) => (c1, c2) match { - case (aggregator1, aggregator2) => aggregator1.merge(aggregator2) - }, depth = aggregationDepth) - - bcParameters.destroy() - (aftAggregator.loss, aftAggregator.gradient) - } -} - -/** * Class that represents the (features, label, censor) of a data point. * * @param features List of features for this data point. @@ -650,6 +442,6 @@ private class AFTCostFun( * @param censor Indicator of the event has occurred or not. If the value is 1, it means * the event has occurred i.e. uncensored; otherwise censored. */ -private[regression] case class AFTPoint(features: Vector, label: Double, censor: Double) { +private[ml] case class AFTPoint(features: Vector, label: Double, censor: Double) { require(censor == 1.0 || censor == 0.0, "censor of class AFTPoint must be 1.0 or 0.0") } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org