[ 
https://issues.apache.org/jira/browse/SPARK-16404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15366975#comment-15366975
 ] 

DB Tsai commented on SPARK-16404:
---------------------------------

I think something like the following will work. Note that `lazy` is important 
here; otherwise it will result in a `NullPointerException`, since serialization 
happens in the driver when the object is constructed. 

{code}
/**
 * Accumulates the least-squares loss and gradient over training instances.
 *
 * Heavy read-only inputs (coefficients, per-feature std/mean) are taken as
 * [[Broadcast]] handles rather than plain values, so only the small broadcast
 * references are serialized with this object when it is shipped to executors.
 *
 * @param coefficientsB broadcast model coefficients.
 * @param labelStd      label standard deviation used to standardize labels.
 * @param labelMean     label mean (used for the intercept offset).
 * @param fitIntercept  whether an intercept term is being fit.
 * @param featuresStdB  broadcast per-feature standard deviations.
 * @param featuresMeanB broadcast per-feature means.
 */
private class LeastSquaresAggregator(
    coefficientsB: Broadcast[Vector],
    labelStd: Double,
    labelMean: Double,
    fitIntercept: Boolean,
    featuresStdB: Broadcast[Array[Double]],
    featuresMeanB: Broadcast[Array[Double]]) extends Serializable {

  // Number of instances added so far (incremented in add()).
  private var totalCnt: Long = 0L
  // Sum of instance weights seen so far (incremented in add()).
  private var weightSum: Double = 0.0
  // Accumulated weighted squared-error loss (updated in add()).
  private var lossSum = 0.0

  // Standardized view of the broadcast data, computed once per deserialized
  // copy. Both `@transient` and `lazy` are deliberate: the tuple is excluded
  // from serialization and is only materialized on first use (on the
  // executor), so the full-size derived arrays never travel from the driver —
  // only the broadcast handles do.
  @transient private lazy val (effectiveCoefficientsVector: Vector, 
featuresStd: Array[Double],
    offset: Double, dim: Int) = {
    // Clone before mutating: the broadcast value is shared and must not be
    // modified in place.
    val coefficientsArray = coefficientsB.value.toArray.clone()
    val featuresMean = featuresMeanB.value
    val localFeatureStd = featuresStdB.value
    var sum = 0.0
    var i = 0
    val len = coefficientsArray.length
    while (i < len) {
      if (localFeatureStd(i) != 0.0) {
        // Scale each coefficient by its feature's std deviation, and
        // accumulate the mean-adjustment term for the intercept offset.
        coefficientsArray(i) /=  localFeatureStd(i)
        sum += coefficientsArray(i) * featuresMean(i)
      } else {
        // Zero-variance feature: its coefficient contributes nothing.
        coefficientsArray(i) = 0.0
      }
      i += 1
    }
    // Offset in the standardized label space; zero when no intercept is fit.
    val offset = if (fitIntercept) labelMean / labelStd - sum else 0.0
    (Vectors.dense(coefficientsArray), localFeatureStd, offset, 
coefficientsArray.length)
  }

  private var gradientSumArray: Array[Double] = _

  /**
   * Add a new training instance to this LeastSquaresAggregator, and update
   * the loss and gradient of the objective function.
   *
   * @param instance The instance of data point to be added.
   * @return This LeastSquaresAggregator object.
   */
  def add(instance: Instance): this.type = {
    // Lazily allocate the gradient buffer on the executor. Reading `dim` here
    // also forces the @transient lazy standardized-data tuple on first use.
    if (gradientSumArray == null) gradientSumArray = Array.ofDim[Double](dim)

    instance match { case Instance(label, weight, features) =>
      // NOTE: the message string must stay on a single physical line (or be
      // split only between concatenated literals) — a raw newline inside a
      // string literal does not compile.
      require(dim == features.size, s"Dimensions mismatch when adding new sample." +
        s" Expecting $dim but got ${features.size}.")
      require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0")

      // Zero-weight instances contribute nothing; skip all bookkeeping.
      if (weight == 0.0) return this

      // Residual in the standardized label space. Kept on one line: if the
      // expression is broken BEFORE an operator (e.g. a stranded `+ offset`),
      // semicolon inference parses it as a separate statement and silently
      // drops the offset term.
      val diff = dot(features, effectiveCoefficientsVector) - label / labelStd + offset

      if (diff != 0) {
        // Hoist field reads out of the hot per-feature loop.
        val localGradientSumArray = gradientSumArray
        val localFeaturesStd = featuresStd
        features.foreachActive { (index, value) =>
          // Zero-variance features were zeroed in the coefficients and are
          // skipped in the gradient as well.
          if (localFeaturesStd(index) != 0.0 && value != 0.0) {
            localGradientSumArray(index) += weight * diff * value / localFeaturesStd(index)
          }
        }
        lossSum += weight * diff * diff / 2.0
      }

      totalCnt += 1
      weightSum += weight
      this
    }
  }
{code}


> LeastSquaresAggregator in Linear Regression serializes unnecessary data
> -----------------------------------------------------------------------
>
>                 Key: SPARK-16404
>                 URL: https://issues.apache.org/jira/browse/SPARK-16404
>             Project: Spark
>          Issue Type: Improvement
>          Components: ML
>            Reporter: Seth Hendrickson
>
> This is basically the same issue as 
> [SPARK-16008|https://issues.apache.org/jira/browse/SPARK-16008], but for 
> linear regression, where {{coefficients}} and {{featuresStd}} are 
> unnecessarily serialized between stages. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to