Repository: spark
Updated Branches:
  refs/heads/master 69cd53eae -> bf1a6aaac


[SPARK-4581][MLlib] Refactorize StandardScaler to improve the transformation performance

The following optimizations are done to improve the StandardScaler model
transformation performance.

1) Convert the Breeze dense vector to a primitive array to reduce overhead.
2) Since the mean can potentially be a sparse vector, explicitly convert it to a
dense primitive array.
3) Keep local references to the `shift` and `factor` arrays so the JVM can load
each value with a single array access instead of a method call.
4) In the pattern matching, use MLlib's SparseVector/DenseVector instead of
Breeze's vectors to keep the codebase cleaner.
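Taken together, the optimized dense path boils down to a tight loop over primitive
arrays. A minimal, Spark-free sketch (the object and parameter names here are
illustrative; the real code lives in `StandardScalerModel.transform`):

```scala
// Simplified sketch of the optimized withMean + withStd dense path.
// `shift` is the mean materialized once as a dense primitive array, and
// `factor` holds 1 / sqrt(variance) per dimension; both are plain
// Array[Double] rather than Breeze vectors, so the loop body is pure
// primitive array arithmetic.
object ScalerSketch {
  def transform(values: Array[Double],
                shift: Array[Double],
                factor: Array[Double]): Array[Double] = {
    val out = values.clone() // never mutate the caller's array
    var i = 0
    while (i < out.length) {
      out(i) = (out(i) - shift(i)) * factor(i)
      i += 1
    }
    out
  }
}
```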

Benchmark with mnist8m dataset:

Before,
DenseVector withMean and withStd: 50.97secs
DenseVector withMean and withoutStd: 42.11secs
DenseVector withoutMean and withStd: 8.75secs
SparseVector withoutMean and withStd: 5.437secs

With this PR,
DenseVector withMean and withStd: 5.76secs
DenseVector withMean and withoutStd: 5.28secs
DenseVector withoutMean and withStd: 5.30secs
SparseVector withoutMean and withStd: 1.27secs

Note that without the local reference copies of the `factor` and `shift` arrays,
the runtime is about three times slower.

DenseVector withMean and withStd: 18.15secs
DenseVector withMean and withoutStd: 18.05secs
DenseVector withoutMean and withStd: 18.54secs
SparseVector withoutMean and withStd: 2.01secs

The following code,
```scala
while (i < size) {
  values(i) = (values(i) - shift(i)) * factor(i)
  i += 1
}
```
will generate the bytecode
```
   L13
    LINENUMBER 106 L13
   FRAME FULL [org/apache/spark/mllib/feature/StandardScalerModel org/apache/spark/mllib/linalg/Vector org/apache/spark/mllib/linalg/Vector org/apache/spark/mllib/linalg/DenseVector T [D I I] []
    ILOAD 7
    ILOAD 6
    IF_ICMPGE L14
   L15
    LINENUMBER 107 L15
    ALOAD 5
    ILOAD 7
    ALOAD 5
    ILOAD 7
    DALOAD
    ALOAD 0
    INVOKESPECIAL org/apache/spark/mllib/feature/StandardScalerModel.shift ()[D
    ILOAD 7
    DALOAD
    DSUB
    ALOAD 0
    INVOKESPECIAL org/apache/spark/mllib/feature/StandardScalerModel.factor ()[D
    ILOAD 7
    DALOAD
    DMUL
    DASTORE
   L16
    LINENUMBER 108 L16
    ILOAD 7
    ICONST_1
    IADD
    ISTORE 7
    GOTO L13
```
With local references to the `shift` and `factor` arrays, on the other hand, the
bytecode will be
```
   L14
    LINENUMBER 107 L14
    ALOAD 0
    INVOKESPECIAL org/apache/spark/mllib/feature/StandardScalerModel.factor ()[D
    ASTORE 9
   L15
    LINENUMBER 108 L15
   FRAME FULL [org/apache/spark/mllib/feature/StandardScalerModel org/apache/spark/mllib/linalg/Vector [D org/apache/spark/mllib/linalg/Vector org/apache/spark/mllib/linalg/DenseVector T [D I I [D] []
    ILOAD 8
    ILOAD 7
    IF_ICMPGE L16
   L17
    LINENUMBER 109 L17
    ALOAD 6
    ILOAD 8
    ALOAD 6
    ILOAD 8
    DALOAD
    ALOAD 2
    ILOAD 8
    DALOAD
    DSUB
    ALOAD 9
    ILOAD 8
    DALOAD
    DMUL
    DASTORE
   L18
    LINENUMBER 110 L18
    ILOAD 8
    ICONST_1
    IADD
    ISTORE 8
    GOTO L15
```

You can see that with the local references, both array references are kept in
local variable slots, so the JVM can load each value without calling
`INVOKESPECIAL` on every iteration.
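The same pattern can be seen in miniature outside Spark. In this hypothetical
sketch (class and method names are invented for illustration), binding the
`lazy val` field to a local `val` before the loop means the generated accessor
method is invoked once rather than once per element:

```scala
// Illustration of the local-reference pattern: `factor` is a `lazy val`,
// so every read of the field goes through a compiler-generated accessor
// method. Copying it into a local `val` hoists that call out of the loop.
class Scaler(variance: Array[Double]) {
  private lazy val factor: Array[Double] =
    variance.map(v => if (v != 0.0) 1.0 / math.sqrt(v) else 0.0)

  def scale(values: Array[Double]): Array[Double] = {
    val out = values.clone()
    val localFactor = factor // single accessor call before the hot loop
    var i = 0
    while (i < out.length) {
      out(i) *= localFactor(i)
      i += 1
    }
    out
  }
}
```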

Author: DB Tsai <dbt...@alpinenow.com>

Closes #3435 from dbtsai/standardscaler and squashes the following commits:

85885a9 [DB Tsai] revert to have lazy in shift array.
daf2b06 [DB Tsai] Address the feedback
cdb5cef [DB Tsai] small change
9c51eef [DB Tsai] style
fc795e4 [DB Tsai] update
5bffd3d [DB Tsai] first commit


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bf1a6aaa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bf1a6aaa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bf1a6aaa

Branch: refs/heads/master
Commit: bf1a6aaac577757a293a573fe8eae9669697310a
Parents: 69cd53e
Author: DB Tsai <dbt...@alpinenow.com>
Authored: Tue Nov 25 11:07:11 2014 -0800
Committer: Xiangrui Meng <m...@databricks.com>
Committed: Tue Nov 25 11:07:11 2014 -0800

----------------------------------------------------------------------
 .../spark/mllib/feature/StandardScaler.scala    | 70 ++++++++++++++------
 1 file changed, 50 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bf1a6aaa/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala
index 4dfd1f0..8c4c5db 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala
@@ -17,11 +17,9 @@
 
 package org.apache.spark.mllib.feature
 
-import breeze.linalg.{DenseVector => BDV, SparseVector => BSV}
-
 import org.apache.spark.Logging
 import org.apache.spark.annotation.Experimental
-import org.apache.spark.mllib.linalg.{Vector, Vectors}
+import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors}
 import org.apache.spark.mllib.rdd.RDDFunctions._
 import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
 import org.apache.spark.rdd.RDD
@@ -77,8 +75,8 @@ class StandardScalerModel private[mllib] (
 
   require(mean.size == variance.size)
 
-  private lazy val factor: BDV[Double] = {
-    val f = BDV.zeros[Double](variance.size)
+  private lazy val factor: Array[Double] = {
+    val f = Array.ofDim[Double](variance.size)
     var i = 0
     while (i < f.size) {
       f(i) = if (variance(i) != 0.0) 1.0 / math.sqrt(variance(i)) else 0.0
@@ -87,6 +85,11 @@ class StandardScalerModel private[mllib] (
     f
   }
 
+  // Since `shift` will be only used in `withMean` branch, we have it as
+  // `lazy val` so it will be evaluated in that branch. Note that we don't
+  // want to create this array multiple times in `transform` function.
+  private lazy val shift: Array[Double] = mean.toArray
+
   /**
    * Applies standardization transformation on a vector.
    *
@@ -97,30 +100,57 @@ class StandardScalerModel private[mllib] (
   override def transform(vector: Vector): Vector = {
     require(mean.size == vector.size)
     if (withMean) {
-      vector.toBreeze match {
-        case dv: BDV[Double] =>
-          val output = vector.toBreeze.copy
-          var i = 0
-          while (i < output.length) {
-            output(i) = (output(i) - mean(i)) * (if (withStd) factor(i) else 1.0)
-            i += 1
+      // By default, Scala generates Java methods for member variables, so every time
+      // a member variable is accessed, `invokespecial` will be called, which is expensive.
+      // This can be avoided by having a local reference to `shift`.
+      val localShift = shift
+      vector match {
+        case dv: DenseVector =>
+          val values = dv.values.clone()
+          val size = values.size
+          if (withStd) {
+            // Keep a local reference to `factor` to avoid the overhead described above.
+            val localFactor = factor
+            var i = 0
+            while (i < size) {
+              values(i) = (values(i) - localShift(i)) * localFactor(i)
+              i += 1
+            }
+          } else {
+            var i = 0
+            while (i < size) {
+              values(i) -= localShift(i)
+              i += 1
+            }
           }
-          Vectors.fromBreeze(output)
+          Vectors.dense(values)
        case v => throw new IllegalArgumentException("Do not support vector type " + v.getClass)
       }
     } else if (withStd) {
-      vector.toBreeze match {
-        case dv: BDV[Double] => Vectors.fromBreeze(dv :* factor)
-        case sv: BSV[Double] =>
+      // Keep a local reference to `factor` to avoid the overhead described above.
+      val localFactor = factor
+      vector match {
+        case dv: DenseVector =>
+          val values = dv.values.clone()
+          val size = values.size
+          var i = 0
+          while(i < size) {
+            values(i) *= localFactor(i)
+            i += 1
+          }
+          Vectors.dense(values)
+        case sv: SparseVector =>
          // For sparse vector, the `index` array inside sparse vector object will not be changed,
           // so we can re-use it to save memory.
-          val output = new BSV[Double](sv.index, sv.data.clone(), sv.length)
+          val indices = sv.indices
+          val values = sv.values.clone()
+          val nnz = values.size
           var i = 0
-          while (i < output.data.length) {
-            output.data(i) *= factor(output.index(i))
+          while (i < nnz) {
+            values(i) *= localFactor(indices(i))
             i += 1
           }
-          Vectors.fromBreeze(output)
+          Vectors.sparse(sv.size, indices, values)
        case v => throw new IllegalArgumentException("Do not support vector type " + v.getClass)
       }
     } else {

