[GitHub] spark pull request #14109: [SPARK-16404][ML] LeastSquaresAggregators seriali...

2016-08-08 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/14109


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14109: [SPARK-16404][ML] LeastSquaresAggregators seriali...

2016-08-05 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/14109#discussion_r73765222
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala ---
@@ -996,19 +1013,24 @@ private class LeastSquaresCostFun(
     featuresMean: Array[Double],
     effectiveL2regParam: Double) extends DiffFunction[BDV[Double]] {

+  val bcFeaturesStd = instances.context.broadcast(featuresStd)
+  val bcFeaturesMean = instances.context.broadcast(featuresMean)
+
   override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) = {
     val coeffs = Vectors.fromBreeze(coefficients)
+    val bcCoeffs = instances.context.broadcast(coeffs)

     val leastSquaresAggregator = {
       val seqOp = (c: LeastSquaresAggregator, instance: Instance) => c.add(instance)
       val combOp = (c1: LeastSquaresAggregator, c2: LeastSquaresAggregator) => c1.merge(c2)

       instances.treeAggregate(
-        new LeastSquaresAggregator(coeffs, labelStd, labelMean, fitIntercept, featuresStd,
-          featuresMean))(seqOp, combOp)
+        new LeastSquaresAggregator(bcCoeffs, labelStd, labelMean, fitIntercept, bcFeaturesStd,
+          bcFeaturesMean))(seqOp, combOp)
     }

     val totalGradientArray = leastSquaresAggregator.gradient.toArray
+    bcCoeffs.destroy(blocking = false)

--- End diff --

We cannot destroy them here because they are used on every iteration. I just added a commit that fixes this by destroying the broadcast variables after the algorithm has run.
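A minimal sketch of the lifecycle this describes, assuming a cost-function class shaped like the diff above. The aggregation body is a stand-in, `cleanup` is a hypothetical hook the driver calls after the optimizer converges, and note that `destroy(blocking = …)` was Spark-internal API at the time; user code outside Spark would call `destroy()`:

```scala
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD

class CostFunSketch(instances: RDD[Double], featuresStd: Array[Double]) {
  // Invariant across iterations: broadcast once at construction.
  val bcFeaturesStd: Broadcast[Array[Double]] =
    instances.context.broadcast(featuresStd)

  def calculate(coefficients: Array[Double]): Double = {
    // The coefficients change every iteration, so this broadcast is
    // per-call and can be destroyed as soon as the pass finishes.
    val bcCoeffs = instances.context.broadcast(coefficients)
    val loss = instances.treeAggregate(0.0)(
      (acc, x) => acc + x * bcCoeffs.value(0), // stand-in for the real aggregator
      _ + _)
    bcCoeffs.destroy(blocking = false)
    loss
  }

  // Hypothetical hook: called by the driver after optimization converges,
  // which is where the commit destroys the long-lived broadcasts.
  def cleanup(): Unit = bcFeaturesStd.destroy(blocking = false)
}
```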





[GitHub] spark pull request #14109: [SPARK-16404][ML] LeastSquaresAggregators seriali...

2016-08-05 Thread dbtsai
Github user dbtsai commented on a diff in the pull request:

https://github.com/apache/spark/pull/14109#discussion_r73754722
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala ---
@@ -996,19 +1013,24 @@ private class LeastSquaresCostFun(
     featuresMean: Array[Double],
     effectiveL2regParam: Double) extends DiffFunction[BDV[Double]] {

+  val bcFeaturesStd = instances.context.broadcast(featuresStd)
+  val bcFeaturesMean = instances.context.broadcast(featuresMean)
+
   override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) = {
     val coeffs = Vectors.fromBreeze(coefficients)
+    val bcCoeffs = instances.context.broadcast(coeffs)

     val leastSquaresAggregator = {
       val seqOp = (c: LeastSquaresAggregator, instance: Instance) => c.add(instance)
       val combOp = (c1: LeastSquaresAggregator, c2: LeastSquaresAggregator) => c1.merge(c2)

       instances.treeAggregate(
-        new LeastSquaresAggregator(coeffs, labelStd, labelMean, fitIntercept, featuresStd,
-          featuresMean))(seqOp, combOp)
+        new LeastSquaresAggregator(bcCoeffs, labelStd, labelMean, fitIntercept, bcFeaturesStd,
+          bcFeaturesMean))(seqOp, combOp)
     }

     val totalGradientArray = leastSquaresAggregator.gradient.toArray
+    bcCoeffs.destroy(blocking = false)

--- End diff --

BTW, why do we not explicitly destroy `bcFeaturesStd` and `bcFeaturesMean` 
here? Thanks.





[GitHub] spark pull request #14109: [SPARK-16404][ML] LeastSquaresAggregators seriali...

2016-08-05 Thread dbtsai
Github user dbtsai commented on a diff in the pull request:

https://github.com/apache/spark/pull/14109#discussion_r73646330
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala ---
@@ -862,27 +873,30 @@ class LinearRegressionSummary private[regression] (
  *$$
  *
  *
- * @param coefficients The coefficients corresponding to the features.
+ * @param bcCoefficients The broadcast coefficients corresponding to the features.
  * @param labelStd The standard deviation value of the label.
  * @param labelMean The mean value of the label.
  * @param fitIntercept Whether to fit an intercept term.
- * @param featuresStd The standard deviation values of the features.
- * @param featuresMean The mean values of the features.
+ * @param bcFeaturesStd The broadcast standard deviation values of the features.
+ * @param bcFeaturesMean The broadcast mean values of the features.
  */
 private class LeastSquaresAggregator(
-    coefficients: Vector,
+    bcCoefficients: Broadcast[Vector],
     labelStd: Double,
     labelMean: Double,
     fitIntercept: Boolean,
-    featuresStd: Array[Double],
-    featuresMean: Array[Double]) extends Serializable {
+    bcFeaturesStd: Broadcast[Array[Double]],
+    bcFeaturesMean: Broadcast[Array[Double]]) extends Serializable {

   private var totalCnt: Long = 0L
   private var weightSum: Double = 0.0
   private var lossSum = 0.0

-  private val (effectiveCoefficientsArray: Array[Double], offset: Double, dim: Int) = {
-    val coefficientsArray = coefficients.toArray.clone()
+  private val dim = bcCoefficients.value.size
+  @transient private lazy val featuresStd = bcFeaturesStd.value
+  @transient private lazy val effectiveCoefAndOffset = {
--- End diff --

Oh, this is indeed obscure. I like that using `@transient` makes it explicit which fields are not serialized. However, it can be difficult to debug. How about documenting this in the code? Or we could define a `def initializeEffectiveCoefficientsVectorAndOffset` and call it from the `add` method on the first invocation. I don't have a strong opinion about this.





[GitHub] spark pull request #14109: [SPARK-16404][ML] LeastSquaresAggregators seriali...

2016-08-04 Thread yanboliang
Github user yanboliang commented on a diff in the pull request:

https://github.com/apache/spark/pull/14109#discussion_r73644694
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala ---
@@ -862,27 +873,30 @@ class LinearRegressionSummary private[regression] (
  *$$
  *
  *
- * @param coefficients The coefficients corresponding to the features.
+ * @param bcCoefficients The broadcast coefficients corresponding to the features.
  * @param labelStd The standard deviation value of the label.
  * @param labelMean The mean value of the label.
  * @param fitIntercept Whether to fit an intercept term.
- * @param featuresStd The standard deviation values of the features.
- * @param featuresMean The mean values of the features.
+ * @param bcFeaturesStd The broadcast standard deviation values of the features.
+ * @param bcFeaturesMean The broadcast mean values of the features.
  */
 private class LeastSquaresAggregator(
-    coefficients: Vector,
+    bcCoefficients: Broadcast[Vector],
     labelStd: Double,
     labelMean: Double,
     fitIntercept: Boolean,
-    featuresStd: Array[Double],
-    featuresMean: Array[Double]) extends Serializable {
+    bcFeaturesStd: Broadcast[Array[Double]],
+    bcFeaturesMean: Broadcast[Array[Double]]) extends Serializable {

   private var totalCnt: Long = 0L
   private var weightSum: Double = 0.0
   private var lossSum = 0.0

-  private val (effectiveCoefficientsArray: Array[Double], offset: Double, dim: Int) = {
-    val coefficientsArray = coefficients.toArray.clone()
+  private val dim = bcCoefficients.value.size
+  @transient private lazy val featuresStd = bcFeaturesStd.value
+  @transient private lazy val effectiveCoefAndOffset = {
--- End diff --

@sethah has explained this issue in a comment that has been folded: `@transient private lazy val (effectiveCoefficientsVector: Vector, offset: Double)` generates a `Tuple2` that does not carry the transient tag. The individual vals are still transient, but the tuple is not, and thus gets serialized.





[GitHub] spark pull request #14109: [SPARK-16404][ML] LeastSquaresAggregators seriali...

2016-08-04 Thread dbtsai
Github user dbtsai commented on a diff in the pull request:

https://github.com/apache/spark/pull/14109#discussion_r73643402
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala ---
@@ -862,27 +873,30 @@ class LinearRegressionSummary private[regression] (
  *$$
  *
  *
- * @param coefficients The coefficients corresponding to the features.
+ * @param bcCoefficients The broadcast coefficients corresponding to the features.
  * @param labelStd The standard deviation value of the label.
  * @param labelMean The mean value of the label.
  * @param fitIntercept Whether to fit an intercept term.
- * @param featuresStd The standard deviation values of the features.
- * @param featuresMean The mean values of the features.
+ * @param bcFeaturesStd The broadcast standard deviation values of the features.
+ * @param bcFeaturesMean The broadcast mean values of the features.
  */
 private class LeastSquaresAggregator(
-    coefficients: Vector,
+    bcCoefficients: Broadcast[Vector],
     labelStd: Double,
     labelMean: Double,
     fitIntercept: Boolean,
-    featuresStd: Array[Double],
-    featuresMean: Array[Double]) extends Serializable {
+    bcFeaturesStd: Broadcast[Array[Double]],
+    bcFeaturesMean: Broadcast[Array[Double]]) extends Serializable {

   private var totalCnt: Long = 0L
   private var weightSum: Double = 0.0
   private var lossSum = 0.0

-  private val (effectiveCoefficientsArray: Array[Double], offset: Double, dim: Int) = {
-    val coefficientsArray = coefficients.toArray.clone()
+  private val dim = bcCoefficients.value.size
+  @transient private lazy val featuresStd = bcFeaturesStd.value
+  @transient private lazy val effectiveCoefAndOffset = {
--- End diff --

How about `@transient private lazy val (effectiveCoefficientsVector: Vector, offset: Double) = ...`?





[GitHub] spark pull request #14109: [SPARK-16404][ML] LeastSquaresAggregators seriali...

2016-08-03 Thread yanboliang
Github user yanboliang commented on a diff in the pull request:

https://github.com/apache/spark/pull/14109#discussion_r73301221
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala ---
@@ -958,14 +973,17 @@ private class LeastSquaresCostFun(

   override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) = {
     val coeffs = Vectors.fromBreeze(coefficients)
+    val bcFeaturesStd = instances.context.broadcast(featuresStd)
+    val bcFeaturesMean = instances.context.broadcast(featuresMean)
+    val bcCoeffs = instances.context.broadcast(coeffs)
--- End diff --

Can we move `bcFeaturesStd` and `bcFeaturesMean` out of `calculate`? Since they are invariant across iterations, we should not broadcast them on each iteration. Meanwhile, it's better to call `destroy(blocking = false)` at the right place and time to release the memory held by the broadcast variables.





[GitHub] spark pull request #14109: [SPARK-16404][ML] LeastSquaresAggregators seriali...

2016-08-02 Thread yanboliang
Github user yanboliang commented on a diff in the pull request:

https://github.com/apache/spark/pull/14109#discussion_r73178468
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala ---
@@ -822,27 +833,30 @@ class LinearRegressionSummary private[regression] (
  * \frac{\partial L}{\partial\w_i} = 1/N ((\sum_j diff_j x_{ij} / \hat{x_i})
  * }}},
  *
- * @param coefficients The coefficients corresponding to the features.
+ * @param bcCoefficients The broadcast coefficients corresponding to the features.
  * @param labelStd The standard deviation value of the label.
  * @param labelMean The mean value of the label.
  * @param fitIntercept Whether to fit an intercept term.
- * @param featuresStd The standard deviation values of the features.
- * @param featuresMean The mean values of the features.
+ * @param bcFeaturesStd The broadcast standard deviation values of the features.
+ * @param bcFeaturesMean The broadcast mean values of the features.
  */
 private class LeastSquaresAggregator(
-    coefficients: Vector,
+    bcCoefficients: Broadcast[Vector],
     labelStd: Double,
     labelMean: Double,
     fitIntercept: Boolean,
-    featuresStd: Array[Double],
-    featuresMean: Array[Double]) extends Serializable {
+    bcFeaturesStd: Broadcast[Array[Double]],
+    bcFeaturesMean: Broadcast[Array[Double]]) extends Serializable {

   private var totalCnt: Long = 0L
   private var weightSum: Double = 0.0
   private var lossSum = 0.0

-  private val (effectiveCoefficientsArray: Array[Double], offset: Double, dim: Int) = {
-    val coefficientsArray = coefficients.toArray.clone()
+  private val dim = bcCoefficients.value.size
+  @transient private lazy val featuresStd = bcFeaturesStd.value
+  @transient private lazy val coefAndOffset = {
--- End diff --

Would `effectiveCoefficientsAndOffset` be a better name than `coefAndOffset`? It holds the effective coefficients rather than the raw coefficients.





[GitHub] spark pull request #14109: [SPARK-16404][ML] LeastSquaresAggregators seriali...

2016-07-08 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/14109#discussion_r70153596
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala ---
@@ -856,10 +870,10 @@ private class LeastSquaresAggregator(
       i += 1
     }
     val offset = if (fitIntercept) labelMean / labelStd - sum else 0.0
-    (coefficientsArray, offset, coefficientsArray.length)
+    (Vectors.dense(coefficientsArray), offset)
   }
-
-  private val effectiveCoefficientsVector = Vectors.dense(effectiveCoefficientsArray)
+  @transient private lazy val effectiveCoefficientsVector = coefAndOffset._1
--- End diff --

Before, these values were assigned simultaneously and implicitly using a pattern match. It turns out that marking it as `@transient lazy val` doesn't work, because the unapply method generates a `Tuple2` that does not carry the transient tag. The individual vals are still transient, but the tuple is not, and thus gets serialized. This obscure/hidden consequence of pattern matching is one good argument against the `@transient` approach. For example, the following doesn't work:

```scala
@transient private lazy val (x, y) = {
  ...
  (x, y)
}
```
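The pitfall can be reproduced outside Spark with plain Java serialization. A self-contained sketch of both forms, where `Heavy` is a hypothetical stand-in for the large arrays (the exact fields the compiler generates depend on the Scala version; this matches the 2.11-era behavior the PR describes):

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

class Heavy extends Serializable {
  val payload = new Array[Double](1000000) // stand-in for large data
}

// Pattern-match form: the compiler stashes the whole Tuple2 in a synthetic
// field that does NOT inherit @transient, so Heavy is serialized anyway.
class WithPatternMatch extends Serializable {
  @transient private lazy val (heavy, offset) = (new Heavy, 1.0)
  def force(): Double = heavy.payload.length + offset
}

// Workaround used in the PR: keep the tuple itself in one @transient lazy
// val and project the components out of it.
class WithProjection extends Serializable {
  @transient private lazy val heavyAndOffset = (new Heavy, 1.0)
  @transient private lazy val heavy = heavyAndOffset._1
  @transient private lazy val offset = heavyAndOffset._2
  def force(): Double = heavy.payload.length + offset
}

object TransientTupleDemo {
  def serializedSize(o: AnyRef): Int = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(o)
    out.close()
    bytes.size()
  }

  def main(args: Array[String]): Unit = {
    val a = new WithPatternMatch; a.force() // populate the lazy fields
    val b = new WithProjection; b.force()
    println(s"pattern match: ${serializedSize(a)} bytes") // expected: large, Heavy is dragged along
    println(s"projection:    ${serializedSize(b)} bytes") // expected: small
  }
}
```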





[GitHub] spark pull request #14109: [SPARK-16404][ML] LeastSquaresAggregators seriali...

2016-07-08 Thread sethah
GitHub user sethah opened a pull request:

https://github.com/apache/spark/pull/14109

[SPARK-16404][ML] LeastSquaresAggregators serializes unnecessary data

## What changes were proposed in this pull request?
Similar to `LogisticAggregator`, the `LeastSquaresAggregator` used for linear regression ends up serializing the coefficients and the feature standard deviations, which is unnecessary and can cause performance issues for high-dimensional data. This patch removes that serialization.

In https://github.com/apache/spark/pull/13729 the approach was to pass these values directly to the `add` method. The approach used here, initially, is to mark these fields as transient instead, which keeps the signature of the `add` method simple and interpretable. The downside is that it requires `@transient lazy val`s, which are difficult to reason about for anyone not familiar with serialization in Scala/Spark.
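A condensed, hypothetical sketch of the transient approach (names follow the PR's diff; the gradient bookkeeping and intercept handling are elided, and the scaling mirrors the existing effective-coefficients computation):

```scala
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.ml.linalg.{Vector, Vectors}

private class AggregatorSketch(
    bcCoefficients: Broadcast[Vector],
    bcFeaturesStd: Broadcast[Array[Double]]) extends Serializable {

  private var lossSum = 0.0

  // Cheap to serialize: a single Int, read once on the driver.
  private val dim = bcCoefficients.value.size

  // Never serialized: rebuilt lazily from the broadcasts on each executor.
  @transient private lazy val featuresStd = bcFeaturesStd.value
  @transient private lazy val effectiveCoefficients: Vector = {
    val arr = bcCoefficients.value.toArray.clone()
    var i = 0
    while (i < dim) {
      if (featuresStd(i) != 0.0) arr(i) /= featuresStd(i) else arr(i) = 0.0
      i += 1
    }
    Vectors.dense(arr)
  }

  // The add signature stays simple: no per-call coefficient arguments.
  def add(label: Double, features: Vector): this.type = {
    var dot = 0.0
    var i = 0
    while (i < dim) { dot += effectiveCoefficients(i) * features(i); i += 1 }
    val diff = dot - label // offset/intercept handling elided
    lossSum += diff * diff / 2.0
    this
  }
}
```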

## How was this patch tested?

**MLlib**

![image](https://cloud.githubusercontent.com/assets/7275795/16703660/436f79fa-4524-11e6-9022-ef00058ec718.png)

**ML without patch**

![image](https://cloud.githubusercontent.com/assets/7275795/16703831/c4d50b9e-4525-11e6-80cb-9b58c850cd41.png)


**ML with patch**

![image](https://cloud.githubusercontent.com/assets/7275795/16703675/63e0cf40-4524-11e6-9120-1f512a70e083.png)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sethah/spark LIR_serialize

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/14109.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #14109


commit 6b85765fb37f32e0c7587cd463e387a13ed5abdd
Author: sethah 
Date:   2016-07-08T18:53:33Z

remove unnecessary serialization in linear regression

commit 53655486b8ab28aa27077910672d17f3f6b66dd4
Author: sethah 
Date:   2016-07-08T22:45:07Z

using transient

commit 53c9192f0a8c234a1d185e3066154861ef25b77f
Author: sethah 
Date:   2016-07-08T22:52:12Z

style



