Repository: spark
Updated Branches:
  refs/heads/master ba23f768f -> 46b2550bc


[SPARK-18060][ML] Avoid unnecessary computation for MLOR

## What changes were proposed in this pull request?

Before this patch, the gradient updates for multinomial logistic regression 
were computed by an outer loop over the number of classes and an inner loop 
over the number of features. Inside the inner loop, we standardized the feature 
value (`value / featuresStd(index)`), which means we performed that division 
`numFeatures * numClasses` times when it only needs to be performed 
`numFeatures` times. Re-ordering the inner and outer loops avoids the redundant 
divisions, but at the cost of sequential memory access. In this patch, we 
instead lay out the coefficients in column-major order while we train, so that 
we avoid the extra computation and retain sequential memory access. We convert 
back to row-major order when we create the model.
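
To make the reordering concrete, here is a minimal sketch of the two loop 
structures for the margin computation. This is illustrative standalone Scala 
(names like `marginsRowMajor` are invented for the sketch), not the actual 
aggregator code, which also handles sparsity, intercepts, and zero-variance 
features:

```scala
// Minimal sketch only: dense features, no intercept, featuresStd all nonzero.
object MarginSketch {

  // Before: one pass per class, so `features(index) / featuresStd(index)`
  // is recomputed for every class, i.e. numClasses * numFeatures divisions.
  def marginsRowMajor(
      coefficients: Array[Double],  // row-major: all features of class 0, then class 1, ...
      features: Array[Double],
      featuresStd: Array[Double],
      numClasses: Int): Array[Double] = {
    val numFeatures = features.length
    Array.tabulate(numClasses) { i =>
      var margin = 0.0
      var index = 0
      while (index < numFeatures) {
        margin += coefficients(i * numFeatures + index) *
          features(index) / featuresStd(index)  // repeated once per class
        index += 1
      }
      margin
    }
  }

  // After: one pass per feature standardizes each value exactly once; the
  // column-major layout keeps the inner class loop on consecutive array slots.
  def marginsColMajor(
      coefficients: Array[Double],  // column-major: all classes of feature 0, then feature 1, ...
      features: Array[Double],
      featuresStd: Array[Double],
      numClasses: Int): Array[Double] = {
    val margins = new Array[Double](numClasses)
    var index = 0
    while (index < features.length) {
      val stdValue = features(index) / featuresStd(index)  // once per feature
      var j = 0
      while (j < numClasses) {
        margins(j) += coefficients(index * numClasses + j) * stdValue
        j += 1
      }
      index += 1
    }
    margins
  }
}
```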

## How was this patch tested?

This is an implementation detail only, so the original behavior is maintained. 
All existing tests pass. I also ran performance tests to verify the speedups; 
the results below show significant improvements.

## Performance Tests

**Setup**

* 3-node bare-metal cluster
* 120 cores total
* 384 GB RAM total

**Results**

NOTE: `currentMasterTime` and `thisPatchTime` are the times, in seconds, for a 
single iteration of L-BFGS or OWL-QN. `pctSpeedup` is the percentage reduction 
in per-iteration time, i.e. `(currentMasterTime - thisPatchTime) / currentMasterTime * 100`.

|    | numPoints | numFeatures | numClasses | regParam | elasticNetParam | currentMasterTime (sec) | thisPatchTime (sec) | pctSpeedup |
|----|-----------|-------------|------------|----------|-----------------|-------------------------|---------------------|------------|
| 0  | 1e+07     | 100         | 500        | 0.5      | 0               | 90                      | 18                  | 80         |
| 1  | 1e+08     | 100         | 50         | 0.5      | 0               | 90                      | 19                  | 78         |
| 2  | 1e+08     | 100         | 50         | 0.05     | 1               | 72                      | 19                  | 73         |
| 3  | 1e+06     | 100         | 5000       | 0.5      | 0               | 93                      | 53                  | 43         |
| 4  | 1e+07     | 100         | 5000       | 0.5      | 0               | 900                     | 390                 | 56         |
| 5  | 1e+08     | 100         | 500        | 0.5      | 0               | 840                     | 174                 | 79         |
| 6  | 1e+08     | 100         | 200        | 0.5      | 0               | 360                     | 72                  | 80         |
| 7  | 1e+08     | 1000        | 5          | 0.5      | 0               | 9                       | 3                   | 66         |
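
As a reading aid for the index arithmetic in the diff below, here is a small 
sketch of the flat coefficient layouts. The helper names are hypothetical (the 
real code inlines these expressions), and for multinomial models 
`numCoefficientSets` in the patch equals `numClasses`:

```scala
// Hypothetical helpers mirroring the flat-index math in the patch below;
// the real code inlines these expressions.
object CoefLayoutSketch {

  // Coefficient for class `row` and feature `col` during training.
  def colMajorIndex(row: Int, col: Int, numClasses: Int): Int =
    col * numClasses + row

  // Coefficient for class `row` and feature `col` in the final model.
  def rowMajorIndex(row: Int, col: Int, numFeatures: Int): Int =
    row * numFeatures + col

  // Intercepts, when fit, are appended after all coefficients.
  def interceptIndex(i: Int, numClasses: Int, numFeatures: Int): Int =
    numClasses * numFeatures + i

  def main(args: Array[String]): Unit = {
    // numClasses = 3, numFeatures = 2 gives the training layout
    // Array(beta_11, beta_21, beta_31, beta_12, beta_22, beta_32,
    //       intercept_1, intercept_2, intercept_3)
    assert(colMajorIndex(2, 1, numClasses = 3) == 5)                // beta_32 is the last coefficient
    assert(interceptIndex(0, numClasses = 3, numFeatures = 2) == 6) // intercept_1 follows it
  }
}
```

The conversion back to row-major in the patch uses 
`colMajorIndex = (i % numFeatures) * numCoefficientSets + i / numFeatures`, 
where `i` iterates over row-major positions.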

Author: sethah <seth.hendrickso...@gmail.com>

Closes #15593 from sethah/MLOR_PERF_COL_MAJOR_COEF.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/46b2550b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/46b2550b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/46b2550b

Branch: refs/heads/master
Commit: 46b2550bcd3690a260b995fd4d024a73b92a0299
Parents: ba23f76
Author: sethah <seth.hendrickso...@gmail.com>
Authored: Sat Nov 12 01:38:26 2016 +0000
Committer: DB Tsai <dbt...@dbtsai.com>
Committed: Sat Nov 12 01:38:26 2016 +0000

----------------------------------------------------------------------
 .../ml/classification/LogisticRegression.scala  | 125 +++++++++++--------
 1 file changed, 74 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/46b2550b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index c465105..18b9b30 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -438,18 +438,14 @@ class LogisticRegression @Since("1.2.0") (
           val standardizationParam = $(standardization)
           def regParamL1Fun = (index: Int) => {
             // Remove the L1 penalization on the intercept
-            val isIntercept = $(fitIntercept) && ((index + 1) % numFeaturesPlusIntercept == 0)
+            val isIntercept = $(fitIntercept) && index >= numFeatures * numCoefficientSets
             if (isIntercept) {
               0.0
             } else {
               if (standardizationParam) {
                 regParamL1
               } else {
-                val featureIndex = if ($(fitIntercept)) {
-                  index % numFeaturesPlusIntercept
-                } else {
-                  index % numFeatures
-                }
+                val featureIndex = index / numCoefficientSets
                 // If `standardization` is false, we still standardize the data
                 // to improve the rate of convergence; as a result, we have to
                 // perform this reverse standardization by penalizing each component
@@ -466,6 +462,15 @@ class LogisticRegression @Since("1.2.0") (
          new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, regParamL1Fun, $(tol))
         }
 
+        /*
+          The coefficients are laid out in column major order during training. e.g. for
+          `numClasses = 3` and `numFeatures = 2` and `fitIntercept = true` the layout is:
+
+           Array(beta_11, beta_21, beta_31, beta_12, beta_22, beta_32, intercept_1, intercept_2,
+             intercept_3)
+
+           where beta_jk corresponds to the coefficient for class `j` and feature `k`.
+         */
         val initialCoefficientsWithIntercept =
           Vectors.zeros(numCoefficientSets * numFeaturesPlusIntercept)
 
@@ -489,13 +494,14 @@ class LogisticRegression @Since("1.2.0") (
          val initialCoefWithInterceptArray = initialCoefficientsWithIntercept.toArray
           val providedCoef = optInitialModel.get.coefficientMatrix
           providedCoef.foreachActive { (row, col, value) =>
-            val flatIndex = row * numFeaturesPlusIntercept + col
+            // convert matrix to column major for training
+            val flatIndex = col * numCoefficientSets + row
            // We need to scale the coefficients since they will be trained in the scaled space
             initialCoefWithInterceptArray(flatIndex) = value * featuresStd(col)
           }
           if ($(fitIntercept)) {
            optInitialModel.get.interceptVector.foreachActive { (index, value) =>
-              val coefIndex = (index + 1) * numFeaturesPlusIntercept - 1
+              val coefIndex = numCoefficientSets * numFeatures + index
               initialCoefWithInterceptArray(coefIndex) = value
             }
           }
@@ -526,7 +532,7 @@ class LogisticRegression @Since("1.2.0") (
          val rawIntercepts = histogram.map(c => math.log(c + 1)) // add 1 for smoothing
           val rawMean = rawIntercepts.sum / rawIntercepts.length
           rawIntercepts.indices.foreach { i =>
-            initialCoefficientsWithIntercept.toArray(i * numFeaturesPlusIntercept + numFeatures) =
+            initialCoefficientsWithIntercept.toArray(numClasses * numFeatures + i) =
               rawIntercepts(i) - rawMean
           }
         } else if ($(fitIntercept)) {
@@ -572,16 +578,20 @@ class LogisticRegression @Since("1.2.0") (
         /*
            The coefficients are trained in the scaled space; we're converting them back to
            the original space.
+
+           Additionally, since the coefficients were laid out in column major order during training
+           to avoid extra computation, we convert them back to row major before passing them to the
+           model.
+
            Note that the intercept in scaled space and original space is the same;
            as a result, no scaling is needed.
          */
         val rawCoefficients = state.x.toArray.clone()
        val coefficientArray = Array.tabulate(numCoefficientSets * numFeatures) { i =>
-          // flatIndex will loop though rawCoefficients, and skip the intercept terms.
-          val flatIndex = if ($(fitIntercept)) i + i / numFeatures else i
+          val colMajorIndex = (i % numFeatures) * numCoefficientSets + i / numFeatures
           val featureIndex = i % numFeatures
           if (featuresStd(featureIndex) != 0.0) {
-            rawCoefficients(flatIndex) / featuresStd(featureIndex)
+            rawCoefficients(colMajorIndex) / featuresStd(featureIndex)
           } else {
             0.0
           }
@@ -618,7 +628,7 @@ class LogisticRegression @Since("1.2.0") (
 
         val interceptsArray: Array[Double] = if ($(fitIntercept)) {
           Array.tabulate(numCoefficientSets) { i =>
-            val coefIndex = (i + 1) * numFeaturesPlusIntercept - 1
+            val coefIndex = numFeatures * numCoefficientSets + i
             rawCoefficients(coefIndex)
           }
         } else {
@@ -697,6 +707,7 @@ class LogisticRegressionModel private[spark] (
   /**
   * A vector of model coefficients for "binomial" logistic regression. If this model was trained
    * using the "multinomial" family then an exception is thrown.
+   *
    * @return Vector
    */
   @Since("2.0.0")
@@ -720,6 +731,7 @@ class LogisticRegressionModel private[spark] (
   /**
   * The model intercept for "binomial" logistic regression. If this model was fit with the
    * "multinomial" family then an exception is thrown.
+   *
    * @return Double
    */
   @Since("1.3.0")
@@ -1389,6 +1401,12 @@ class BinaryLogisticRegressionSummary private[classification] (
  *    $$
  * </blockquote></p>
  *
+ * @note In order to avoid unnecessary computation during calculation of the gradient updates
+ *       we lay out the coefficients in column major order during training. This allows us to
+ *       perform feature standardization once, while still retaining sequential memory access
+ *       for speed. We convert back to row major order when we create the model,
+ *       since this form is optimal for the matrix operations used for prediction.
+ *
 * @param bcCoefficients The broadcast coefficients corresponding to the features.
 * @param bcFeaturesStd The broadcast standard deviation values of the features.
 * @param numClasses the number of possible outcomes for k classes classification problem in
@@ -1486,23 +1504,25 @@ private class LogisticAggregator(
     var marginOfLabel = 0.0
     var maxMargin = Double.NegativeInfinity
 
-    val margins = Array.tabulate(numClasses) { i =>
-      var margin = 0.0
-      features.foreachActive { (index, value) =>
-        if (localFeaturesStd(index) != 0.0 && value != 0.0) {
-          margin += localCoefficients(i * numFeaturesPlusIntercept + index) *
-            value / localFeaturesStd(index)
-        }
+    val margins = new Array[Double](numClasses)
+    features.foreachActive { (index, value) =>
+      val stdValue = value / localFeaturesStd(index)
+      var j = 0
+      while (j < numClasses) {
+        margins(j) += localCoefficients(index * numClasses + j) * stdValue
+        j += 1
       }
-
+    }
+    var i = 0
+    while (i < numClasses) {
       if (fitIntercept) {
-        margin += localCoefficients(i * numFeaturesPlusIntercept + numFeatures)
+        margins(i) += localCoefficients(numClasses * numFeatures + i)
       }
-      if (i == label.toInt) marginOfLabel = margin
-      if (margin > maxMargin) {
-        maxMargin = margin
+      if (i == label.toInt) marginOfLabel = margins(i)
+      if (margins(i) > maxMargin) {
+        maxMargin = margins(i)
       }
-      margin
+      i += 1
     }
 
     /**
@@ -1510,33 +1530,39 @@ private class LogisticAggregator(
      * We address this by subtracting maxMargin from all the margins, so it's guaranteed
      * that all of the new margins will be smaller than zero to prevent arithmetic overflow.
      */
+    val multipliers = new Array[Double](numClasses)
     val sum = {
       var temp = 0.0
-      if (maxMargin > 0) {
-        for (i <- 0 until numClasses) {
-          margins(i) -= maxMargin
-          temp += math.exp(margins(i))
-        }
-      } else {
-        for (i <- 0 until numClasses) {
-          temp += math.exp(margins(i))
-        }
+      var i = 0
+      while (i < numClasses) {
+        if (maxMargin > 0) margins(i) -= maxMargin
+        val exp = math.exp(margins(i))
+        temp += exp
+        multipliers(i) = exp
+        i += 1
       }
       temp
     }
 
-    for (i <- 0 until numClasses) {
-      val multiplier = math.exp(margins(i)) / sum - {
-        if (label == i) 1.0 else 0.0
-      }
-      features.foreachActive { (index, value) =>
-        if (localFeaturesStd(index) != 0.0 && value != 0.0) {
-          localGradientArray(i * numFeaturesPlusIntercept + index) +=
-            weight * multiplier * value / localFeaturesStd(index)
+    margins.indices.foreach { i =>
+      multipliers(i) = multipliers(i) / sum - (if (label == i) 1.0 else 0.0)
+    }
+    features.foreachActive { (index, value) =>
+      if (localFeaturesStd(index) != 0.0 && value != 0.0) {
+        val stdValue = value / localFeaturesStd(index)
+        var j = 0
+        while (j < numClasses) {
+          localGradientArray(index * numClasses + j) +=
+            weight * multipliers(j) * stdValue
+          j += 1
         }
       }
-      if (fitIntercept) {
-        localGradientArray(i * numFeaturesPlusIntercept + numFeatures) += weight * multiplier
+    }
+    if (fitIntercept) {
+      var i = 0
+      while (i < numClasses) {
+        localGradientArray(numFeatures * numClasses + i) += weight * multipliers(i)
+        i += 1
       }
     }
 
@@ -1637,6 +1663,7 @@ private class LogisticCostFun(
     val bcCoeffs = instances.context.broadcast(coeffs)
     val featuresStd = bcFeaturesStd.value
     val numFeatures = featuresStd.length
+    val numCoefficientSets = if (multinomial) numClasses else 1
 
     val logisticAggregator = {
      val seqOp = (c: LogisticAggregator, instance: Instance) => c.add(instance)
@@ -1656,7 +1683,7 @@ private class LogisticCostFun(
       var sum = 0.0
       coeffs.foreachActive { case (index, value) =>
         // We do not apply regularization to the intercepts
-        val isIntercept = fitIntercept && ((index + 1) % (numFeatures + 1) == 0)
+        val isIntercept = fitIntercept && index >= numCoefficientSets * numFeatures
         if (!isIntercept) {
          // The following code will compute the loss of the regularization; also
          // the gradient of the regularization, and add back to totalGradientArray.
@@ -1665,11 +1692,7 @@ private class LogisticCostFun(
               totalGradientArray(index) += regParamL2 * value
               value * value
             } else {
-              val featureIndex = if (fitIntercept) {
-                index % (numFeatures + 1)
-              } else {
-                index % numFeatures
-              }
+              val featureIndex = index / numCoefficientSets
               if (featuresStd(featureIndex) != 0.0) {
                 // If `standardization` is false, we still standardize the data
                 // to improve the rate of convergence; as a result, we have to

