Repository: spark
Updated Branches:
  refs/heads/master 4c722d77a -> 8509519d8


[SPARK-5894] [ML] Add polynomial mapper

See [SPARK-5894](https://issues.apache.org/jira/browse/SPARK-5894).

Author: Xusen Yin <yinxu...@gmail.com>
Author: Xiangrui Meng <m...@databricks.com>

Closes #5245 from yinxusen/SPARK-5894 and squashes the following commits:

dc461a6 [Xusen Yin] merge polynomial expansion v2
6d0c3cc [Xusen Yin] Merge branch 'SPARK-5894' of https://github.com/mengxr/spark into mengxr-SPARK-5894
57bfdd5 [Xusen Yin] Merge branch 'master' into SPARK-5894
3d02a7d [Xusen Yin] Merge branch 'master' into SPARK-5894
a067da2 [Xiangrui Meng] a new approach for poly expansion
0789d81 [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into SPARK-5894
4e9aed0 [Xusen Yin] fix test suite
95d8fb9 [Xusen Yin] fix sparse vector indices
8d39674 [Xusen Yin] fix sparse vector expansion error
5998dd6 [Xusen Yin] fix dense vector fillin
fa3ade3 [Xusen Yin] change the functional code into imperative one to speedup
b70e7e1 [Xusen Yin] remove useless case class
6fa236f [Xusen Yin] fix vector slice error
daff601 [Xusen Yin] fix index error of sparse vector
6bd0a10 [Xusen Yin] merge repeated features
419f8a2 [Xusen Yin] need to merge same columns
4ebf34e [Xusen Yin] add test suite of polynomial expansion
372227c [Xusen Yin] add polynomial expansion
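
For readers skimming the patch, here is a minimal usage sketch of the new
transformer, distilled from the test suite below. It is illustrative only: the
DataFrame contents and column names are made up, and `sc` is assumed to be an
existing SparkContext.

    import org.apache.spark.ml.feature.PolynomialExpansion
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.sql.SQLContext

    val sqlContext = new SQLContext(sc)
    // A single example row; "features" is an arbitrary column name.
    val df = sqlContext.createDataFrame(Seq(
      Tuple1(Vectors.dense(-2.0, 2.3))
    )).toDF("features")

    // Expand each feature vector into all monomials up to degree 3.
    val poly = new PolynomialExpansion()
      .setInputCol("features")
      .setOutputCol("polyFeatures")
      .setDegree(3)
    poly.transform(df).select("polyFeatures").show()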


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8509519d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8509519d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8509519d

Branch: refs/heads/master
Commit: 8509519d8bcf99e2d1b5e21da514d51357f9116d
Parents: 4c722d7
Author: Xusen Yin <yinxu...@gmail.com>
Authored: Fri Apr 24 00:39:29 2015 -0700
Committer: Xiangrui Meng <m...@databricks.com>
Committed: Fri Apr 24 00:39:29 2015 -0700

----------------------------------------------------------------------
 .../spark/ml/feature/PolynomialExpansion.scala  | 167 +++++++++++++++++++
 .../ml/feature/PolynomialExpansionSuite.scala   | 104 ++++++++++++
 2 files changed, 271 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8509519d/mllib/src/main/scala/org/apache/spark/ml/feature/PolynomialExpansion.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/PolynomialExpansion.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/PolynomialExpansion.scala
new file mode 100644
index 0000000..c3a59a3
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/PolynomialExpansion.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.feature
+
+import scala.collection.mutable
+
+import org.apache.spark.annotation.AlphaComponent
+import org.apache.spark.ml.UnaryTransformer
+import org.apache.spark.ml.param.{IntParam, ParamMap}
+import org.apache.spark.mllib.linalg._
+import org.apache.spark.sql.types.DataType
+
+/**
+ * :: AlphaComponent ::
+ * Perform feature expansion in a polynomial space. As the Wikipedia article on polynomial
+ * expansion ([[http://en.wikipedia.org/wiki/Polynomial_expansion]]) puts it, "In mathematics, an
+ * expansion of a product of sums expresses it as a sum of products by using the fact that
+ * multiplication distributes over addition". Take a 2-variable feature vector `(x, y)` as an
+ * example: expanding it with degree 2 yields `(1, x, x * x, y, x * y, y * y)`.
+ */
+@AlphaComponent
+class PolynomialExpansion extends UnaryTransformer[Vector, Vector, PolynomialExpansion] {
+
+  /**
+   * The polynomial degree to expand, which should be larger than 1.
+   * @group param
+   */
+  val degree = new IntParam(this, "degree", "the polynomial degree to expand")
+  setDefault(degree -> 2)
+
+  /** @group getParam */
+  def getDegree: Int = getOrDefault(degree)
+
+  /** @group setParam */
+  def setDegree(value: Int): this.type = set(degree, value)
+
+  override protected def createTransformFunc(paramMap: ParamMap): Vector => Vector = { v =>
+    val d = paramMap(degree)
+    PolynomialExpansion.expand(v, d)
+  }
+
+  override protected def outputDataType: DataType = new VectorUDT()
+}
+
+/**
+ * The expansion is done via recursion. Given n features and degree d, the size after expansion is
+ * (n + d choose d) (including 1 and first-order values). For example, let f([a, b, c], 3) be the
+ * function that expands [a, b, c] to their monomials of degree 3. We have the following recursion:
+ *
+ * {{{
+ * f([a, b, c], 3) = f([a, b], 3) ++ f([a, b], 2) * c ++ f([a, b], 1) * c^2 ++ [c^3]
+ * }}}
+ *
+ * To handle sparsity, if c is zero, we can skip all monomials that contain it. We remember the
+ * current index and increment it properly for sparse input.
+ */
+object PolynomialExpansion {
+
+  private def choose(n: Int, k: Int): Int = {
+    Range(n, n - k, -1).product / Range(k, 1, -1).product
+  }
+
+  private def getPolySize(numFeatures: Int, degree: Int): Int = choose(numFeatures + degree, degree)
+
+  private def expandDense(
+      values: Array[Double],
+      lastIdx: Int,
+      degree: Int,
+      multiplier: Double,
+      polyValues: Array[Double],
+      curPolyIdx: Int): Int = {
+    if (multiplier == 0.0) {
+      // do nothing
+    } else if (degree == 0 || lastIdx < 0) {
+      polyValues(curPolyIdx) = multiplier
+    } else {
+      val v = values(lastIdx)
+      val lastIdx1 = lastIdx - 1
+      var alpha = multiplier
+      var i = 0
+      var curStart = curPolyIdx
+      while (i <= degree && alpha != 0.0) {
+        curStart = expandDense(values, lastIdx1, degree - i, alpha, polyValues, curStart)
+        i += 1
+        alpha *= v
+      }
+    }
+    curPolyIdx + getPolySize(lastIdx + 1, degree)
+  }
+
+  private def expandSparse(
+      indices: Array[Int],
+      values: Array[Double],
+      lastIdx: Int,
+      lastFeatureIdx: Int,
+      degree: Int,
+      multiplier: Double,
+      polyIndices: mutable.ArrayBuilder[Int],
+      polyValues: mutable.ArrayBuilder[Double],
+      curPolyIdx: Int): Int = {
+    if (multiplier == 0.0) {
+      // do nothing
+    } else if (degree == 0 || lastIdx < 0) {
+      polyIndices += curPolyIdx
+      polyValues += multiplier
+    } else {
+      // Skip all zeros at the tail.
+      val v = values(lastIdx)
+      val lastIdx1 = lastIdx - 1
+      val lastFeatureIdx1 = indices(lastIdx) - 1
+      var alpha = multiplier
+      var curStart = curPolyIdx
+      var i = 0
+      while (i <= degree && alpha != 0.0) {
+        curStart = expandSparse(indices, values, lastIdx1, lastFeatureIdx1, degree - i, alpha,
+          polyIndices, polyValues, curStart)
+        i += 1
+        alpha *= v
+      }
+    }
+    curPolyIdx + getPolySize(lastFeatureIdx + 1, degree)
+  }
+
+  private def expand(dv: DenseVector, degree: Int): DenseVector = {
+    val n = dv.size
+    val polySize = getPolySize(n, degree)
+    val polyValues = new Array[Double](polySize)
+    expandDense(dv.values, n - 1, degree, 1.0, polyValues, 0)
+    new DenseVector(polyValues)
+  }
+
+  private def expand(sv: SparseVector, degree: Int): SparseVector = {
+    val polySize = getPolySize(sv.size, degree)
+    val nnz = sv.values.length
+    val nnzPolySize = getPolySize(nnz, degree)
+    val polyIndices = mutable.ArrayBuilder.make[Int]
+    polyIndices.sizeHint(nnzPolySize)
+    val polyValues = mutable.ArrayBuilder.make[Double]
+    polyValues.sizeHint(nnzPolySize)
+    expandSparse(
+      sv.indices, sv.values, nnz - 1, sv.size - 1, degree, 1.0, polyIndices, polyValues, 0)
+    new SparseVector(polySize, polyIndices.result(), polyValues.result())
+  }
+
+  def expand(v: Vector, degree: Int): Vector = {
+    v match {
+      case dv: DenseVector => expand(dv, degree)
+      case sv: SparseVector => expand(sv, degree)
+      case _ => throw new IllegalArgumentException
+    }
+  }
+}
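
As a sanity check on the sizes used above: (n + d choose d) is 10 for 3
features at degree 2 and 20 at degree 3, matching the 10- and 20-element
expected vectors in the test suite that follows. A standalone sketch (not part
of the patch) of the same computation:

    // Binomial coefficient C(n, k), in the same falling-factorial form as the patch.
    def choose(n: Int, k: Int): Int =
      Range(n, n - k, -1).product / Range(k, 1, -1).product

    // Size after expansion, including the constant term 1.
    def getPolySize(numFeatures: Int, degree: Int): Int =
      choose(numFeatures + degree, degree)

    assert(getPolySize(3, 2) == 10)  // C(5, 2)
    assert(getPolySize(3, 3) == 20)  // C(6, 3)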

http://git-wip-us.apache.org/repos/asf/spark/blob/8509519d/mllib/src/test/scala/org/apache/spark/ml/feature/PolynomialExpansionSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/PolynomialExpansionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/PolynomialExpansionSuite.scala
new file mode 100644
index 0000000..b0a537b
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/PolynomialExpansionSuite.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.feature
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors}
+import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.mllib.util.TestingUtils._
+import org.apache.spark.sql.{Row, SQLContext}
+import org.scalatest.exceptions.TestFailedException
+
+class PolynomialExpansionSuite extends FunSuite with MLlibTestSparkContext {
+
+  @transient var sqlContext: SQLContext = _
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    sqlContext = new SQLContext(sc)
+  }
+
+  test("Polynomial expansion with default parameter") {
+    val data = Array(
+      Vectors.sparse(3, Seq((0, -2.0), (1, 2.3))),
+      Vectors.dense(-2.0, 2.3),
+      Vectors.dense(0.0, 0.0, 0.0),
+      Vectors.dense(0.6, -1.1, -3.0),
+      Vectors.sparse(3, Seq())
+    )
+
+    val twoDegreeExpansion: Array[Vector] = Array(
+      Vectors.sparse(10, Array(0, 1, 2, 3, 4, 5), Array(1.0, -2.0, 4.0, 2.3, -4.6, 5.29)),
+      Vectors.dense(1.0, -2.0, 4.0, 2.3, -4.6, 5.29),
+      Vectors.dense(Array(1.0) ++ Array.fill[Double](9)(0.0)),
+      Vectors.dense(1.0, 0.6, 0.36, -1.1, -0.66, 1.21, -3.0, -1.8, 3.3, 9.0),
+      Vectors.sparse(10, Array(0), Array(1.0)))
+
+    val df = sqlContext.createDataFrame(data.zip(twoDegreeExpansion)).toDF("features", "expected")
+
+    val polynomialExpansion = new PolynomialExpansion()
+      .setInputCol("features")
+      .setOutputCol("polyFeatures")
+
+    polynomialExpansion.transform(df).select("polyFeatures", "expected").collect().foreach {
+      case Row(expanded: DenseVector, expected: DenseVector) =>
+        assert(expanded ~== expected absTol 1e-1)
+      case Row(expanded: SparseVector, expected: SparseVector) =>
+        assert(expanded ~== expected absTol 1e-1)
+      case _ =>
+        throw new TestFailedException("Unmatched data types after polynomial expansion", 0)
+    }
+  }
+
+  test("Polynomial expansion with setter") {
+    val data = Array(
+      Vectors.sparse(3, Seq((0, -2.0), (1, 2.3))),
+      Vectors.dense(-2.0, 2.3),
+      Vectors.dense(0.0, 0.0, 0.0),
+      Vectors.dense(0.6, -1.1, -3.0),
+      Vectors.sparse(3, Seq())
+    )
+
+    val threeDegreeExpansion: Array[Vector] = Array(
+      Vectors.sparse(20, Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9),
+        Array(1.0, -2.0, 4.0, -8.0, 2.3, -4.6, 9.2, 5.29, -10.58, 12.17)),
+      Vectors.dense(1.0, -2.0, 4.0, -8.0, 2.3, -4.6, 9.2, 5.29, -10.58, 12.17),
+      Vectors.dense(Array(1.0) ++ Array.fill[Double](19)(0.0)),
+      Vectors.dense(1.0, 0.6, 0.36, 0.216, -1.1, -0.66, -0.396, 1.21, 0.726, -1.331, -3.0, -1.8,
+        -1.08, 3.3, 1.98, -3.63, 9.0, 5.4, -9.9, -27.0),
+      Vectors.sparse(20, Array(0), Array(1.0)))
+
+    val df = sqlContext.createDataFrame(data.zip(threeDegreeExpansion)).toDF("features", "expected")
+
+    val polynomialExpansion = new PolynomialExpansion()
+      .setInputCol("features")
+      .setOutputCol("polyFeatures")
+      .setDegree(3)
+
+    polynomialExpansion.transform(df).select("polyFeatures", "expected").collect().foreach {
+      case Row(expanded: DenseVector, expected: DenseVector) =>
+        assert(expanded ~== expected absTol 1e-1)
+      case Row(expanded: SparseVector, expected: SparseVector) =>
+        assert(expanded ~== expected absTol 1e-1)
+      case _ =>
+        throw new TestFailedException("Unmatched data types after polynomial expansion", 0)
+    }
+  }
+}
+
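
To make the output ordering concrete, here is a tiny, non-optimized
re-derivation (not part of the patch) of the recursion from the scaladoc of
PolynomialExpansion. For the dense row (-2.0, 2.3) at degree 2 it reproduces
the expected vector (1.0, -2.0, 4.0, 2.3, -4.6, 5.29) from the first test:

    // f(xs :+ c, d) = f(xs, d) ++ f(xs, d - 1) * c ++ ... ++ f(xs, 0) * c^d,
    // with f([], d) = f(xs, 0) = [1].
    def f(xs: Vector[Double], d: Int): Vector[Double] =
      if (xs.isEmpty || d == 0) Vector(1.0)
      else (0 to d).toVector.flatMap { i =>
        f(xs.init, d - i).map(_ * math.pow(xs.last, i))
      }

    val expanded = f(Vector(-2.0, 2.3), 2)
    // expanded == Vector(1.0, -2.0, 4.0, 2.3, -4.6, 5.29), up to
    // floating-point rounding in the squared terms.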

