[ https://issues.apache.org/jira/browse/SPARK-16404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15366975#comment-15366975 ]
DB Tsai commented on SPARK-16404:
---------------------------------

I think something like the following will work. Note that `lazy` is important here: serialization happens on the driver when the object is constructed, so without `lazy` the `@transient` fields would be `null` after deserialization on the executors, resulting in a `NullPointerException`.

{code}
private class LeastSquaresAggregator(
    coefficientsB: Broadcast[Vector],
    labelStd: Double,
    labelMean: Double,
    fitIntercept: Boolean,
    featuresStdB: Broadcast[Array[Double]],
    featuresMeanB: Broadcast[Array[Double]]) extends Serializable {

  private var totalCnt: Long = 0L
  private var weightSum: Double = 0.0
  private var lossSum = 0.0

  // Computed lazily on each executor from the broadcast values instead of
  // being serialized with the closure.
  @transient private lazy val (effectiveCoefficientsVector: Vector,
      featuresStd: Array[Double], offset: Double, dim: Int) = {
    val coefficientsArray = coefficientsB.value.toArray.clone()
    val featuresMean = featuresMeanB.value
    val localFeatureStd = featuresStdB.value
    var sum = 0.0
    var i = 0
    val len = coefficientsArray.length
    while (i < len) {
      if (localFeatureStd(i) != 0.0) {
        coefficientsArray(i) /= localFeatureStd(i)
        sum += coefficientsArray(i) * featuresMean(i)
      } else {
        coefficientsArray(i) = 0.0
      }
      i += 1
    }
    val offset = if (fitIntercept) labelMean / labelStd - sum else 0.0
    (Vectors.dense(coefficientsArray), localFeatureStd, offset, coefficientsArray.length)
  }

  private var gradientSumArray: Array[Double] = _

  /**
   * Add a new training instance to this LeastSquaresAggregator, and update the loss and gradient
   * of the objective function.
   *
   * @param instance The instance of data point to be added.
   * @return This LeastSquaresAggregator object.
   */
  def add(instance: Instance): this.type = {
    if (gradientSumArray == null) gradientSumArray = Array.ofDim[Double](dim)
    instance match {
      case Instance(label, weight, features) =>
        require(dim == features.size, s"Dimensions mismatch when adding new sample." +
          s" Expecting $dim but got ${features.size}.")
        require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0")

        if (weight == 0.0) return this

        val diff = dot(features, effectiveCoefficientsVector) - label / labelStd + offset

        if (diff != 0) {
          val localGradientSumArray = gradientSumArray
          val localFeaturesStd = featuresStd
          features.foreachActive { (index, value) =>
            if (localFeaturesStd(index) != 0.0 && value != 0.0) {
              localGradientSumArray(index) += weight * diff * value / localFeaturesStd(index)
            }
          }
          lossSum += weight * diff * diff / 2.0
        }

        totalCnt += 1
        weightSum += weight
        this
    }
  }
}
{code}

> LeastSquaresAggregator in Linear Regression serializes unnecessary data
> -----------------------------------------------------------------------
>
>                 Key: SPARK-16404
>                 URL: https://issues.apache.org/jira/browse/SPARK-16404
>             Project: Spark
>          Issue Type: Improvement
>          Components: ML
>            Reporter: Seth Hendrickson
>
> This is basically the same issue as [SPARK-16008|https://issues.apache.org/jira/browse/SPARK-16008], but for linear regression, where {{coefficients}} and {{featuresStd}} are unnecessarily serialized between stages.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
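The `@transient lazy val` pattern discussed above can be demonstrated outside Spark with a plain Java-serialization round trip. The sketch below is hypothetical (the class and method names are not Spark APIs): a derived field marked `@transient lazy val` is skipped during serialization and recomputed on first access after deserialization, whereas a plain `@transient val` would come back as `null`.

```scala
import java.io._

// Hypothetical example class, not Spark code: `scaled` is derived from the
// constructor arguments, so it need not be serialized. `@transient` keeps it
// out of the serialized form; `lazy` recomputes it on first access after
// deserialization instead of leaving it null.
class ScaledTask(raw: Array[Double], scale: Double) extends Serializable {
  @transient private lazy val scaled: Array[Double] = raw.map(_ * scale)
  def apply(i: Int): Double = scaled(i)
}

object TransientLazyDemo {
  // Serialize and deserialize through an in-memory buffer, mimicking what
  // happens when a task closure is shipped from the driver to an executor.
  def roundTrip[T <: Serializable](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    in.readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    val task = roundTrip(new ScaledTask(Array(1.0, 2.0), 3.0))
    // With a non-lazy `@transient val`, `scaled` would be null here and this
    // call would throw NullPointerException; `lazy` recomputes it on demand.
    println(task.apply(0))
  }
}
```

In the aggregator, the raw inputs kept as fields are `Broadcast` handles, which are cheap to serialize; only the large derived arrays are marked `@transient lazy` and rebuilt per executor.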