[GitHub] spark pull request #14109: [SPARK-16404][ML] LeastSquaresAggregators seriali...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/14109

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user sethah commented on a diff in the pull request: https://github.com/apache/spark/pull/14109#discussion_r73765222

--- Diff: mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala ---
@@ -996,19 +1013,24 @@ private class LeastSquaresCostFun(
     featuresMean: Array[Double],
     effectiveL2regParam: Double) extends DiffFunction[BDV[Double]] {

+  val bcFeaturesStd = instances.context.broadcast(featuresStd)
+  val bcFeaturesMean = instances.context.broadcast(featuresMean)
+
   override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) = {
     val coeffs = Vectors.fromBreeze(coefficients)
+    val bcCoeffs = instances.context.broadcast(coeffs)

     val leastSquaresAggregator = {
       val seqOp = (c: LeastSquaresAggregator, instance: Instance) => c.add(instance)
       val combOp = (c1: LeastSquaresAggregator, c2: LeastSquaresAggregator) => c1.merge(c2)

       instances.treeAggregate(
-        new LeastSquaresAggregator(coeffs, labelStd, labelMean, fitIntercept, featuresStd,
-          featuresMean))(seqOp, combOp)
+        new LeastSquaresAggregator(bcCoeffs, labelStd, labelMean, fitIntercept, bcFeaturesStd,
+          bcFeaturesMean))(seqOp, combOp)
     }

     val totalGradientArray = leastSquaresAggregator.gradient.toArray
+    bcCoeffs.destroy(blocking = false)
--- End diff --

We cannot destroy them here because they are used on every iteration. I just added a commit to fix this so that we destroy the broadcast variables after the algorithm has run.
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/14109#discussion_r73754722

--- Diff: mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala ---
@@ -996,19 +1013,24 @@ private class LeastSquaresCostFun(
     featuresMean: Array[Double],
     effectiveL2regParam: Double) extends DiffFunction[BDV[Double]] {

+  val bcFeaturesStd = instances.context.broadcast(featuresStd)
+  val bcFeaturesMean = instances.context.broadcast(featuresMean)
+
   override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) = {
     val coeffs = Vectors.fromBreeze(coefficients)
+    val bcCoeffs = instances.context.broadcast(coeffs)

     val leastSquaresAggregator = {
       val seqOp = (c: LeastSquaresAggregator, instance: Instance) => c.add(instance)
       val combOp = (c1: LeastSquaresAggregator, c2: LeastSquaresAggregator) => c1.merge(c2)

       instances.treeAggregate(
-        new LeastSquaresAggregator(coeffs, labelStd, labelMean, fitIntercept, featuresStd,
-          featuresMean))(seqOp, combOp)
+        new LeastSquaresAggregator(bcCoeffs, labelStd, labelMean, fitIntercept, bcFeaturesStd,
+          bcFeaturesMean))(seqOp, combOp)
     }

     val totalGradientArray = leastSquaresAggregator.gradient.toArray
+    bcCoeffs.destroy(blocking = false)
--- End diff --

BTW, why do we not explicitly destroy `bcFeaturesStd` and `bcFeaturesMean` here? Thanks.
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/14109#discussion_r73646330

--- Diff: mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala ---
@@ -862,27 +873,30 @@ class LinearRegressionSummary private[regression] (
  *    $$
  *
  *
- * @param coefficients The coefficients corresponding to the features.
+ * @param bcCoefficients The broadcast coefficients corresponding to the features.
  * @param labelStd The standard deviation value of the label.
  * @param labelMean The mean value of the label.
  * @param fitIntercept Whether to fit an intercept term.
- * @param featuresStd The standard deviation values of the features.
- * @param featuresMean The mean values of the features.
+ * @param bcFeaturesStd The broadcast standard deviation values of the features.
+ * @param bcFeaturesMean The broadcast mean values of the features.
  */
 private class LeastSquaresAggregator(
-    coefficients: Vector,
+    bcCoefficients: Broadcast[Vector],
     labelStd: Double,
     labelMean: Double,
     fitIntercept: Boolean,
-    featuresStd: Array[Double],
-    featuresMean: Array[Double]) extends Serializable {
+    bcFeaturesStd: Broadcast[Array[Double]],
+    bcFeaturesMean: Broadcast[Array[Double]]) extends Serializable {

   private var totalCnt: Long = 0L
   private var weightSum: Double = 0.0
   private var lossSum = 0.0

-  private val (effectiveCoefficientsArray: Array[Double], offset: Double, dim: Int) = {
-    val coefficientsArray = coefficients.toArray.clone()
+  private val dim = bcCoefficients.value.size
+  @transient private lazy val featuresStd = bcFeaturesStd.value
+  @transient private lazy val effectiveCoefAndOffset = {
--- End diff --

Oh, this is indeed obscure. I like that `@transient` tells readers those fields are not being serialized; however, it can be difficult to debug. How about documenting this in the code? Or we could add a `def initializeEffectiveCoefficientsVectorAndOffset` and call it in the `add` method the first time. I don't have a strong opinion about this.
Github user yanboliang commented on a diff in the pull request: https://github.com/apache/spark/pull/14109#discussion_r73644694

--- Diff: mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala ---
@@ -862,27 +873,30 @@ class LinearRegressionSummary private[regression] (
  *    $$
  *
  *
- * @param coefficients The coefficients corresponding to the features.
+ * @param bcCoefficients The broadcast coefficients corresponding to the features.
  * @param labelStd The standard deviation value of the label.
  * @param labelMean The mean value of the label.
  * @param fitIntercept Whether to fit an intercept term.
- * @param featuresStd The standard deviation values of the features.
- * @param featuresMean The mean values of the features.
+ * @param bcFeaturesStd The broadcast standard deviation values of the features.
+ * @param bcFeaturesMean The broadcast mean values of the features.
  */
 private class LeastSquaresAggregator(
-    coefficients: Vector,
+    bcCoefficients: Broadcast[Vector],
     labelStd: Double,
     labelMean: Double,
     fitIntercept: Boolean,
-    featuresStd: Array[Double],
-    featuresMean: Array[Double]) extends Serializable {
+    bcFeaturesStd: Broadcast[Array[Double]],
+    bcFeaturesMean: Broadcast[Array[Double]]) extends Serializable {

   private var totalCnt: Long = 0L
   private var weightSum: Double = 0.0
   private var lossSum = 0.0

-  private val (effectiveCoefficientsArray: Array[Double], offset: Double, dim: Int) = {
-    val coefficientsArray = coefficients.toArray.clone()
+  private val dim = bcCoefficients.value.size
+  @transient private lazy val featuresStd = bcFeaturesStd.value
+  @transient private lazy val effectiveCoefAndOffset = {
--- End diff --

@sethah has explained this issue in a comment which has been folded: ```@transient private lazy val (effectiveCoefficientsVector: Vector, offset: Double)``` generates a ```Tuple2``` which does not carry the transient tag. The individual vals are still transient, but the tuple is not and thus gets serialized.
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/14109#discussion_r73643402

--- Diff: mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala ---
@@ -862,27 +873,30 @@ class LinearRegressionSummary private[regression] (
  *    $$
  *
  *
- * @param coefficients The coefficients corresponding to the features.
+ * @param bcCoefficients The broadcast coefficients corresponding to the features.
  * @param labelStd The standard deviation value of the label.
  * @param labelMean The mean value of the label.
  * @param fitIntercept Whether to fit an intercept term.
- * @param featuresStd The standard deviation values of the features.
- * @param featuresMean The mean values of the features.
+ * @param bcFeaturesStd The broadcast standard deviation values of the features.
+ * @param bcFeaturesMean The broadcast mean values of the features.
  */
 private class LeastSquaresAggregator(
-    coefficients: Vector,
+    bcCoefficients: Broadcast[Vector],
     labelStd: Double,
     labelMean: Double,
     fitIntercept: Boolean,
-    featuresStd: Array[Double],
-    featuresMean: Array[Double]) extends Serializable {
+    bcFeaturesStd: Broadcast[Array[Double]],
+    bcFeaturesMean: Broadcast[Array[Double]]) extends Serializable {

   private var totalCnt: Long = 0L
   private var weightSum: Double = 0.0
   private var lossSum = 0.0

-  private val (effectiveCoefficientsArray: Array[Double], offset: Double, dim: Int) = {
-    val coefficientsArray = coefficients.toArray.clone()
+  private val dim = bcCoefficients.value.size
+  @transient private lazy val featuresStd = bcFeaturesStd.value
+  @transient private lazy val effectiveCoefAndOffset = {
--- End diff --

How about ```@transient private lazy val (effectiveCoefficientsVector: Vector, offset: Double) = ...```?
Github user yanboliang commented on a diff in the pull request: https://github.com/apache/spark/pull/14109#discussion_r73301221

--- Diff: mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala ---
@@ -958,14 +973,17 @@ private class LeastSquaresCostFun(
   override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) = {
     val coeffs = Vectors.fromBreeze(coefficients)
+    val bcFeaturesStd = instances.context.broadcast(featuresStd)
+    val bcFeaturesMean = instances.context.broadcast(featuresMean)
+    val bcCoeffs = instances.context.broadcast(coeffs)
--- End diff --

Can we move ```bcFeaturesStd``` and ```bcFeaturesMean``` out of ```calculate```? Since they are invariant across iterations, we should not broadcast them on each iteration. Meanwhile, it's better to call ```destroy(blocking = false)``` at the right place and time, to release the memory held by the broadcast variables.
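The lifecycle proposed above (broadcast the invariant statistics once, reuse them on every iteration, destroy everything when optimization finishes) can be sketched roughly as follows. This is a simplified illustration, not the actual patch: `fitSketch` is a made-up name, the aggregation body is elided, and an active `SparkContext` is assumed.

```scala
import org.apache.spark.ml.feature.Instance
import org.apache.spark.rdd.RDD

// Sketch: featuresStd/featuresMean never change across L-BFGS iterations,
// so they are broadcast once up front. Only the coefficients change, so a
// fresh broadcast per iteration is unavoidable, but it can be destroyed as
// soon as that iteration's treeAggregate has finished.
def fitSketch(
    instances: RDD[Instance],
    featuresStd: Array[Double],
    featuresMean: Array[Double]): Unit = {
  val sc = instances.context
  val bcFeaturesStd = sc.broadcast(featuresStd)   // once, before iterating
  val bcFeaturesMean = sc.broadcast(featuresMean)

  // Inside each call to LeastSquaresCostFun.calculate:
  //   val bcCoeffs = sc.broadcast(coeffs)
  //   ... treeAggregate using bcCoeffs, bcFeaturesStd, bcFeaturesMean ...
  //   bcCoeffs.destroy(blocking = false)   // per-iteration broadcast dies here

  // After the optimizer returns, the invariant broadcasts are released:
  bcFeaturesStd.destroy(blocking = false)
  bcFeaturesMean.destroy(blocking = false)
}
```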
Github user yanboliang commented on a diff in the pull request: https://github.com/apache/spark/pull/14109#discussion_r73178468

--- Diff: mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala ---
@@ -822,27 +833,30 @@ class LinearRegressionSummary private[regression] (
  *    \frac{\partial L}{\partial\w_i} = 1/N ((\sum_j diff_j x_{ij} / \hat{x_i})
  * }}},
  *
- * @param coefficients The coefficients corresponding to the features.
+ * @param bcCoefficients The broadcast coefficients corresponding to the features.
  * @param labelStd The standard deviation value of the label.
  * @param labelMean The mean value of the label.
  * @param fitIntercept Whether to fit an intercept term.
- * @param featuresStd The standard deviation values of the features.
- * @param featuresMean The mean values of the features.
+ * @param bcFeaturesStd The broadcast standard deviation values of the features.
+ * @param bcFeaturesMean The broadcast mean values of the features.
  */
 private class LeastSquaresAggregator(
-    coefficients: Vector,
+    bcCoefficients: Broadcast[Vector],
     labelStd: Double,
     labelMean: Double,
     fitIntercept: Boolean,
-    featuresStd: Array[Double],
-    featuresMean: Array[Double]) extends Serializable {
+    bcFeaturesStd: Broadcast[Array[Double]],
+    bcFeaturesMean: Broadcast[Array[Double]]) extends Serializable {

   private var totalCnt: Long = 0L
   private var weightSum: Double = 0.0
   private var lossSum = 0.0

-  private val (effectiveCoefficientsArray: Array[Double], offset: Double, dim: Int) = {
-    val coefficientsArray = coefficients.toArray.clone()
+  private val dim = bcCoefficients.value.size
+  @transient private lazy val featuresStd = bcFeaturesStd.value
+  @transient private lazy val coefAndOffset = {
--- End diff --

Wouldn't ```effectiveCoefficientsAndOffset``` be a better name than ```coefAndOffset```? It holds the effective coefficients rather than the raw coefficients.
Github user sethah commented on a diff in the pull request: https://github.com/apache/spark/pull/14109#discussion_r70153596

--- Diff: mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala ---
@@ -856,10 +870,10 @@ private class LeastSquaresAggregator(
       i += 1
     }
     val offset = if (fitIntercept) labelMean / labelStd - sum else 0.0
-    (coefficientsArray, offset, coefficientsArray.length)
+    (Vectors.dense(coefficientsArray), offset)
   }
-
-  private val effectiveCoefficientsVector = Vectors.dense(effectiveCoefficientsArray)
+  @transient private lazy val effectiveCoefficientsVector = coefAndOffset._1
--- End diff --

Before, these values were assigned simultaneously, implicitly using a pattern match. It turns out that marking that pattern as `@transient lazy val` doesn't work, because the unapply generates a `Tuple2` which does not carry the transient tag. The individual vals are still transient, but the tuple is not and thus gets serialized. This obscure/hidden consequence of pattern matching is one good argument not to use the `@transient` approach. E.g., the following doesn't work:
```scala
@transient private lazy val (x, y) = {
  ...
  (x, y)
}
```
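This pitfall can be reproduced outside Spark with plain Java serialization. The sketch below is illustrative only (the class and field names are made up, and the array stands in for the coefficients): it compares the destructuring form against a single `@transient lazy val` holding the tuple, and measures how many bytes each instance serializes to.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object SerDemo {
  // Stand-in for the coefficient data we do not want shipped to executors.
  val payload: Array[Double] = Array.fill(100000)(1.0)

  // Destructuring pattern: scalac synthesizes a hidden Tuple2 field that
  // does NOT carry @transient, so the payload is serialized anyway.
  class WithTuple extends Serializable {
    @transient private lazy val (vec, offset) = (payload.clone(), 0.0)
    def use(): Double = vec(0) + offset
  }

  // Single @transient lazy val holding the tuple: only the object shell
  // is serialized; the tuple is rebuilt lazily after deserialization.
  class WithSingleVal extends Serializable {
    @transient private lazy val coefAndOffset = (payload.clone(), 0.0)
    def use(): Double = coefAndOffset._1(0) + coefAndOffset._2
  }

  // Serialized size in bytes, the way Spark's JavaSerializer would see it.
  def serializedSize(o: AnyRef): Int = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(o)
    out.close()
    bytes.toByteArray.length
  }

  def main(args: Array[String]): Unit = {
    val a = new WithTuple; a.use()        // force the lazy vals first
    val b = new WithSingleVal; b.use()
    println(s"tuple-destructured: ${serializedSize(a)} bytes")
    println(s"single lazy val:    ${serializedSize(b)} bytes")
  }
}
```

Running the `main` above should show the destructured version serializing orders of magnitude more data than the single-val version, which is exactly the regression SPARK-16404 fixes.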
GitHub user sethah opened a pull request: https://github.com/apache/spark/pull/14109

[SPARK-16404][ML] LeastSquaresAggregators serializes unnecessary data

## What changes were proposed in this pull request?

Similar to `LogisticAggregator`, the `LeastSquaresAggregator` used for linear regression ends up serializing the coefficients and the feature standard deviations, which is not necessary and can cause performance issues for high-dimensional data. This patch removes that serialization.

In https://github.com/apache/spark/pull/13729 the approach was to pass these values directly to the `add` method. The approach used here, initially, is to mark these fields as transient instead, which has the benefit of keeping the signature of the `add` method simple and interpretable. The downside is that it requires `@transient lazy val`s, which are difficult to reason about for anyone not familiar with serialization in Scala/Spark.

## How was this patch tested?

**MLlib**
![image](https://cloud.githubusercontent.com/assets/7275795/16703660/436f79fa-4524-11e6-9022-ef00058ec718.png)

**ML without patch**
![image](https://cloud.githubusercontent.com/assets/7275795/16703831/c4d50b9e-4525-11e6-80cb-9b58c850cd41.png)

**ML with patch**
![image](https://cloud.githubusercontent.com/assets/7275795/16703675/63e0cf40-4524-11e6-9120-1f512a70e083.png)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sethah/spark LIR_serialize

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/14109.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #14109

commit 6b85765fb37f32e0c7587cd463e387a13ed5abdd
Author: sethah
Date: 2016-07-08T18:53:33Z
    remove unnecessary serialization in linear regression

commit 53655486b8ab28aa27077910672d17f3f6b66dd4
Author: sethah
Date: 2016-07-08T22:45:07Z
    using transient

commit 53c9192f0a8c234a1d185e3066154861ef25b77f
Author: sethah
Date: 2016-07-08T22:52:12Z
    style