GitHub user dbtsai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8013#discussion_r37813263
  
    --- Diff: mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala ---
    @@ -645,3 +666,179 @@ private class LeastSquaresCostFun(
         (leastSquaresAggregator.loss + regVal, new BDV(totalGradientArray))
       }
     }
    +
    +/**
    + * HuberCostFun implements Breeze's DiffFunction[T] for the Huber cost as used in robust regression.
    + * The Huber M-estimator corresponds to a probability distribution for the errors which is normal
    + * in the centre but like a double exponential distribution in the tails (Hogg 1979: 109).
    + * L = 1/2 ||A weights - y||^2        if |A weights - y| <= k
    + * L = k |A weights - y| - 1/2 k^2    if |A weights - y| > k
    + * where k = 1.345, which produces 95% efficiency when the errors are normal and
    + * substantial resistance to outliers otherwise.
    + * See also the documentation for the precise formulation.
    + * It's used in Breeze's convex optimization routines.
    + */
    +
    +private class HuberAggregator(
    +    weights: Vector,
    +    labelStd: Double,
    +    labelMean: Double,
    +    fitIntercept: Boolean,
    +    featuresStd: Array[Double],
    +    featuresMean: Array[Double],
    +    robustEfficiency: Double) extends Serializable {
    +
    +  private var totalCnt: Long = 0L
    +  private var lossSum = 0.0
    +
    +  private val (effectiveWeightsArray: Array[Double], offset: Double, dim: Int) = {
    +    val weightsArray = weights.toArray.clone()
    +    var sum = 0.0
    +    var i = 0
    +    val len = weightsArray.length
    +    while (i < len) {
    +      if (featuresStd(i) != 0.0) {
    +      weightsArray(i) /= featuresStd(i)
    +        sum += weightsArray(i) * featuresMean(i)
    +      } else {
    +        weightsArray(i) = 0.0
    +      }
    +      i += 1
    +    }
    +    (weightsArray, if (fitIntercept) labelMean / labelStd - sum else 0.0, weightsArray.length)
    +  }
    +
    +  private val effectiveWeightsVector = Vectors.dense(effectiveWeightsArray)
    +
    +  private val gradientSumArray = Array.ofDim[Double](dim)
    +
    +  /**
    +   * Add a new training data point to this HuberAggregator, and update the loss and gradient
    +   * of the objective function.
    +   *
    +   * @param label The label for this data point.
    +   * @param data The features for one data point in dense/sparse vector format to be added
    +   *             into this aggregator.
    +   * @return This HuberAggregator object.
    +   */
    +  def add(label: Double, data: Vector): this.type = {
    +    require(dim == data.size, s"Dimensions mismatch when adding new sample." +
    +      s" Expecting $dim but got ${data.size}.")
    +
    +    val diff = dot(data, effectiveWeightsVector) - label / labelStd + offset
    +    val k = robustEfficiency
    +
    +    val localGradientSumArray = gradientSumArray
    +    if (math.abs(diff) > k) {
    +      // Linear (outlier) region: L = k * |diff| - k^2 / 2, dL/d(diff) = sign(diff) * k.
    +      val signedK = if (diff > 0) k else -k
    +      data.foreachActive { (index, value) =>
    +        if (featuresStd(index) != 0.0 && value != 0.0) {
    +          localGradientSumArray(index) += signedK * value / featuresStd(index)
    +        }
    +      }
    +      lossSum += k * math.abs(diff) - 0.5 * k * k
    +    } else if (diff != 0) {
    +      // Quadratic (inlier) region: L = diff^2 / 2, dL/d(diff) = diff.
    +      data.foreachActive { (index, value) =>
    +        if (featuresStd(index) != 0.0 && value != 0.0) {
    +          localGradientSumArray(index) += diff * value / featuresStd(index)
    +        }
    +      }
    +      lossSum += 0.5 * diff * diff
    +    }
    +
    +    totalCnt += 1
    +    this
    +  }
    +
    +  /**
    +   * Merge another HuberAggregator, and update the loss and gradient
    +   * of the objective function.
    +   * (Note that it's in-place merging; as a result, `this` object will be modified.)
    +   *
    +   * @param other The other HuberAggregator to be merged.
    +   * @return This HuberAggregator object.
    +   */
    +  def merge(other: HuberAggregator): this.type = {
    +    require(dim == other.dim, s"Dimensions mismatch when merging with another " +
    +      s"HuberAggregator. Expecting $dim but got ${other.dim}.")
    +
    +    if (other.totalCnt != 0) {
    +      totalCnt += other.totalCnt
    +      lossSum += other.lossSum
    +
    +      var i = 0
    +      val localThisGradientSumArray = this.gradientSumArray
    +      val localOtherGradientSumArray = other.gradientSumArray
    +      while (i < dim) {
    +        localThisGradientSumArray(i) += localOtherGradientSumArray(i)
    +        i += 1
    +      }
    +    }
    +    this
    +  }
    +
    +  def count: Long = totalCnt
    +
    +  def loss: Double = lossSum / totalCnt
    +
    +  def gradient: Vector = {
    +    val result = Vectors.dense(gradientSumArray.clone())
    +    scal(1.0 / totalCnt, result)
    +    result
    +  }
    +}
    +
    +private class HuberCostFun(
    +    data: RDD[(Double, Vector)],
    +    labelStd: Double,
    +    labelMean: Double,
    +    fitIntercept: Boolean,
    +    standardization: Boolean,
    +    featuresStd: Array[Double],
    +    featuresMean: Array[Double],
    +    effectiveL2regParam: Double,
    +    robustEfficiency: Double) extends DiffFunction[BDV[Double]] {
    +
    --- End diff --
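
    For reference, the Huber cost documented in the new doc comment reduces to the following pointwise form (a minimal standalone sketch for illustration only, not part of the patch; `k` is the tuning constant, 1.345 by default):

    ```scala
    // Pointwise Huber loss and its derivative with respect to the residual:
    // quadratic near zero, linear in the tails.
    def huberLoss(diff: Double, k: Double = 1.345): Double =
      if (math.abs(diff) <= k) 0.5 * diff * diff
      else k * math.abs(diff) - 0.5 * k * k

    def huberGradient(diff: Double, k: Double = 1.345): Double =
      if (math.abs(diff) <= k) diff
      else k * math.signum(diff)
    ```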
    
    There's still a lot of duplicated code. Do you think you can pass a parameter to switch between the different losses?
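
    One possible shape for that (a rough sketch assuming a single shared aggregator; the `LossKind` names are hypothetical, not from the patch):

    ```scala
    // Hypothetical: one aggregator parameterized by the loss, so the
    // standardization, add, and merge plumbing is written only once.
    sealed trait LossKind
    case object SquaredLoss extends LossKind
    case class HuberLoss(k: Double) extends LossKind

    // Loss and its derivative w.r.t. the residual, selected by `loss`.
    def lossAndGradient(diff: Double, loss: LossKind): (Double, Double) = loss match {
      case SquaredLoss                           => (0.5 * diff * diff, diff)
      case HuberLoss(k) if math.abs(diff) <= k   => (0.5 * diff * diff, diff)
      case HuberLoss(k)                          => (k * math.abs(diff) - 0.5 * k * k, k * math.signum(diff))
    }
    ```

    LeastSquaresAggregator and HuberAggregator could then share one `add` method that delegates to `lossAndGradient`.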

