Repository: spark
Updated Branches:
  refs/heads/master 78f2af582 -> d88e69561


[SPARK-2786][mllib] Python correlations

Author: Doris Xin <doris.s....@gmail.com>

Closes #1713 from dorx/pythonCorrelation and squashes the following commits:

5f1e60c [Doris Xin] reviewer comments.
46ff6eb [Doris Xin] reviewer comments.
ad44085 [Doris Xin] style fix
e69d446 [Doris Xin] fixed missed conflicts.
eb5bf56 [Doris Xin] merge master
cc9f725 [Doris Xin] units passed.
9141a63 [Doris Xin] WIP2
d199f1f [Doris Xin] Moved correlation names into a public object
cd163d6 [Doris Xin] WIP


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d88e6956
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d88e6956
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d88e6956

Branch: refs/heads/master
Commit: d88e69561367d65e1a2b94527b80a1f65a2cba90
Parents: 78f2af5
Author: Doris Xin <doris.s....@gmail.com>
Authored: Fri Aug 1 15:02:17 2014 -0700
Committer: Xiangrui Meng <m...@databricks.com>
Committed: Fri Aug 1 15:02:17 2014 -0700

----------------------------------------------------------------------
 .../spark/mllib/api/python/PythonMLLibAPI.scala |  39 ++++++-
 .../apache/spark/mllib/stat/Statistics.scala    |  10 +-
 .../mllib/stat/correlation/Correlation.scala    |  49 +++++----
 .../mllib/api/python/PythonMLLibAPISuite.scala  |  21 +++-
 python/pyspark/mllib/_common.py                 |   6 +-
 python/pyspark/mllib/stat.py                    | 104 +++++++++++++++++++
 6 files changed, 199 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d88e6956/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index d2e8ccf..122925d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -20,13 +20,15 @@ package org.apache.spark.mllib.api.python
 import java.nio.{ByteBuffer, ByteOrder}
 
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.api.java.{JavaSparkContext, JavaRDD}
+import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.mllib.classification._
 import org.apache.spark.mllib.clustering._
-import org.apache.spark.mllib.linalg.{SparseVector, Vector, Vectors}
+import org.apache.spark.mllib.linalg.{Matrix, SparseVector, Vector, Vectors}
 import org.apache.spark.mllib.random.{RandomRDDGenerators => RG}
 import org.apache.spark.mllib.recommendation._
 import org.apache.spark.mllib.regression._
+import org.apache.spark.mllib.stat.Statistics
+import org.apache.spark.mllib.stat.correlation.CorrelationNames
 import org.apache.spark.mllib.util.MLUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.util.Utils
@@ -227,7 +229,7 @@ class PythonMLLibAPI extends Serializable {
       jsc: JavaSparkContext,
       path: String,
       minPartitions: Int): JavaRDD[Array[Byte]] =
-    MLUtils.loadLabeledPoints(jsc.sc, path, minPartitions).map(serializeLabeledPoint).toJavaRDD()
+    MLUtils.loadLabeledPoints(jsc.sc, path, minPartitions).map(serializeLabeledPoint)
 
   private def trainRegressionModel(
       trainFunc: (RDD[LabeledPoint], Vector) => GeneralizedLinearModel,
@@ -456,6 +458,37 @@ class PythonMLLibAPI extends Serializable {
     ALS.trainImplicit(ratings, rank, iterations, lambda, blocks, alpha)
   }
 
+  /**
+   * Java stub for mllib Statistics.corr(X: RDD[Vector], method: String).
+   * Returns the correlation matrix serialized into a byte array understood by deserializers in
+   * pyspark.
+   */
+  def corr(X: JavaRDD[Array[Byte]], method: String): Array[Byte] = {
+    val inputMatrix = X.rdd.map(deserializeDoubleVector(_))
+    val result = Statistics.corr(inputMatrix, getCorrNameOrDefault(method))
+    serializeDoubleMatrix(to2dArray(result))
+  }
+
+  /**
+   * Java stub for mllib Statistics.corr(x: RDD[Double], y: RDD[Double], method: String).
+   */
+  def corr(x: JavaRDD[Array[Byte]], y: JavaRDD[Array[Byte]], method: String): Double = {
+    val xDeser = x.rdd.map(deserializeDouble(_))
+    val yDeser = y.rdd.map(deserializeDouble(_))
+    Statistics.corr(xDeser, yDeser, getCorrNameOrDefault(method))
+  }
+
+  // used by the corr methods to retrieve the name of the correlation method passed in via pyspark
+  private def getCorrNameOrDefault(method: String) = {
+    if (method == null) CorrelationNames.defaultCorrName else method
+  }
+
+  // Reformat a Matrix into Array[Array[Double]] for serialization
+  private[python] def to2dArray(matrix: Matrix): Array[Array[Double]] = {
+    val values = matrix.toArray
+    Array.tabulate(matrix.numRows, matrix.numCols)((i, j) => values(i + j * matrix.numRows))
+  }
+
   // Used by the *RDD methods to get default seed if not passed in from pyspark
   private def getSeedOrDefault(seed: java.lang.Long): Long = {
     if (seed == null) Utils.random.nextLong else seed
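
A note on the indexing in `to2dArray` above: `Matrix.toArray` yields entries in column-major order, so element (i, j) sits at flat offset i + j * numRows. A minimal Python sketch of the same reshaping (hypothetical helper names, for illustration only):

    def to_2d_array(values, num_rows, num_cols):
        # values holds a column-major flat matrix: entry (i, j) lives at
        # index i + j * num_rows, mirroring the Scala Array.tabulate call.
        return [[values[i + j * num_rows] for j in range(num_cols)]
                for i in range(num_rows)]

    # Same 2x3 matrix as in the "matrix to 2D array" test further down:
    assert to_2d_array([0, 1.2, 3, 4.56, 7, 8], 2, 3) == [[0, 3, 7], [1.2, 4.56, 8]]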

http://git-wip-us.apache.org/repos/asf/spark/blob/d88e6956/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala
index 9d6de9b..f416a9f 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala
@@ -23,21 +23,24 @@ import org.apache.spark.mllib.stat.correlation.Correlations
 import org.apache.spark.rdd.RDD
 
 /**
- * API for statistical functions in MLlib
+ * API for statistical functions in MLlib.
  */
 @Experimental
 object Statistics {
 
   /**
+   * :: Experimental ::
    * Compute the Pearson correlation matrix for the input RDD of Vectors.
    * Columns with 0 covariance produce NaN entries in the correlation matrix.
    *
   * @param X an RDD[Vector] for which the correlation matrix is to be computed.
    * @return Pearson correlation matrix comparing columns in X.
    */
+  @Experimental
   def corr(X: RDD[Vector]): Matrix = Correlations.corrMatrix(X)
 
   /**
+   * :: Experimental ::
   * Compute the correlation matrix for the input RDD of Vectors using the specified method.
    * Methods currently supported: `pearson` (default), `spearman`.
    *
@@ -51,9 +54,11 @@ object Statistics {
    *               Supported: `pearson` (default), `spearman`
    * @return Correlation matrix comparing columns in X.
    */
+  @Experimental
   def corr(X: RDD[Vector], method: String): Matrix = Correlations.corrMatrix(X, method)
 
   /**
+   * :: Experimental ::
    * Compute the Pearson correlation for the input RDDs.
    * Returns NaN if either vector has 0 variance.
    *
@@ -64,9 +69,11 @@ object Statistics {
    * @param y RDD[Double] of the same cardinality as x.
   * @return A Double containing the Pearson correlation between the two input RDD[Double]s
    */
+  @Experimental
   def corr(x: RDD[Double], y: RDD[Double]): Double = Correlations.corr(x, y)
 
   /**
+   * :: Experimental ::
    * Compute the correlation for the input RDDs using the specified method.
    * Methods currently supported: `pearson` (default), `spearman`.
    *
@@ -80,5 +87,6 @@ object Statistics {
   * @return A Double containing the correlation between the two input RDD[Double]s using the
    *         specified method.
    */
+  @Experimental
   def corr(x: RDD[Double], y: RDD[Double], method: String): Double = Correlations.corr(x, y, method)
 }
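
For readers unfamiliar with the statistic itself: Pearson correlation is cov(x, y) / (sigma_x * sigma_y), and the NaN behavior documented above falls out of the zero-variance denominator. A plain-Python sketch of the pairwise case (not MLlib's distributed implementation, which operates on RDDs):

    import math

    def pearson(xs, ys):
        # Pearson correlation of two equal-length sequences; returns NaN
        # when either input has zero variance, per the docs above.
        n = len(xs)
        mx, my = sum(xs) / n, sum(ys) / n
        cov = sum((a - mx) * (b - my) for a, b in zip(xs, ys))
        vx = sum((a - mx) ** 2 for a in xs)
        vy = sum((b - my) ** 2 for b in ys)
        if vx == 0.0 or vy == 0.0:
            return float("nan")
        return cov / math.sqrt(vx * vy)

    # Matches the doctest values in the new pyspark stat module below:
    assert abs(pearson([1.0, 0.0, -2.0], [4.0, 5.0, 3.0]) - 0.6546537) < 1e-7
    assert math.isnan(pearson([1.0, 0.0, -2.0], [0.0, 0.0, 0.0]))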

http://git-wip-us.apache.org/repos/asf/spark/blob/d88e6956/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala
index f23393d..1fb8d7b 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala
@@ -49,43 +49,48 @@ private[stat] trait Correlation {
 }
 
 /**
- * Delegates computation to the specific correlation object based on the input method name
- *
- * Currently supported correlations: pearson, spearman.
- * After new correlation algorithms are added, please update the documentation here and in
- * Statistics.scala for the correlation APIs.
- *
- * Maintains the default correlation type, pearson
+ * Delegates computation to the specific correlation object based on the input method name.
  */
 private[stat] object Correlations {
 
-  // Note: after new types of correlations are implemented, please update this map
-  val nameToObjectMap = Map(("pearson", PearsonCorrelation), ("spearman", SpearmanCorrelation))
-  val defaultCorrName: String = "pearson"
-  val defaultCorr: Correlation = nameToObjectMap(defaultCorrName)
-
-  def corr(x: RDD[Double], y: RDD[Double], method: String = defaultCorrName): Double = {
+  def corr(x: RDD[Double],
+       y: RDD[Double],
+       method: String = CorrelationNames.defaultCorrName): Double = {
     val correlation = getCorrelationFromName(method)
     correlation.computeCorrelation(x, y)
   }
 
-  def corrMatrix(X: RDD[Vector], method: String = defaultCorrName): Matrix = {
+  def corrMatrix(X: RDD[Vector],
+      method: String = CorrelationNames.defaultCorrName): Matrix = {
     val correlation = getCorrelationFromName(method)
     correlation.computeCorrelationMatrix(X)
   }
 
-  /**
-   * Match input correlation name with a known name via simple string matching
-   *
-   * private to stat for ease of unit testing
-   */
-  private[stat] def getCorrelationFromName(method: String): Correlation = {
+  // Match input correlation name with a known name via simple string matching.
+  def getCorrelationFromName(method: String): Correlation = {
     try {
-      nameToObjectMap(method)
+      CorrelationNames.nameToObjectMap(method)
     } catch {
       case nse: NoSuchElementException =>
        throw new IllegalArgumentException("Unrecognized method name. Supported correlations: "
-          + nameToObjectMap.keys.mkString(", "))
+          + CorrelationNames.nameToObjectMap.keys.mkString(", "))
     }
   }
 }
+
+/**
+ * Maintains supported and default correlation names.
+ *
+ * Currently supported correlations: `pearson`, `spearman`.
+ * Current default correlation: `pearson`.
+ *
+ * After new correlation algorithms are added, please update the documentation here and in
+ * Statistics.scala for the correlation APIs.
+ */
+private[mllib] object CorrelationNames {
+
+  // Note: after new types of correlations are implemented, please update this map.
+  val nameToObjectMap = Map(("pearson", PearsonCorrelation), ("spearman", SpearmanCorrelation))
+  val defaultCorrName: String = "pearson"
+
+}
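
The point of the new CorrelationNames object is that both the Scala dispatcher and the Python stub's getCorrNameOrDefault share one map of supported names. The lookup-with-default pattern, sketched in Python with hypothetical stand-in implementations:

    def pearson_corr(x, y):
        raise NotImplementedError  # stand-in for PearsonCorrelation
    def spearman_corr(x, y):
        raise NotImplementedError  # stand-in for SpearmanCorrelation

    # One shared map plus a default name, as in CorrelationNames.
    name_to_func = {"pearson": pearson_corr, "spearman": spearman_corr}
    default_corr_name = "pearson"

    def get_correlation_from_name(method=None):
        # None falls back to the default, like getCorrNameOrDefault;
        # unknown names fail fast with the list of supported methods.
        name = default_corr_name if method is None else method
        try:
            return name_to_func[name]
        except KeyError:
            raise ValueError("Unrecognized method name. Supported correlations: "
                             + ", ".join(name_to_func.keys()))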

http://git-wip-us.apache.org/repos/asf/spark/blob/d88e6956/mllib/src/test/scala/org/apache/spark/mllib/api/python/PythonMLLibAPISuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/api/python/PythonMLLibAPISuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/api/python/PythonMLLibAPISuite.scala
index d94cfa2..bd413a8 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/api/python/PythonMLLibAPISuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/api/python/PythonMLLibAPISuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.mllib.api.python
 
 import org.scalatest.FunSuite
 
-import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.linalg.{Matrices, Vectors}
 import org.apache.spark.mllib.regression.LabeledPoint
 
 class PythonMLLibAPISuite extends FunSuite {
@@ -59,10 +59,25 @@ class PythonMLLibAPISuite extends FunSuite {
   }
 
   test("double serialization") {
-    for (x <- List(123.0, -10.0, 0.0, Double.MaxValue, Double.MinValue)) {
+    for (x <- List(123.0, -10.0, 0.0, Double.MaxValue, Double.MinValue, Double.NaN)) {
       val bytes = py.serializeDouble(x)
       val deser = py.deserializeDouble(bytes)
-      assert(x === deser)
+      // We use `equals` here for comparison because we cannot use `==` for NaN
+      assert(x.equals(deser))
     }
   }
+
+  test("matrix to 2D array") {
+    val values = Array[Double](0, 1.2, 3, 4.56, 7, 8)
+    val matrix = Matrices.dense(2, 3, values)
+    val arr = py.to2dArray(matrix)
+    val expected = Array(Array[Double](0, 3, 7), Array[Double](1.2, 4.56, 8))
+    assert(arr === expected)
+
+    // Test conversion for empty matrix
+    val empty = Array[Double]()
+    val emptyMatrix = Matrices.dense(0, 0, empty)
+    val empty2D = py.to2dArray(emptyMatrix)
+    assert(empty2D === Array[Array[Double]]())
+  }
 }
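
The switch to `x.equals(deser)` in the double serialization test is needed because IEEE 754 `==` never reports NaN equal to NaN, while `java.lang.Double.equals` does. The same pitfall exists in Python, where a round-trip test would reach for math.isnan; a quick illustration:

    import math

    nan = float("nan")
    assert not (nan == nan)   # IEEE 754: NaN is never == anything, itself included
    assert math.isnan(nan)    # the usual way to test for NaN

    def doubles_equal(a, b):
        # Roughly java.lang.Double.equals semantics: two NaNs compare equal.
        return (math.isnan(a) and math.isnan(b)) or a == b

    assert doubles_equal(nan, nan)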

http://git-wip-us.apache.org/repos/asf/spark/blob/d88e6956/python/pyspark/mllib/_common.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/_common.py b/python/pyspark/mllib/_common.py
index 8e3ad6b..c6ca6a7 100644
--- a/python/pyspark/mllib/_common.py
+++ b/python/pyspark/mllib/_common.py
@@ -101,7 +101,7 @@ def _serialize_double(d):
     """
    Serialize a double (float or numpy.float64) into a mutually understood format.
     """
-    if type(d) == float or type(d) == float64:
+    if type(d) == float or type(d) == float64 or type(d) == int or type(d) == long:
         d = float64(d)
         ba = bytearray(8)
         _copyto(d, buffer=ba, offset=0, shape=[1], dtype=float64)
@@ -176,6 +176,10 @@ def _deserialize_double(ba, offset=0):
     True
     >>> _deserialize_double(_serialize_double(float64(0.0))) == 0.0
     True
+    >>> _deserialize_double(_serialize_double(1)) == 1.0
+    True
+    >>> _deserialize_double(_serialize_double(1L)) == 1.0
+    True
     >>> x = sys.float_info.max
     >>> _deserialize_double(_serialize_double(sys.float_info.max)) == x
     True
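
The widened type check lets Python ints and longs travel over the same 8-byte double format as floats. A struct-based sketch of that coercion (an illustration only; the real _serialize_double writes numpy float64 bytes into a bytearray, and the little-endian "<d" below is an assumption, not pyspark's wire format):

    import struct

    def serialize_double_sketch(d):
        # Coerce int/long/float to float and pack as an 8-byte double,
        # analogous to the float64(d) widening above.
        if not isinstance(d, (int, long, float)):
            raise TypeError("expected int, long, or float, got %s" % type(d))
        return struct.pack("<d", float(d))

    def deserialize_double_sketch(ba):
        return struct.unpack("<d", ba)[0]

    assert deserialize_double_sketch(serialize_double_sketch(1)) == 1.0
    assert deserialize_double_sketch(serialize_double_sketch(1L)) == 1.0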

http://git-wip-us.apache.org/repos/asf/spark/blob/d88e6956/python/pyspark/mllib/stat.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/stat.py b/python/pyspark/mllib/stat.py
new file mode 100644
index 0000000..0a08a56
--- /dev/null
+++ b/python/pyspark/mllib/stat.py
@@ -0,0 +1,104 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Python package for statistical functions in MLlib.
+"""
+
+from pyspark.mllib._common import \
+    _get_unmangled_double_vector_rdd, _get_unmangled_rdd, \
+    _serialize_double, _serialize_double_vector, \
+    _deserialize_double, _deserialize_double_matrix
+
+class Statistics(object):
+
+    @staticmethod
+    def corr(x, y=None, method=None):
+        """
+        Compute the correlation (matrix) for the input RDD(s) using the
+        specified method.
+        Methods currently supported: I{pearson (default), spearman}.
+
+        If a single RDD of Vectors is passed in, a correlation matrix
+        comparing the columns in the input RDD is returned. Use C{method=}
+        to specify the method to be used for single RDD input.
+        If two RDDs of floats are passed in, a single float is returned.
+
+        >>> x = sc.parallelize([1.0, 0.0, -2.0], 2)
+        >>> y = sc.parallelize([4.0, 5.0, 3.0], 2)
+        >>> zeros = sc.parallelize([0.0, 0.0, 0.0], 2)
+        >>> abs(Statistics.corr(x, y) - 0.6546537) < 1e-7
+        True
+        >>> Statistics.corr(x, y) == Statistics.corr(x, y, "pearson")
+        True
+        >>> Statistics.corr(x, y, "spearman")
+        0.5
+        >>> from math import isnan
+        >>> isnan(Statistics.corr(x, zeros))
+        True
+        >>> from pyspark.mllib.linalg import Vectors
+        >>> rdd = sc.parallelize([Vectors.dense([1, 0, 0, -2]), Vectors.dense([4, 5, 0, 3]),
+        ...                       Vectors.dense([6, 7, 0,  8]), Vectors.dense([9, 0, 0, 1])])
+        >>> Statistics.corr(rdd)
+        array([[ 1.        ,  0.05564149,         nan,  0.40047142],
+               [ 0.05564149,  1.        ,         nan,  0.91359586],
+               [        nan,         nan,  1.        ,         nan],
+               [ 0.40047142,  0.91359586,         nan,  1.        ]])
+        >>> Statistics.corr(rdd, method="spearman")
+        array([[ 1.        ,  0.10540926,         nan,  0.4       ],
+               [ 0.10540926,  1.        ,         nan,  0.9486833 ],
+               [        nan,         nan,  1.        ,         nan],
+               [ 0.4       ,  0.9486833 ,         nan,  1.        ]])
+        >>> try:
+        ...     Statistics.corr(rdd, "spearman")
+        ...     print "Method name as second argument without 'method=' shouldn't be allowed."
+        ... except TypeError:
+        ...     pass
+        """
+        sc = x.ctx
+        # Check inputs to determine whether a single value or a matrix is needed for output.
+        # Since it's legal for users to use the method name as the second argument, we need to
+        # check if y is used to specify the method name instead.
+        if type(y) == str:
+            raise TypeError("Use 'method=' to specify method name.")
+        if not y:
+            try:
+                Xser = _get_unmangled_double_vector_rdd(x)
+            except TypeError:
+                raise TypeError("corr called on a single RDD not consisting of Vectors.")
+            resultMat = sc._jvm.PythonMLLibAPI().corr(Xser._jrdd, method)
+            return _deserialize_double_matrix(resultMat)
+        else:
+            xSer = _get_unmangled_rdd(x, _serialize_double)
+            ySer = _get_unmangled_rdd(y, _serialize_double)
+            result = sc._jvm.PythonMLLibAPI().corr(xSer._jrdd, ySer._jrdd, method)
+            return result
+
+
+def _test():
+    import doctest
+    from pyspark import SparkContext
+    globs = globals().copy()
+    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
+    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
+    globs['sc'].stop()
+    if failure_count:
+        exit(-1)
+
+
+if __name__ == "__main__":
+    _test()
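
Putting it together, the new Python API mirrors the Scala overloads: two RDDs of doubles yield a scalar, a single RDD of Vectors yields a correlation matrix, and the algorithm is chosen with the `method=` keyword. Example usage in a PySpark shell (`sc` already bound), with values taken from the doctests above:

    from pyspark.mllib.stat import Statistics
    from pyspark.mllib.linalg import Vectors

    x = sc.parallelize([1.0, 0.0, -2.0])
    y = sc.parallelize([4.0, 5.0, 3.0])
    print Statistics.corr(x, y)                     # Pearson by default, ~0.6546537
    print Statistics.corr(x, y, method="spearman")  # 0.5

    rdd = sc.parallelize([Vectors.dense([1, 0, -2]), Vectors.dense([4, 5, 3]),
                          Vectors.dense([6, 7, 8]), Vectors.dense([9, 0, 1])])
    print Statistics.corr(rdd)  # 3x3 Pearson correlation matrix as a numpy array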
