[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-18 Thread mengxr
Github user mengxr commented on the pull request:

https://github.com/apache/spark/pull/1367#issuecomment-49493898
  
LGTM. Thanks for implementing correlations! Merged into master.




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-18 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/1367




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-18 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1367#issuecomment-49493613
  
QA results for PR 1367:
- This patch PASSES unit tests.
- This patch merges cleanly.
- This patch adds no public classes.

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16837/consoleFull




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-18 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1367#issuecomment-49487511
  
QA tests have started for PR 1367. This patch merges cleanly. View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16837/consoleFull




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-18 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15137023
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmansCorrelation.scala ---
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{Logging, HashPartitioner}
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+
+/**
+ * Compute Spearman's correlation for two RDDs of the type RDD[Double] or the correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Spearman's correlation can be found at
+ * http://en.wikipedia.org/wiki/Spearman's_rank_correlation_coefficient
+ */
+private[stat] object SpearmansCorrelation extends Correlation with Logging {
+
+  /**
+   * Compute Spearman's correlation for two datasets.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double = {
+    computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute Spearman's correlation matrix S, for the input matrix, where S(i, j) is the
+   * correlation between column i and j.
+   *
+   * Input RDD[Vector] should be cached or checkpointed if possible since it would be split into
+   * numCol RDD[Double]s, each of which sorted, and then joined back into a single RDD[Vector].
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+    val indexed = X.zipWithUniqueId()
+
+    val numCols = X.first.size
+    if (numCols > 50) {
+      logWarning("Computing the Spearman correlation matrix can be slow for large RDDs with more"
+        + " than 50 columns.")
+    }
+    val ranks = new Array[RDD[(Long, Double)]](numCols)
+
+    // Note: we use a for loop here instead of a while loop with a single index variable
+    // to avoid race condition caused by closure serialization
+    for (k <- 0 until numCols) {
+      val column = indexed.map { case (vector, index) => (vector(k), index) }
+      ranks(k) = getRanks(column)
+    }
+
+    val ranksMat: RDD[Vector] = makeRankMatrix(ranks, X)
+    PearsonCorrelation.computeCorrelationMatrix(ranksMat)
+  }
+
+  /**
+   * Compute the ranks for elements in the input RDD, using the average method for ties.
+   *
+   * With the average method, elements with the same value receive the same rank that's computed
+   * by taking the average of their positions in the sorted list.
+   * e.g. ranks([2, 1, 0, 2]) = [2.5, 1.0, 0.0, 2.5]
+   * Note that positions here are 0-indexed, instead of 1-indexed as in the standard definition
+   * of ranks for Spearman's correlation. This does not affect the final results and is slightly
+   * more performant.
+   *
+   * @param indexed RDD[(Double, Long)] containing pairs of the format (originalValue, uniqueId)
+   * @return RDD[(Long, Double)] containing pairs of the format (uniqueId, rank), where uniqueId is
+   *         copied from the input RDD.
+   */
+  private def getRanks(indexed: RDD[(Double, Long)]): RDD[(Long, Double)] = {
+    // Get elements' positions in the sorted list for computing average rank for duplicate values
+    val sorted = indexed.sortByKey().zipWithIndex()
+
+    val ranks: RDD[(Long, Double)] = sorted.mapPartitions { iter =>
+      // add an extra element to signify the end of the list so that flatMap can flush the last
+      // batch of duplicates
+      val padded = iter ++
+        Iterator[((Double, Long), Long)](((Double.NaN, Long.MinValue), Long.MinValue))
+      var lastVal = 0.0
+      var firstRank = 0.0
+      val IDBuffer = new ArrayBuffer[Long]()
+      padded.flatMap { item =>
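
As an aside on the tie handling documented above for getRanks: a minimal local (non-RDD) sketch of the average-rank method, written for illustration only (the function name and shape are not from the PR):

    // Assign each value the average of the 0-indexed positions its duplicates
    // occupy in sorted order, matching ranks([2, 1, 0, 2]) = [2.5, 1.0, 0.0, 2.5].
    def averageRanks(values: Array[Double]): Array[Double] = {
      val ranks = new Array[Double](values.length)
      // pair each value with its original position, then sort by value
      val sorted = values.zipWithIndex.sortBy(_._1)
      var i = 0
      while (i < sorted.length) {
        var j = i
        // find the end of the run of duplicates starting at position i
        while (j < sorted.length && sorted(j)._1 == sorted(i)._1) j += 1
        val avgRank = (i + j - 1) / 2.0 // average of positions i .. j-1
        for (k <- i until j) ranks(sorted(k)._2) = avgRank
        i = j
      }
      ranks
    }

    // averageRanks(Array(2.0, 1.0, 0.0, 2.0)) returns Array(2.5, 1.0, 0.0, 2.5)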

[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-18 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15136854
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmansCorrelation.scala ---
@@ -0,0 +1,128 @@
[... diff context as quoted in full above ...]
+      var lastVal = 0.0
+      var firstRank = 0.0
+      val IDBuffer = new ArrayBuffer[Long]()
+      padded.flatMap { item =>

[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-18 Thread dorx
Github user dorx commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15135411
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmansCorrelation.scala ---
@@ -0,0 +1,128 @@
[... diff context as quoted in full above ...]
+ * Definition of Spearman's correlation can be found at
+ * http://en.wikipedia.org/wiki/Spearman's_rank_correlation_coefficient
+ */
+private[stat] object SpearmansCorrelation extends Correlation with Logging {
--- End diff --

"Spearman's correlation" is a lot more prevalent than "Spearman correlation" (whereas
"Pearson correlation" is much more common than "Pearson's correlation"), but I'm changing
it to `SpearmanCorrelation` for consistency. This is why I wanted to provide fuzzy string
matching.
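
A hypothetical sketch of such lenient matching (the normalization rule is illustrative, not the PR's code): normalize user input before looking it up in `nameToObjectMap` from Correlation.scala.

    // Normalize input so "Spearman's", "Spearman" and "spearman"
    // all resolve to the key "spearman" in nameToObjectMap.
    def normalizeMethodName(method: String): String =
      method.trim.toLowerCase.stripSuffix("'s")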




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-18 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15135084
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmansCorrelation.scala ---
@@ -0,0 +1,128 @@
[... diff context as quoted in full above ...]
+      var lastVal = 0.0
+      var firstRank = 0.0
+      val IDBuffer = new ArrayBuffer[Long]()
+      padded.flatMap { item =>

[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-18 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15135115
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmansCorrelation.scala ---
@@ -0,0 +1,128 @@
[... diff context as quoted in full above ...]
+      var lastVal = 0.0
+      var firstRank = 0.0
+      val IDBuffer = new ArrayBuffer[Long]()
--- End diff --

Use camelCase

[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-18 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15134693
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmansCorrelation.scala ---
@@ -0,0 +1,128 @@
[... diff context as quoted in full above ...]
+ * Definition of Spearman's correlation can be found at
+ * http://en.wikipedia.org/wiki/Spearman's_rank_correlation_coefficient
+ */
+private[stat] object SpearmansCorrelation extends Correlation with Logging {
--- End diff --

Should it be called `SpearmanCorrelation` instead of `SpearmansCorrelation`?




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-18 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15134590
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/PearsonCorrelation.scala ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import breeze.linalg.{DenseMatrix => BDM}
+
+import org.apache.spark.Logging
+import org.apache.spark.mllib.linalg.{Matrices, Matrix, Vector}
+import org.apache.spark.mllib.linalg.distributed.RowMatrix
+import org.apache.spark.rdd.RDD
+
+/**
+ * Compute Pearson correlation for two RDDs of the type RDD[Double] or the correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Pearson correlation can be found at
+ * http://en.wikipedia.org/wiki/Pearson_product-moment_correlation_coefficient
+ */
+private[stat] object PearsonCorrelation extends Correlation with Logging {
+
+  /**
+   * Compute the Pearson correlation for two datasets. NaN if either vector has 0 variance.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double = {
+    computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute the Pearson correlation matrix S, for the input matrix, where S(i, j) is the
+   * correlation between column i and j. 0 covariance results in a correlation value of Double.NaN.
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+    val rowMatrix = new RowMatrix(X)
+    val cov = rowMatrix.computeCovariance()
+    computeCorrelationMatrixFromCovariance(cov)
+  }
+
+  /**
+   * Compute the Pearson correlation matrix from the covariance matrix.
+   * 0 covariance results in a correlation value of Double.NaN.
+   */
+  def computeCorrelationMatrixFromCovariance(covarianceMatrix: Matrix): Matrix = {
+    val cov = covarianceMatrix.toBreeze.asInstanceOf[BDM[Double]]
+    val n = cov.cols
+
+    // Compute the standard deviation on the diagonals first
+    var i = 0
+    while (i < n) {
+      // TODO remove once covariance numerical issue resolved.
+      cov(i, i) = if (closeToZero(cov(i, i))) 0.0 else math.sqrt(cov(i, i))
+      i += 1
+    }
+
+    // Loop through columns since cov is column major
+    var j = 0
+    var sigma = 0.0
+    var containNaN = false
+    while (j < n) {
+      sigma = cov(j, j)
+      i = 0
+      while (i < j) {
+        val corr = if (sigma == 0.0 || cov(i, i) == 0.0) {
+          containNaN = true
+          Double.NaN
+        } else {
+          cov(i, j) / (sigma * cov(i, i))
+        }
+        cov(i, j) = corr
+        cov(j, i) = corr
+        i += 1
+      }
+      j += 1
+    }
+
+    // put 1.0 on the diagonals
+    i = 0
+    while (i < n) {
+      cov(i, i) = 1.0
+      i += 1
+    }
+
+    if (containNaN) {
+      logWarning("Pearson correlation matrix contains NaN values.")
+    }
+
+    Matrices.fromBreeze(cov)
+  }
+
+  private def closeToZero(value: Double, threshhold: Double = 10e-12): Boolean = {
--- End diff --

Using `10e-12` is not very common. Did you mean `1e-12` or `1e-11`?
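
For context on the literal: in Scala's scientific notation the digits before `e` are a full mantissa, so `10e-12` means 10 x 10^-12 = 1e-11 rather than 1e-12. A one-line check, for illustration:

    assert(10e-12 == 1e-11) // passes: 10e-12 is ten times 1e-12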




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-18 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15134609
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/PearsonCorrelation.scala ---
@@ -0,0 +1,107 @@
[... diff context as quoted in full above ...]
+  private def closeToZero(value: Double, threshhold: Double = 10e-12): Boolean = {
+    math.abs(value - 0.0) <= threshhold
--- End diff --

`math.abs(value)`
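
Spelled out, a sketch of the suggested one-line change (subtracting 0.0 is a no-op, so only the absolute value remains; parameter name and default kept from the quoted code):

    private def closeToZero(value: Double, threshhold: Double = 10e-12): Boolean = {
      math.abs(value) <= threshhold
    }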




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-18 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15134018
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.RDD
+
+/**
+ * Trait for correlation algorithms.
+ */
+private[stat] trait Correlation {
+
+  /**
+   * Compute correlation for two datasets.
+   */
+  def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double
+
+  /**
+   * Compute the correlation matrix S, for the input matrix, where S(i, j) is the correlation
+   * between column i and j. S(i, j) can be NaN if the correlation is undefined for column i and j.
+   */
+  def computeCorrelationMatrix(X: RDD[Vector]): Matrix
+
+  /**
+   * Combine the two input RDD[Double]s into an RDD[Vector] and compute the correlation using the
+   * correlation implementation for RDD[Vector]. Can be NaN if correlation is undefined for the
+   * input vectors.
+   */
+  def computeCorrelationWithMatrixImpl(x: RDD[Double], y: RDD[Double]): Double = {
+    val mat: RDD[Vector] = x.zip(y).mapPartitions { iter =>
+      iter.map { case (xi, yi) => new DenseVector(Array(xi, yi)) }
+    }
+    computeCorrelationMatrix(mat)(0, 1)
+  }
+
+}
+
+/**
+ * Delegates computation to the specific correlation object based on the input method name
+ *
+ * Currently supported correlations: pearson, spearman.
+ * After new correlation algorithms are added, please update the documentation here and in
+ * Statistics.scala for the correlation APIs.
+ *
+ * Maintains the default correlation type, pearson
+ */
+private[stat] object Correlations {
+
+  // Note: after new types of correlations are implemented, please update this map
+  val nameToObjectMap = Map(("pearson", PearsonCorrelation), ("spearman", SpearmansCorrelation))
+  val defaultCorrName: String = "pearson"
+  val defaultCorr: Correlation = nameToObjectMap(defaultCorrName)
+
+  def corr(x: RDD[Double], y: RDD[Double], method: String = defaultCorrName): Double = {
+    val correlation = getCorrelationFromName(method)
+    correlation.computeCorrelation(x, y)
+  }
+
+  def corrMatrix(X: RDD[Vector], method: String = defaultCorrName): Matrix = {
+    val correlation = getCorrelationFromName(method)
+    correlation.computeCorrelationMatrix(X)
+  }
+
+  /**
+   * Perform simple string processing to match the input correlation name with a known name
+   *
+   * private to mllib for ease of unit testing
+   */
+  private[mllib] def getCorrelationFromName(method: String): Correlation = {
--- End diff --

This function could be further simplified since you have an object map. 
Querying on the map directly should be sufficient. Iterating over its entries 
is not necessary.
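
A minimal sketch of that simplification (the exception type and message are illustrative, not from the PR):

    private[mllib] def getCorrelationFromName(method: String): Correlation = {
      // query the map directly instead of iterating over its entries
      nameToObjectMap.getOrElse(method,
        throw new IllegalArgumentException(s"Unrecognized method name: $method"))
    }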




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-18 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15133921
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala ---
@@ -0,0 +1,105 @@
[... diff context as quoted in full above ...]
+  def computeCorrelationWithMatrixImpl(x: RDD[Double], y: RDD[Double]): Double = {
+    val mat: RDD[Vector] = x.zip(y).mapPartitions { iter =>
--- End diff --

`map` is more concise here than `mapPartitions`
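
For comparison, the `map` form of the same construction:

    val mat: RDD[Vector] = x.zip(y).map { case (xi, yi) => new DenseVector(Array(xi, yi)) }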




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-18 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15133872
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala ---
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.mllib.linalg.{Matrix, Vector}
+import org.apache.spark.mllib.stat.correlation.Correlations
+import org.apache.spark.rdd.RDD
+
+/**
+ * API for statistical functions in MLlib
+ */
+@Experimental
+object Statistics {
+
+  /**
+   * Compute the Pearson correlation matrix for the input RDD of Vectors.
+   * Returns NaN if either vector has 0 variance.
+   *
+   * @param X an RDD[Vector] for which the correlation matrix is to be computed.
+   * @return Pearson correlation matrix comparing columns in X.
+   */
+  def corr(X: RDD[Vector]): Matrix = Correlations.corrMatrix(X)
+
+  /**
+   * Compute the correlation matrix for the input RDD of Vectors using the specified method.
+   * Methods currently supported: pearson (default), spearman
+   *
+   * Note that for Spearman, a rank correlation, we need to create an RDD[Double] for each column
+   * and sort it in order to retrieve the ranks and then join the columns back into an RDD[Vector],
+   * which is fairly costly. Cache the input RDD before calling corr with `method = "spearman"` to
+   * avoid recomputing the common lineage
+   *
+   * @param X an RDD[Vector] for which the correlation matrix is to be computed.
+   * @param method String specifying the method to use for computing correlation
+   * @return Correlation matrix comparing columns in X.
+   */
+  def corr(X: RDD[Vector], method: String): Matrix = Correlations.corrMatrix(X, method)
+
+  /**
+   * Compute the Pearson correlation for the input RDDs.
+   * Columns with 0 covariance produce NaN entries in the correlation matrix.
+   *
+   * @param x RDD[Double] of the same cardinality as y
+   * @param y RDD[Double] of the same cardinality as x
+   * @return A Double containing the Pearson correlation between the two input RDD[Double]s
+   */
+  def corr(x: RDD[Double], y: RDD[Double]): Double = Correlations.corr(x, y)
+
+  /**
+   * Compute the correlation for the input RDDs using the specified method.
+   * Methods currently supported: pearson (default), spearman
+   *
+   * @param x RDD[Double] of the same cardinality as y
+   * @param y RDD[Double] of the same cardinality as x
+   * @param method String specifying the method to use for computing correlation
--- End diff --

ditto.
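
For reference, a usage sketch of the `Statistics.corr` overloads quoted above (the data values are illustrative; assumes an existing SparkContext `sc`):

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.stat.Statistics

    val X = sc.parallelize(Seq(
      Vectors.dense(1.0, 10.0, 100.0),
      Vectors.dense(2.0, 20.0, 200.0),
      Vectors.dense(5.0, 33.0, 366.0)))

    X.cache() // recommended before Spearman, which sorts each column
    val pearsonMat = Statistics.corr(X)              // Pearson by default
    val spearmanMat = Statistics.corr(X, "spearman") // rank correlation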




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-18 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15133828
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala ---
@@ -0,0 +1,76 @@
[... diff context as quoted in full above ...]
+   * Note that for Spearman, a rank correlation, we need to create an RDD[Double] for each column
+   * and sort it in order to retrieve the ranks and then join the columns back into an RDD[Vector],
+   * which is fairly costly. Cache the input RDD before calling corr with `method = "spearman"` to
+   * avoid recomputing the common lineage
--- End diff --

Missing `.` at the end.




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-18 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15133829
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala ---
@@ -0,0 +1,76 @@
[... diff context as quoted in full above ...]
+   * @param X an RDD[Vector] for which the correlation matrix is to be computed.
+   * @param method String specifying the method to use for computing correlation
--- End diff --

Besides mentioning in the method doc, could you also list the supported 
method names and the default values in this line? It may be easier for users to 
find what the options are.
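
One possible wording for that doc line, as a sketch (not from the PR):

    * @param method String specifying the method to use for computing correlation.
    *               Supported: `pearson` (default), `spearman`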




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-18 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1367#issuecomment-49477683
  
QA results for PR 1367:
- This patch PASSES unit tests.
- This patch merges cleanly.
- This patch adds no public classes.

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16830/consoleFull




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-18 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1367#issuecomment-49467775
  
QA tests have started for PR 1367. This patch merges cleanly. View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16830/consoleFull




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-18 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1367#issuecomment-49405645
  
QA results for PR 1367:
- This patch PASSES unit tests.
- This patch merges cleanly.
- This patch adds no public classes.

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16815/consoleFull




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-18 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15099190
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmansCorrelation.scala ---
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.HashPartitioner
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+
+/**
+ * Compute Spearman's correlation for two RDDs of the type RDD[Double] or the correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Spearman's correlation can be found at
+ * http://en.wikipedia.org/wiki/Spearman's_rank_correlation_coefficient
+ */
+object SpearmansCorrelation extends Correlation {
+
+  /**
+   * Compute Spearman's correlation for two datasets.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double = {
+    computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute Spearman's correlation matrix S, for the input matrix, where S(i, j) is the
+   * correlation between column i and j.
+   *
+   * Input RDD[Vector] should be cached or checkpointed if possible since it would be split into
+   * numCol RDD[Double]s, each of which sorted, and then joined back into a single RDD[Vector].
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+    val indexed = X.zipWithUniqueId()
+
+    val numCols = X.first.size
+    // TODO add warning for too many columns
+    val ranks = new Array[RDD[(Long, Double)]](numCols)
+
+    // Note: we use a for loop here instead of a while loop with a single index variable
+    // to avoid race condition caused by closure serialization
+    for (k <- 0 until numCols) {
+      val column = indexed.mapPartitions[(Double, Long)] { iter =>
+        iter.map { case (vector, index) => (vector(k), index) }
+      }
+      ranks(k) = getRanks(column)
+    }
+
+    val ranksMat: RDD[Vector] = makeRankMatrix(ranks, X)
+    PearsonCorrelation.computeCorrelationMatrix(ranksMat)
+  }
+
+  /**
+   * Compute the ranks for elements in the input RDD, using the average method for ties.
+   *
+   * With the average method, elements with the same value receive the same rank that's computed
+   * by taking the average of their positions in the sorted list.
+   * e.g. ranks([2, 1, 0, 2]) = [3.5, 2.0, 1.0, 3.5]
+   *
+   * @param indexed RDD[(Double, Long)] containing pairs of the format (originalValue, uniqueId)
+   * @return RDD[(Long, Double)] containing pairs of the format (uniqueId, rank), where uniqueId is
+   *         copied from the input RDD.
+   */
+  private def getRanks(indexed: RDD[(Double, Long)]): RDD[(Long, Double)] = {
+    // Get elements' positions in the sorted list for computing average rank for duplicate values
+    val sorted = indexed.sortByKey().zipWithIndex()
+
+    val ranks: RDD[(Long, Double)] = sorted.mapPartitions { iter =>
+      // add an extra element to signify the end of the list so that flatMap can flush the last
+      // batch of duplicates
+      val padded = iter ++
+        Iterator[((Double, Long), Long)](((Double.NaN, Long.MinValue), Long.MinValue))
+      var lastVal = 0.0
+      var rankSum = 0.0
+      val IDBuffer = new ArrayBuffer[Long]()
+      padded.flatMap { item =>
+        val rank = item._2 + 1 // zipWithIndex is 0-indexed but ranks are 1-indexed
--- End diff --

Yes, please remove it and leave a note.
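
A local sketch of why dropping the `+ 1` is safe: Pearson correlation is invariant under adding a constant to every value, so 0-indexed and 1-indexed ranks yield the same Spearman result (the helper below is written for illustration only):

    // Plain Pearson correlation of two equal-length arrays.
    def pearson(a: Array[Double], b: Array[Double]): Double = {
      val n = a.length
      val (ma, mb) = (a.sum / n, b.sum / n)
      val cov = a.zip(b).map { case (x, y) => (x - ma) * (y - mb) }.sum
      val va = a.map(x => (x - ma) * (x - ma)).sum
      val vb = b.map(y => (y - mb) * (y - mb)).sum
      cov / math.sqrt(va * vb)
    }

    val ranks0 = Array(2.5, 1.0, 0.0, 2.5) // 0-indexed ranks
    val ranks1 = ranks0.map(_ + 1)         // 1-indexed ranks
    val other = Array(0.0, 1.0, 2.0, 3.0)
    // pearson(ranks0, other) == pearson(ranks1, other)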



[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-18 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15099185
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmansCorrelation.scala
 ---
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.HashPartitioner
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+
+/**
+ * Compute Spearman's correlation for two RDDs of the type RDD[Double] or 
the correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Spearman's correlation can be found at
+ * http://en.wikipedia.org/wiki/Spearman's_rank_correlation_coefficient
+ */
+object SpearmansCorrelation extends Correlation {
+
+  /**
+   * Compute Spearman's correlation for two datasets.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double 
= {
+computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute Spearman's correlation matrix S, for the input matrix, where 
S(i, j) is the
+   * correlation between column i and j.
+   *
+   * Input RDD[Vector] should be cached or checkpointed if possible, since it
+   * would be split into numCol RDD[Double]s, each of which is sorted and then
+   * joined back into a single RDD[Vector].
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+val indexed = X.zipWithUniqueId()
+
+val numCols = X.first.size
+// TODO add warning for too many columns
+val ranks = new Array[RDD[(Long, Double)]](numCols)
+
+// Note: we use a for loop here instead of a while loop with a single 
index variable
+// to avoid race condition caused by closure serialization
+for (k <- 0 until numCols) {
+  val column = indexed.mapPartitions[(Double, Long)] { iter =>
+iter.map { case (vector, index) => (vector(k), index) }
+  }
+  ranks(k) = getRanks(column)
+}
+
+val ranksMat: RDD[Vector] = makeRankMatrix(ranks, X)
+PearsonCorrelation.computeCorrelationMatrix(ranksMat)
+  }
+
+  /**
+   * Compute the ranks for elements in the input RDD, using the average 
method for ties.
+   *
+   * With the average method, elements with the same value receive the 
same rank that's computed
+   * by taking the average of their positions in the sorted list.
+   * e.g. ranks([2, 1, 0, 2]) = [3.5, 2.0, 1.0, 3.5]
+   *
+   * @param indexed RDD[(Double, Long)] containing pairs of the format 
(originalValue, uniqueId)
+   * @return RDD[(Long, Double)] containing pairs of the format (uniqueId, 
rank), where uniqueId is
+   * copied from the input RDD.
+   */
+  private def getRanks(indexed: RDD[(Double, Long)]): RDD[(Long, Double)] 
= {
+// Get elements' positions in the sorted list for computing average 
rank for duplicate values
+val sorted = indexed.sortByKey().zipWithIndex()
+
+val ranks: RDD[(Long, Double)] = sorted.mapPartitions { iter =>
+  // add an extra element to signify the end of the list so that 
flatMap can flush the last
+  // batch of duplicates
+  val padded = iter ++
+Iterator[((Double, Long), Long)](((Double.NaN, Long.MinValue), 
Long.MinValue))
+  var lastVal = 0.0
+  var rankSum = 0.0
+  val IDBuffer = new ArrayBuffer[Long]()
+  padded.flatMap { item =>
+val rank = item._2 + 1 // zipWithIndex is 0-indexed but ranks are 
1-indexed
+if (item._1._1  == lastVal && item._2 != Long.MinValue) {
+  rankSum += rank
+  IDBuffer += item._1._2
+  Iterator.empty
+} else {
+  val entries = if (IDBuffer.size == 0) {
+Iterator

[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-17 Thread dorx
Github user dorx commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15098523
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmansCorrelation.scala
 ---
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.HashPartitioner
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+
+/**
+ * Compute Spearman's correlation for two RDDs of the type RDD[Double] or 
the correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Spearman's correlation can be found at
+ * http://en.wikipedia.org/wiki/Spearman's_rank_correlation_coefficient
+ */
+object SpearmansCorrelation extends Correlation {
+
+  /**
+   * Compute Spearman's correlation for two datasets.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double 
= {
+computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute Spearman's correlation matrix S, for the input matrix, where 
S(i, j) is the
+   * correlation between column i and j.
+   *
+   * Input RDD[Vector] should be cached or checkpointed if possible, since it
+   * would be split into numCol RDD[Double]s, each of which is sorted and then
+   * joined back into a single RDD[Vector].
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+val indexed = X.zipWithUniqueId()
+
+val numCols = X.first.size
+// TODO add warning for too many columns
+val ranks = new Array[RDD[(Long, Double)]](numCols)
+
+// Note: we use a for loop here instead of a while loop with a single 
index variable
+// to avoid race condition caused by closure serialization
+for (k <- 0 until numCols) {
+  val column = indexed.mapPartitions[(Double, Long)] { iter =>
+iter.map { case (vector, index) => (vector(k), index) }
+  }
+  ranks(k) = getRanks(column)
+}
+
+val ranksMat: RDD[Vector] = makeRankMatrix(ranks, X)
+PearsonCorrelation.computeCorrelationMatrix(ranksMat)
+  }
+
+  /**
+   * Compute the ranks for elements in the input RDD, using the average 
method for ties.
+   *
+   * With the average method, elements with the same value receive the 
same rank that's computed
+   * by taking the average of their positions in the sorted list.
+   * e.g. ranks([2, 1, 0, 2]) = [3.5, 2.0, 1.0, 3.5]
+   *
+   * @param indexed RDD[(Double, Long)] containing pairs of the format 
(originalValue, uniqueId)
+   * @return RDD[(Long, Double)] containing pairs of the format (uniqueId, 
rank), where uniqueId is
+   * copied from the input RDD.
+   */
+  private def getRanks(indexed: RDD[(Double, Long)]): RDD[(Long, Double)] 
= {
+// Get elements' positions in the sorted list for computing average 
rank for duplicate values
+val sorted = indexed.sortByKey().zipWithIndex()
+
+val ranks: RDD[(Long, Double)] = sorted.mapPartitions { iter =>
+  // add an extra element to signify the end of the list so that 
flatMap can flush the last
+  // batch of duplicates
+  val padded = iter ++
+Iterator[((Double, Long), Long)](((Double.NaN, Long.MinValue), 
Long.MinValue))
+  var lastVal = 0.0
+  var rankSum = 0.0
+  val IDBuffer = new ArrayBuffer[Long]()
+  padded.flatMap { item =>
+val rank = item._2 + 1 // zipWithIndex is 0-indexed but ranks are 
1-indexed
+if (item._1._1  == lastVal && item._2 != Long.MinValue) {
+  rankSum += rank
+  IDBuffer += item._1._2
+  Iterator.empty
+} else {
+  val entries = if (IDBuffer.size == 0) {
+Iterator.e

[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-17 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15098472
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmansCorrelation.scala
 ---
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.HashPartitioner
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+
+/**
+ * Compute Spearman's correlation for two RDDs of the type RDD[Double] or 
the correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Spearman's correlation can be found at
+ * http://en.wikipedia.org/wiki/Spearman's_rank_correlation_coefficient
+ */
+object SpearmansCorrelation extends Correlation {
+
+  /**
+   * Compute Spearman's correlation for two datasets.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double 
= {
+computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute Spearman's correlation matrix S, for the input matrix, where 
S(i, j) is the
+   * correlation between column i and j.
+   *
+   * Input RDD[Vector] should be cached or checkpointed if possible, since it
+   * would be split into numCol RDD[Double]s, each of which is sorted and then
+   * joined back into a single RDD[Vector].
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+val indexed = X.zipWithUniqueId()
+
+val numCols = X.first.size
+// TODO add warning for too many columns
+val ranks = new Array[RDD[(Long, Double)]](numCols)
+
+// Note: we use a for loop here instead of a while loop with a single 
index variable
+// to avoid race condition caused by closure serialization
+for (k <- 0 until numCols) {
+  val column = indexed.mapPartitions[(Double, Long)] { iter =>
--- End diff --

Use `map` instead of `mapPartitions`.
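
In code, the suggested simplification keeps the same body and drops the
mapPartitions wrapper:

~~~
val column = indexed.map { case (vector, index) => (vector(k), index) }
~~~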


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-17 Thread dorx
Github user dorx commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15098446
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmansCorrelation.scala
 ---
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.HashPartitioner
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+
+/**
+ * Compute Spearman's correlation for two RDDs of the type RDD[Double] or 
the correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Spearman's correlation can be found at
+ * http://en.wikipedia.org/wiki/Spearman's_rank_correlation_coefficient
+ */
+object SpearmansCorrelation extends Correlation {
+
+  /**
+   * Compute Spearman's correlation for two datasets.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double 
= {
+computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute Spearman's correlation matrix S, for the input matrix, where 
S(i, j) is the
+   * correlation between column i and j.
+   *
+   * Input RDD[Vector] should be cached or checkpointed if possible, since it
+   * would be split into numCol RDD[Double]s, each of which is sorted and then
+   * joined back into a single RDD[Vector].
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+val indexed = X.zipWithUniqueId()
+
+val numCols = X.first.size
+// TODO add warning for too many columns
+val ranks = new Array[RDD[(Long, Double)]](numCols)
+
+// Note: we use a for loop here instead of a while loop with a single 
index variable
+// to avoid race condition caused by closure serialization
+for (k <- 0 until numCols) {
+  val column = indexed.mapPartitions[(Double, Long)] { iter =>
+iter.map { case (vector, index) => (vector(k), index) }
+  }
+  ranks(k) = getRanks(column)
+}
+
+val ranksMat: RDD[Vector] = makeRankMatrix(ranks, X)
+PearsonCorrelation.computeCorrelationMatrix(ranksMat)
+  }
+
+  /**
+   * Compute the ranks for elements in the input RDD, using the average 
method for ties.
+   *
+   * With the average method, elements with the same value receive the 
same rank that's computed
+   * by taking the average of their positions in the sorted list.
+   * e.g. ranks([2, 1, 0, 2]) = [3.5, 2.0, 1.0, 3.5]
+   *
+   * @param indexed RDD[(Double, Long)] containing pairs of the format 
(originalValue, uniqueId)
+   * @return RDD[(Long, Double)] containing pairs of the format (uniqueId, 
rank), where uniqueId is
+   * copied from the input RDD.
+   */
+  private def getRanks(indexed: RDD[(Double, Long)]): RDD[(Long, Double)] 
= {
+// Get elements' positions in the sorted list for computing average 
rank for duplicate values
+val sorted = indexed.sortByKey().zipWithIndex()
+
+val ranks: RDD[(Long, Double)] = sorted.mapPartitions { iter =>
+  // add an extra element to signify the end of the list so that 
flatMap can flush the last
+  // batch of duplicates
+  val padded = iter ++
+Iterator[((Double, Long), Long)](((Double.NaN, Long.MinValue), 
Long.MinValue))
+  var lastVal = 0.0
+  var rankSum = 0.0
+  val IDBuffer = new ArrayBuffer[Long]()
+  padded.flatMap { item =>
+val rank = item._2 + 1 // zipWithIndex is 0-indexed but ranks are 
1-indexed
--- End diff --

Added for the sake of consistency with the definition (also in case someone
wants to pull the ranks function out to be used somewhere else). But I can
remove it and add a note in the docs saying the ranks here don't follow the
definition in the wiki link verbatim.

[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-17 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15098421
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmansCorrelation.scala
 ---
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.HashPartitioner
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+
+/**
+ * Compute Spearman's correlation for two RDDs of the type RDD[Double] or 
the correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Spearman's correlation can be found at
+ * http://en.wikipedia.org/wiki/Spearman's_rank_correlation_coefficient
+ */
+object SpearmansCorrelation extends Correlation {
+
+  /**
+   * Compute Spearman's correlation for two datasets.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double 
= {
+computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute Spearman's correlation matrix S, for the input matrix, where 
S(i, j) is the
+   * correlation between column i and j.
+   *
+   * Input RDD[Vector] should be cached or checkpointed if possible, since it
+   * would be split into numCol RDD[Double]s, each of which is sorted and then
+   * joined back into a single RDD[Vector].
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+val indexed = X.zipWithUniqueId()
+
+val numCols = X.first.size
+// TODO add warning for too many columns
+val ranks = new Array[RDD[(Long, Double)]](numCols)
+
+// Note: we use a for loop here instead of a while loop with a single 
index variable
+// to avoid race condition caused by closure serialization
+for (k <- 0 until numCols) {
+  val column = indexed.mapPartitions[(Double, Long)] { iter =>
+iter.map { case (vector, index) => (vector(k), index) }
+  }
+  ranks(k) = getRanks(column)
+}
+
+val ranksMat: RDD[Vector] = makeRankMatrix(ranks, X)
+PearsonCorrelation.computeCorrelationMatrix(ranksMat)
+  }
+
+  /**
+   * Compute the ranks for elements in the input RDD, using the average 
method for ties.
+   *
+   * With the average method, elements with the same value receive the 
same rank that's computed
+   * by taking the average of their positions in the sorted list.
+   * e.g. ranks([2, 1, 0, 2]) = [3.5, 2.0, 1.0, 3.5]
+   *
+   * @param indexed RDD[(Double, Long)] containing pairs of the format 
(originalValue, uniqueId)
+   * @return RDD[(Long, Double)] containing pairs of the format (uniqueId, 
rank), where uniqueId is
+   * copied from the input RDD.
+   */
+  private def getRanks(indexed: RDD[(Double, Long)]): RDD[(Long, Double)] 
= {
+// Get elements' positions in the sorted list for computing average 
rank for duplicate values
+val sorted = indexed.sortByKey().zipWithIndex()
+
+val ranks: RDD[(Long, Double)] = sorted.mapPartitions { iter =>
+  // add an extra element to signify the end of the list so that 
flatMap can flush the last
+  // batch of duplicates
+  val padded = iter ++
+Iterator[((Double, Long), Long)](((Double.NaN, Long.MinValue), 
Long.MinValue))
+  var lastVal = 0.0
+  var rankSum = 0.0
+  val IDBuffer = new ArrayBuffer[Long]()
+  padded.flatMap { item =>
+val rank = item._2 + 1 // zipWithIndex is 0-indexed but ranks are 
1-indexed
+if (item._1._1  == lastVal && item._2 != Long.MinValue) {
+  rankSum += rank
+  IDBuffer += item._1._2
+  Iterator.empty
+} else {
+  val entries = if (IDBuffer.size == 0) {
+Iterator

[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-17 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15098382
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmansCorrelation.scala
 ---
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.HashPartitioner
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+
+/**
+ * Compute Spearman's correlation for two RDDs of the type RDD[Double] or 
the correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Spearman's correlation can be found at
+ * http://en.wikipedia.org/wiki/Spearman's_rank_correlation_coefficient
+ */
+object SpearmansCorrelation extends Correlation {
+
+  /**
+   * Compute Spearman's correlation for two datasets.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double 
= {
+computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute Spearman's correlation matrix S, for the input matrix, where 
S(i, j) is the
+   * correlation between column i and j.
+   *
+   * Input RDD[Vector] should be cached or checkpointed if possible, since it
+   * would be split into numCol RDD[Double]s, each of which is sorted and then
+   * joined back into a single RDD[Vector].
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+val indexed = X.zipWithUniqueId()
+
+val numCols = X.first.size
+// TODO add warning for too many columns
+val ranks = new Array[RDD[(Long, Double)]](numCols)
+
+// Note: we use a for loop here instead of a while loop with a single 
index variable
+// to avoid race condition caused by closure serialization
+for (k <- 0 until numCols) {
+  val column = indexed.mapPartitions[(Double, Long)] { iter =>
+iter.map { case (vector, index) => (vector(k), index) }
+  }
+  ranks(k) = getRanks(column)
+}
+
+val ranksMat: RDD[Vector] = makeRankMatrix(ranks, X)
+PearsonCorrelation.computeCorrelationMatrix(ranksMat)
+  }
+
+  /**
+   * Compute the ranks for elements in the input RDD, using the average 
method for ties.
+   *
+   * With the average method, elements with the same value receive the 
same rank that's computed
+   * by taking the average of their positions in the sorted list.
+   * e.g. ranks([2, 1, 0, 2]) = [3.5, 2.0, 1.0, 3.5]
+   *
+   * @param indexed RDD[(Double, Long)] containing pairs of the format 
(originalValue, uniqueId)
+   * @return RDD[(Long, Double)] containing pairs of the format (uniqueId, 
rank), where uniqueId is
+   * copied from the input RDD.
+   */
+  private def getRanks(indexed: RDD[(Double, Long)]): RDD[(Long, Double)] 
= {
+// Get elements' positions in the sorted list for computing average 
rank for duplicate values
+val sorted = indexed.sortByKey().zipWithIndex()
+
+val ranks: RDD[(Long, Double)] = sorted.mapPartitions { iter =>
+  // add an extra element to signify the end of the list so that 
flatMap can flush the last
+  // batch of duplicates
+  val padded = iter ++
+Iterator[((Double, Long), Long)](((Double.NaN, Long.MinValue), 
Long.MinValue))
+  var lastVal = 0.0
+  var rankSum = 0.0
+  val IDBuffer = new ArrayBuffer[Long]()
+  padded.flatMap { item =>
+val rank = item._2 + 1 // zipWithIndex is 0-indexed but ranks are 
1-indexed
--- End diff --

It doesn't matter, because it affects neither the variance nor the
correlation.
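
The invariance being invoked, for any constants c and c':

    Var(X + c) = Var(X),   Cov(X + c, Y + c') = Cov(X, Y)
    =>  corr(X + c, Y + c') = corr(X, Y)

so a uniform +1 shift of the ranks cannot change the Spearman result.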


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-17 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15098384
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmansCorrelation.scala
 ---
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.HashPartitioner
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+
+/**
+ * Compute Spearman's correlation for two RDDs of the type RDD[Double] or 
the correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Spearman's correlation can be found at
+ * http://en.wikipedia.org/wiki/Spearman's_rank_correlation_coefficient
+ */
+object SpearmansCorrelation extends Correlation {
+
+  /**
+   * Compute Spearman's correlation for two datasets.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double 
= {
+computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute Spearman's correlation matrix S, for the input matrix, where 
S(i, j) is the
+   * correlation between column i and j.
+   *
+   * Input RDD[Vector] should be cached or checkpointed if possible, since it
+   * would be split into numCol RDD[Double]s, each of which is sorted and then
+   * joined back into a single RDD[Vector].
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+val indexed = X.zipWithUniqueId()
+
+val numCols = X.first.size
+// TODO add warning for too many columns
+val ranks = new Array[RDD[(Long, Double)]](numCols)
+
+// Note: we use a for loop here instead of a while loop with a single 
index variable
+// to avoid race condition caused by closure serialization
+for (k <- 0 until numCols) {
+  val column = indexed.mapPartitions[(Double, Long)] { iter =>
+iter.map { case (vector, index) => (vector(k), index) }
+  }
+  ranks(k) = getRanks(column)
+}
+
+val ranksMat: RDD[Vector] = makeRankMatrix(ranks, X)
+PearsonCorrelation.computeCorrelationMatrix(ranksMat)
+  }
+
+  /**
+   * Compute the ranks for elements in the input RDD, using the average 
method for ties.
+   *
+   * With the average method, elements with the same value receive the 
same rank that's computed
+   * by taking the average of their positions in the sorted list.
+   * e.g. ranks([2, 1, 0, 2]) = [3.5, 2.0, 1.0, 3.5]
+   *
+   * @param indexed RDD[(Double, Long)] containing pairs of the format 
(originalValue, uniqueId)
+   * @return RDD[(Long, Double)] containing pairs of the format (uniqueId, 
rank), where uniqueId is
+   * copied from the input RDD.
+   */
+  private def getRanks(indexed: RDD[(Double, Long)]): RDD[(Long, Double)] 
= {
+// Get elements' positions in the sorted list for computing average 
rank for duplicate values
+val sorted = indexed.sortByKey().zipWithIndex()
+
+val ranks: RDD[(Long, Double)] = sorted.mapPartitions { iter =>
+  // add an extra element to signify the end of the list so that 
flatMap can flush the last
+  // batch of duplicates
+  val padded = iter ++
+Iterator[((Double, Long), Long)](((Double.NaN, Long.MinValue), 
Long.MinValue))
+  var lastVal = 0.0
+  var rankSum = 0.0
+  val IDBuffer = new ArrayBuffer[Long]()
+  padded.flatMap { item =>
+val rank = item._2 + 1 // zipWithIndex is 0-indexed but ranks are 
1-indexed
+if (item._1._1  == lastVal && item._2 != Long.MinValue) {
+  rankSum += rank
--- End diff --

Storing the first rank is sufficient. The average rank can be derived from the
first rank and `IDBuffer.size`:

~~~
firstRank + IDBu
~~~
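
The message is truncated above. A plausible completion, assuming a run of
ties occupies consecutive positions (the names `firstRank` and `numTies` are
illustrative, not from the PR):

~~~
// A run of n tied values occupies ranks firstRank .. firstRank + n - 1,
// so its average rank is
def averageRank(firstRank: Double, numTies: Int): Double =
  firstRank + (numTies - 1) / 2.0
~~~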

[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-17 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1367#issuecomment-49399247
  
QA tests have started for PR 1367. This patch merges cleanly. View 
progress: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16815/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-17 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15087547
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala 
---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.RDD
+
+/**
+ * New correlation algorithms should implement this trait
+ */
+trait Correlation {
+
+  /**
+   * Compute correlation for two datasets.
+   */
+  def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double
+
+  /**
+   * Compute the correlation matrix S, for the input matrix, where S(i, j) 
is the correlation
+   * between column i and j.
+   */
+  def computeCorrelationMatrix(X: RDD[Vector]): Matrix
+
+  /**
+   * Combine the two input RDD[Double]s into an RDD[Vector] and compute 
the correlation using the
+   * correlation implementation for RDD[Vector]
+   */
+  def computeCorrelationWithMatrixImpl(x: RDD[Double], y: RDD[Double]): 
Double = {
+val mat: RDD[Vector] = x.zip(y).mapPartitions({ iter =>
+  iter.map {case(xi, yi) => new DenseVector(Array(xi, yi))}
+}, preservesPartitioning = true)
+computeCorrelationMatrix(mat)(0, 1)
+  }
+
+}
+
+/**
+ * Delegates computation to the specific correlation object based on the 
input method name
+ *
+ * Currently supported correlations: pearson, spearman.
+ * After new correlation algorithms are added, please update the 
documentation here and in
+ * Statistics.scala for the correlation APIs.
+ *
+ * Case is ignored when matching method names. We also allow initials, e.g.
+ * "P" for "pearson", as long as initials are unique in the supported set of
+ * correlation algorithms. In addition, a supported method name has to be a
+ * substring of the input method name for it to be matched (e.g.
+ * "spearmansrho" will be matched to "spearman")
+ *
+ * Maintains the default correlation type, pearson
+ */
+object Correlations {
+
+  // Note: after new types of correlations are implemented, please update 
this map
+  val nameToObjectMap = Map(("pearson", PearsonCorrelation), ("spearman", 
SpearmansCorrelation))
+  val defaultCorrName: String = "pearson"
+  val defaultCorr: Correlation = nameToObjectMap(defaultCorrName)
+
+  def corr(x: RDD[Double], y: RDD[Double], method: String = 
defaultCorrName): Double = {
+val correlation = getCorrelationFromName(method)
+correlation.computeCorrelation(x, y)
+  }
+
+  def corrMatrix(X: RDD[Vector], method: String = defaultCorrName): Matrix 
= {
+val correlation = getCorrelationFromName(method)
+correlation.computeCorrelationMatrix(X)
+  }
+
+  /**
+   * Perform simple string processing to match the input correlation name 
with a known name
+   *
+   * private to mllib for ease of unit testing
+   */
+  private[mllib] def getCorrelationFromName(method: String): Correlation = 
{
+if (method.equals(defaultCorrName)) {
+  defaultCorr
+} else {
+  var correlation: Correlation = defaultCorr
+  var matched = false
+  val initialsAllowed = areInitialsUnique()
+  val inputLower = method.toLowerCase()
+  nameToObjectMap.foreach { case (name, corr) =>
+if (!matched) {
+  if ((initialsAllowed && method.size == 1 && 
name.startsWith(inputLower)) ||
+inputLower.contains(name)) { // match names like "spearmans"
--- End diff --

Style issue: Line wrap & spacing
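
One way the condition could be wrapped to address this (a sketch only; the two
assignments in the body are assumed from the surrounding declarations, not
quoted from the PR):

~~~
if ((initialsAllowed && method.size == 1 && name.startsWith(inputLower)) ||
    inputLower.contains(name)) { // match names like "spearmans"
  correlation = corr
  matched = true
}
~~~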


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-16 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15040646
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmansCorrelation.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.Partitioner
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+
+/**
+ * Compute Spearman's correlation for two RDDs of the type RDD[Double] or 
the correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Spearman's correlation can be found at
+ * http://en.wikipedia.org/wiki/Spearman's_rank_correlation_coefficient
+ */
+object SpearmansCorrelation extends Correlation {
+
+  /**
+   * Compute Spearman's correlation for two datasets.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double 
= {
+computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute Spearman's correlation matrix S, for the input matrix, where 
S(i, j) is the
+   * correlation between column i and j.
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+val indexed = X.zipWithIndex()
+// Attempt to checkpoint the RDD before splitting it into numCols 
RDD[Double]s to avoid
+// computing the lineage prefix multiple times.
+// If checkpoint directory not set, cache the RDD instead.
+try {
+  indexed.checkpoint()
+} catch {
+  case e: Exception => indexed.cache()
+}
+
+val numCols = X.first.size
+val ranks = new Array[RDD[(Long, Double)]](numCols)
+
+// Note: we use a for loop here instead of a while loop with a single 
index variable
+// to avoid race condition caused by closure serialization
+for (k <- 0 until numCols) {
+  val column = indexed.map {case(vector, index) => {
+(vector(k), index)}
+  }
+  ranks(k) = getRanks(column)
+}
+
+val ranksMat: RDD[Vector] = makeRankMatrix(ranks)
+PearsonCorrelation.computeCorrelationMatrix(ranksMat)
+  }
+
+  /**
+   * Compute the ranks for elements in the input RDD, using the average 
method for ties.
+   *
+   * With the average method, elements with the same value receive the 
same rank that's computed
+   * by taking the average of their positions in the sorted list.
+   * e.g. ranks([2, 1, 0, 2]) = [3.5, 2.0, 1.0, 3.5]
+   */
+  private def getRanks(indexed: RDD[(Double, Long)]): RDD[(Long, Double)] 
= {
+// Get elements' positions in the sorted list for computing average 
rank for duplicate values
+val sorted = indexed.sortByKey().zipWithIndex()
+val groupedByValue = sorted.groupBy(_._1._1)
+val ranks = groupedByValue.flatMap[(Long, Double)] { item =>
+  val duplicates = item._2
+  if (duplicates.size > 1) {
+val averageRank = duplicates.foldLeft(0L) {_ + _._2 + 1} / 
duplicates.size.toDouble
+duplicates.map(entry => (entry._1._2, averageRank)).toSeq
+  } else {
+duplicates.map(entry => (entry._1._2, entry._2.toDouble + 1)).toSeq
+  }
+}
+ranks.sortByKey()
+  }
+
+  private def makeRankMatrix(ranks: Array[RDD[(Long, Double)]]): 
RDD[Vector] = {
+val partitioner = Partitioner.defaultPartitioner(ranks(0), ranks.tail: 
_*)
+val cogrouped = new CoGroupedRDD[Long](ranks, partitioner)
+cogrouped.mapPartitions({ iter =>
--- End diff --

The output is sent to `Pearson.computeCorrelationMatrix`, which uses 
covariance to derive the correlation matrix. No `groupBy` or `join` is used. 
The partitioner has no function here and it shouldn't be carried over because 
the output is `RDD[Vector]` but the partitioner applies to `inde
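
For context on the covariance route mentioned above: a correlation matrix
needs only the covariance matrix, normalized by the standard deviations on
its diagonal. A minimal local sketch over plain arrays (not the MLlib code):

~~~
def covToCorr(cov: Array[Array[Double]]): Array[Array[Double]] = {
  // standard deviations come from the diagonal entries
  val sd = cov.indices.map(i => math.sqrt(cov(i)(i)))
  cov.indices.map { i =>
    cov(i).indices.map(j => cov(i)(j) / (sd(i) * sd(j))).toArray
  }.toArray
}
~~~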

[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-16 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15040598
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmansCorrelation.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.Partitioner
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+
+/**
+ * Compute Spearman's correlation for two RDDs of the type RDD[Double] or 
the correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Spearman's correlation can be found at
+ * http://en.wikipedia.org/wiki/Spearman's_rank_correlation_coefficient
+ */
+object SpearmansCorrelation extends Correlation {
+
+  /**
+   * Compute Spearman's correlation for two datasets.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double 
= {
+computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute Spearman's correlation matrix S, for the input matrix, where 
S(i, j) is the
+   * correlation between column i and j.
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+val indexed = X.zipWithIndex()
+// Attempt to checkpoint the RDD before splitting it into numCols 
RDD[Double]s to avoid
+// computing the lineage prefix multiple times.
+// If checkpoint directory not set, cache the RDD instead.
+try {
+  indexed.checkpoint()
+} catch {
+  case e: Exception => indexed.cache()
+}
+
+val numCols = X.first.size
+val ranks = new Array[RDD[(Long, Double)]](numCols)
+
+// Note: we use a for loop here instead of a while loop with a single 
index variable
+// to avoid race condition caused by closure serialization
+for (k <- 0 until numCols) {
+  val column = indexed.map {case(vector, index) => {
+(vector(k), index)}
+  }
+  ranks(k) = getRanks(column)
+}
+
+val ranksMat: RDD[Vector] = makeRankMatrix(ranks)
+PearsonCorrelation.computeCorrelationMatrix(ranksMat)
+  }
+
+  /**
+   * Compute the ranks for elements in the input RDD, using the average 
method for ties.
+   *
+   * With the average method, elements with the same value receive the 
same rank that's computed
+   * by taking the average of their positions in the sorted list.
+   * e.g. ranks([2, 1, 0, 2]) = [3.5, 2.0, 1.0, 3.5]
+   */
+  private def getRanks(indexed: RDD[(Double, Long)]): RDD[(Long, Double)] 
= {
+// Get elements' positions in the sorted list for computing average 
rank for duplicate values
+val sorted = indexed.sortByKey().zipWithIndex()
+val groupedByValue = sorted.groupBy(_._1._1)
+val ranks = groupedByValue.flatMap[(Long, Double)] { item =>
+  val duplicates = item._2
+  if (duplicates.size > 1) {
+val averageRank = duplicates.foldLeft(0L) {_ + _._2 + 1} / 
duplicates.size.toDouble
+duplicates.map(entry => (entry._1._2, averageRank)).toSeq
+  } else {
+duplicates.map(entry => (entry._1._2, entry._2.toDouble + 1)).toSeq
+  }
+}
+ranks.sortByKey()
+  }
+
+  private def makeRankMatrix(ranks: Array[RDD[(Long, Double)]]): 
RDD[Vector] = {
+val partitioner = Partitioner.defaultPartitioner(ranks(0), ranks.tail: 
_*)
--- End diff --

sounds good.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-16 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15040517
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala 
---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.RDD
+
+/**
+ * New correlation algorithms should implement this trait
+ */
+trait Correlation {
+
+  /**
+   * Compute correlation for two datasets.
+   */
+  def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double
+
+  /**
+   * Compute the correlation matrix S, for the input matrix, where S(i, j) 
is the correlation
+   * between column i and j.
+   */
+  def computeCorrelationMatrix(X: RDD[Vector]): Matrix
+
+  /**
+   * Combine the two input RDD[Double]s into an RDD[Vector] and compute 
the correlation using the
+   * correlation implementation for RDD[Vector]
+   */
+  def computeCorrelationWithMatrixImpl(x: RDD[Double], y: RDD[Double]): 
Double = {
+val mat: RDD[Vector] = x.zip(y).mapPartitions({ iter =>
--- End diff --

Sorry, I forgot that I was on branch-0.9 today ... Even if `zip` carries the
partitioner, I don't understand why the partitioner should be preserved in our
case. The output is `RDD[Vector]`, but the partitioner only applies to its
first element. This is error-prone. The final join is on unique row ids,
irrelevant to the partitioner in `x`. Please check that this partitioner is
correctly used somewhere, or we should remove it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-16 Thread dorx
Github user dorx commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15036918
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmansCorrelation.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.Partitioner
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+
+/**
+ * Compute Spearman's correlation for two RDDs of the type RDD[Double] or 
the correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Spearman's correlation can be found at
+ * http://en.wikipedia.org/wiki/Spearman's_rank_correlation_coefficient
+ */
+object SpearmansCorrelation extends Correlation {
+
+  /**
+   * Compute Spearman's correlation for two datasets.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double 
= {
+computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute Spearman's correlation matrix S, for the input matrix, where 
S(i, j) is the
+   * correlation between column i and j.
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+val indexed = X.zipWithIndex()
+// Attempt to checkpoint the RDD before splitting it into numCols 
RDD[Double]s to avoid
+// computing the lineage prefix multiple times.
+// If checkpoint directory not set, cache the RDD instead.
+try {
+  indexed.checkpoint()
+} catch {
+  case e: Exception => indexed.cache()
+}
+
+val numCols = X.first.size
+val ranks = new Array[RDD[(Long, Double)]](numCols)
+
+// Note: we use a for loop here instead of a while loop with a single 
index variable
+// to avoid race condition caused by closure serialization
+for (k <- 0 until numCols) {
+  val column = indexed.map {case(vector, index) => {
+(vector(k), index)}
+  }
+  ranks(k) = getRanks(column)
+}
+
+val ranksMat: RDD[Vector] = makeRankMatrix(ranks)
+PearsonCorrelation.computeCorrelationMatrix(ranksMat)
+  }
+
+  /**
+   * Compute the ranks for elements in the input RDD, using the average 
method for ties.
+   *
+   * With the average method, elements with the same value receive the 
same rank that's computed
+   * by taking the average of their positions in the sorted list.
+   * e.g. ranks([2, 1, 0, 2]) = [3.5, 2.0, 1.0, 3.5]
+   */
+  private def getRanks(indexed: RDD[(Double, Long)]): RDD[(Long, Double)] 
= {
+// Get elements' positions in the sorted list for computing average 
rank for duplicate values
+val sorted = indexed.sortByKey().zipWithIndex()
+val groupedByValue = sorted.groupBy(_._1._1)
--- End diff --

RangePartitioner should be fine, since getPartition works on distinct keys
(so, like you said, duplicate values are assigned to the same partition). As
with many other things, we could just add a warning in the docs saying that if
there are too many duplicates, use with caution.
On a related note, do you know how to detect whether an item is the last one
when using flatMap on an iterator?
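
There is no built-in "is this the last element?" test on an Iterator; the
trick the PR ends up using (visible in the newer diffs quoted above) is to
append a sentinel value that cannot occur in the real data, so the flatMap
body can flush its final batch. A self-contained sketch:

~~~
import scala.collection.mutable.ArrayBuffer

// Group consecutive equal values, flushing the last group when the
// sentinel (Double.NaN, assumed absent from real data) is reached.
def groupRuns(iter: Iterator[Double]): Iterator[List[Double]] = {
  val padded = iter ++ Iterator(Double.NaN)
  val buf = new ArrayBuffer[Double]()
  padded.flatMap { v =>
    if (buf.nonEmpty && v == buf.head) {
      buf += v
      Iterator.empty
    } else {
      val out = if (buf.isEmpty) Iterator.empty else Iterator(buf.toList)
      buf.clear()
      buf += v
      out
    }
  }
}

// groupRuns(Iterator(1.0, 1.0, 2.0, 3.0, 3.0)).toList
// => List(List(1.0, 1.0), List(2.0), List(3.0, 3.0))
~~~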


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-16 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15036690
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmansCorrelation.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.Partitioner
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+
+/**
+ * Compute Spearman's correlation for two RDDs of the type RDD[Double] or 
the correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Spearman's correlation can be found at
+ * http://en.wikipedia.org/wiki/Spearman's_rank_correlation_coefficient
+ */
+object SpearmansCorrelation extends Correlation {
+
+  /**
+   * Compute Spearman's correlation for two datasets.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double 
= {
+computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute Spearman's correlation matrix S, for the input matrix, where 
S(i, j) is the
+   * correlation between column i and j.
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+val indexed = X.zipWithIndex()
+// Attempt to checkpoint the RDD before splitting it into numCols 
RDD[Double]s to avoid
+// computing the lineage prefix multiple times.
+// If checkpoint directory not set, cache the RDD instead.
+try {
+  indexed.checkpoint()
+} catch {
+  case e: Exception => indexed.cache()
+}
+
+val numCols = X.first.size
+val ranks = new Array[RDD[(Long, Double)]](numCols)
+
+// Note: we use a for loop here instead of a while loop with a single 
index variable
+// to avoid race condition caused by closure serialization
+for (k <- 0 until numCols) {
+  val column = indexed.map {case(vector, index) => {
+(vector(k), index)}
+  }
+  ranks(k) = getRanks(column)
+}
+
+val ranksMat: RDD[Vector] = makeRankMatrix(ranks)
+PearsonCorrelation.computeCorrelationMatrix(ranksMat)
+  }
+
+  /**
+   * Compute the ranks for elements in the input RDD, using the average 
method for ties.
+   *
+   * With the average method, elements with the same value receive the 
same rank that's computed
+   * by taking the average of their positions in the sorted list.
+   * e.g. ranks([2, 1, 0, 2]) = [3.5, 2.0, 1.0, 3.5]
+   */
+  private def getRanks(indexed: RDD[(Double, Long)]): RDD[(Long, Double)] 
= {
+// Get elements' positions in the sorted list for computing average 
rank for duplicate values
+val sorted = indexed.sortByKey().zipWithIndex()
+val groupedByValue = sorted.groupBy(_._1._1)
--- End diff --

Right, we also need to maintain the list of original row ids. If a column 
only has one distinct value, groupBy may not work well either. We use Long ids. 
With a 100MB buffer, we can handle 12M entries. I hope it is already large 
enough for practical use. The groupBy operator may trigger a global shuffle, 
which is slow. Please check whether we can use the `RangePartitioner` obtained 
from `sortByKey` in groupBy so it doesn't trigger a global shuffle.
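
The reuse being asked about, sketched (assuming `indexed: RDD[(Double, Long)]`
as in the PR; Spark skips the shuffle only when the supplied partitioner
equals the one already set on the RDD):

~~~
import org.apache.spark.SparkContext._ // pair-RDD implicits in Spark 1.x

val sorted = indexed.sortByKey() // output carries a RangePartitioner
val grouped = sorted.partitioner match {
  case Some(part) => sorted.groupByKey(part) // same layout: no second shuffle
  case None => sorted.groupByKey()
}
~~~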


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-16 Thread dorx
Github user dorx commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15036448
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala 
---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.RDD
+
+/**
+ * New correlation algorithms should implement this trait
+ */
+trait Correlation {
+
+  /**
+   * Compute correlation for two datasets.
+   */
+  def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double
+
+  /**
+   * Compute the correlation matrix S, for the input matrix, where S(i, j) 
is the correlation
+   * between column i and j.
+   */
+  def computeCorrelationMatrix(X: RDD[Vector]): Matrix
+
+  /**
+   * Combine the two input RDD[Double]s into an RDD[Vector] and compute 
the correlation using the
+   * correlation implementation for RDD[Vector]
+   */
+  def computeCorrelationWithMatrixImpl(x: RDD[Double], y: RDD[Double]): 
Double = {
+val mat: RDD[Vector] = x.zip(y).mapPartitions({ iter =>
--- End diff --

Did that before my last comment. Here's `zip`:

~~~
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = {
  zipPartitions(other, true) { (thisIter, otherIter) =>
    new Iterator[(T, U)] {
      def hasNext = (thisIter.hasNext, otherIter.hasNext) match {
        case (true, true) => true
        case (false, false) => false
        case _ => throw new SparkException("Can only zip RDDs with " +
          "same number of elements in each partition")
      }
      def next = (thisIter.next, otherIter.next)
    }
  }
}
~~~

`zipPartitions` here returns `new ZippedPartitionsRDD2(sc, sc.clean(f), this, 
rdd2, preservesPartitioning)`, which extends `ZippedPartitionsBaseRDD`, which 
has the following:

~~~
override val partitioner =
  if (preservesPartitioning) firstParent[Any].partitioner else None
~~~

So yes, the partitioner is propagated from the parent via zip.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-16 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15036312
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala 
---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.RDD
+
+/**
+ * New correlation algorithms should implement this trait
+ */
+trait Correlation {
+
+  /**
+   * Compute correlation for two datasets.
+   */
+  def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double
+
+  /**
+   * Compute the correlation matrix S, for the input matrix, where S(i, j) 
is the correlation
+   * between column i and j.
+   */
+  def computeCorrelationMatrix(X: RDD[Vector]): Matrix
+
+  /**
+   * Combine the two input RDD[Double]s into an RDD[Vector] and compute 
the correlation using the
+   * correlation implementation for RDD[Vector]
+   */
+  def computeCorrelationWithMatrixImpl(x: RDD[Double], y: RDD[Double]): 
Double = {
+val mat: RDD[Vector] = x.zip(y).mapPartitions({ iter =>
--- End diff --

Could you double-check and confirm? `x.zip(y)` should return a `ZippedRDD`, 
which doesn't implement `partitioner`, so it falls back to the default: `None`.
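
A quick spark-shell check (illustrative; the answer depends on which RDD class 
`zip` returns in the Spark version at hand):

~~~
import org.apache.spark.HashPartitioner

val x = sc.parallelize(Seq((1, 1.0), (2, 2.0)), 2).partitionBy(new HashPartitioner(2))
println(x.partitioner)        // Some(HashPartitioner@...)
println(x.zip(x).partitioner) // None if zip drops the partitioner
~~~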


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-16 Thread dorx
Github user dorx commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15033708
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmansCorrelation.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.Partitioner
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+
+/**
+ * Compute Spearman's correlation for two RDDs of the type RDD[Double] or 
the correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Spearman's correlation can be found at
+ * http://en.wikipedia.org/wiki/Spearman's_rank_correlation_coefficient
+ */
+object SpearmansCorrelation extends Correlation {
+
+  /**
+   * Compute Spearman's correlation for two datasets.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double 
= {
+computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute Spearman's correlation matrix S, for the input matrix, where 
S(i, j) is the
+   * correlation between column i and j.
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+val indexed = X.zipWithIndex()
+// Attempt to checkpoint the RDD before splitting it into numCols 
RDD[Double]s to avoid
+// computing the lineage prefix multiple times.
+// If checkpoint directory not set, cache the RDD instead.
+try {
+  indexed.checkpoint()
+} catch {
+  case e: Exception => indexed.cache()
+}
+
+val numCols = X.first.size
+val ranks = new Array[RDD[(Long, Double)]](numCols)
+
+// Note: we use a for loop here instead of a while loop with a single 
index variable
+// to avoid race condition caused by closure serialization
+for (k <- 0 until numCols) {
+  val column = indexed.map {case(vector, index) => {
+(vector(k), index)}
+  }
+  ranks(k) = getRanks(column)
+}
+
+val ranksMat: RDD[Vector] = makeRankMatrix(ranks)
+PearsonCorrelation.computeCorrelationMatrix(ranksMat)
+  }
+
+  /**
+   * Compute the ranks for elements in the input RDD, using the average 
method for ties.
+   *
+   * With the average method, elements with the same value receive the 
same rank that's computed
+   * by taking the average of their positions in the sorted list.
+   * e.g. ranks([2, 1, 0, 2]) = [3.5, 2.0, 1.0, 3.5]
+   */
+  private def getRanks(indexed: RDD[(Double, Long)]): RDD[(Long, Double)] 
= {
+// Get elements' positions in the sorted list for computing average 
rank for duplicate values
+val sorted = indexed.sortByKey().zipWithIndex()
+val groupedByValue = sorted.groupBy(_._1._1)
--- End diff --

Hmm, so one problem with the buffering approach for duplicates is that the 
unique ids for all the duplicate entries need to be saved in a list (they're 
not necessarily sequential, so we can't use some kind of range encoding). In 
the degenerate case where everything in a big RDD is a duplicate, this can be 
problematic (unless we want to get really fancy about saving ranges of ids).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-16 Thread dorx
Github user dorx commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15033178
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala 
---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.RDD
+
+/**
+ * New correlation algorithms should implement this trait
+ */
+trait Correlation {
+
+  /**
+   * Compute correlation for two datasets.
+   */
+  def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double
+
+  /**
+   * Compute the correlation matrix S, for the input matrix, where S(i, j) 
is the correlation
+   * between column i and j.
+   */
+  def computeCorrelationMatrix(X: RDD[Vector]): Matrix
+
+  /**
+   * Combine the two input RDD[Double]s into an RDD[Vector] and compute 
the correlation using the
+   * correlation implementation for RDD[Vector]
+   */
+  def computeCorrelationWithMatrixImpl(x: RDD[Double], y: RDD[Double]): 
Double = {
+val mat: RDD[Vector] = x.zip(y).mapPartitions({ iter =>
--- End diff --

The implementation of zip preserves the partitioner from the parent RDD, so it 
seems like, for consistency's sake, the partitioner should be preserved further 
downstream as well.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-16 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15033041
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala 
---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.RDD
+
+/**
+ * New correlation algorithms should implement this trait
+ */
+trait Correlation {
+
+  /**
+   * Compute correlation for two datasets.
+   */
+  def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double
+
+  /**
+   * Compute the correlation matrix S, for the input matrix, where S(i, j) 
is the correlation
+   * between column i and j.
+   */
+  def computeCorrelationMatrix(X: RDD[Vector]): Matrix
+
+  /**
+   * Combine the two input RDD[Double]s into an RDD[Vector] and compute 
the correlation using the
+   * correlation implementation for RDD[Vector]
+   */
+  def computeCorrelationWithMatrixImpl(x: RDD[Double], y: RDD[Double]): 
Double = {
+val mat: RDD[Vector] = x.zip(y).mapPartitions({ iter =>
--- End diff --

`x.zip(y)` doesn't have a partitioner. `zip` preserves the number of partitions 
and their indices but not the partitioner. That is why the argument should 
really be called `preservePartitioner`. There is no reason to pass 
`preservesPartitioning` here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-16 Thread dorx
Github user dorx commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15030155
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmansCorrelation.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.Partitioner
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+
+/**
+ * Compute Spearman's correlation for two RDDs of the type RDD[Double] or 
the correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Spearman's correlation can be found at
+ * http://en.wikipedia.org/wiki/Spearman's_rank_correlation_coefficient
+ */
+object SpearmansCorrelation extends Correlation {
+
+  /**
+   * Compute Spearman's correlation for two datasets.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double 
= {
+computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute Spearman's correlation matrix S, for the input matrix, where 
S(i, j) is the
+   * correlation between column i and j.
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+val indexed = X.zipWithIndex()
+// Attempt to checkpoint the RDD before splitting it into numCols 
RDD[Double]s to avoid
+// computing the lineage prefix multiple times.
+// If checkpoint directory not set, cache the RDD instead.
+try {
+  indexed.checkpoint()
+} catch {
+  case e: Exception => indexed.cache()
+}
+
+val numCols = X.first.size
+val ranks = new Array[RDD[(Long, Double)]](numCols)
+
+// Note: we use a for loop here instead of a while loop with a single 
index variable
+// to avoid race condition caused by closure serialization
+for (k <- 0 until numCols) {
+  val column = indexed.map {case(vector, index) => {
+(vector(k), index)}
+  }
+  ranks(k) = getRanks(column)
+}
+
+val ranksMat: RDD[Vector] = makeRankMatrix(ranks)
+PearsonCorrelation.computeCorrelationMatrix(ranksMat)
+  }
+
+  /**
+   * Compute the ranks for elements in the input RDD, using the average 
method for ties.
+   *
+   * With the average method, elements with the same value receive the 
same rank that's computed
+   * by taking the average of their positions in the sorted list.
+   * e.g. ranks([2, 1, 0, 2]) = [3.5, 2.0, 1.0, 3.5]
+   */
+  private def getRanks(indexed: RDD[(Double, Long)]): RDD[(Long, Double)] 
= {
+// Get elements' positions in the sorted list for computing average 
rank for duplicate values
+val sorted = indexed.sortByKey().zipWithIndex()
+val groupedByValue = sorted.groupBy(_._1._1)
+val ranks = groupedByValue.flatMap[(Long, Double)] { item =>
+  val duplicates = item._2
+  if (duplicates.size > 1) {
+val averageRank = duplicates.foldLeft(0L) {_ + _._2 + 1} / 
duplicates.size.toDouble
+duplicates.map(entry => (entry._1._2, averageRank)).toSeq
+  } else {
+duplicates.map(entry => (entry._1._2, entry._2.toDouble + 1)).toSeq
+  }
+}
+ranks.sortByKey()
+  }
+
+  private def makeRankMatrix(ranks: Array[RDD[(Long, Double)]]): 
RDD[Vector] = {
+val partitioner = Partitioner.defaultPartitioner(ranks(0), ranks.tail: 
_*)
+val cogrouped = new CoGroupedRDD[Long](ranks, partitioner)
+cogrouped.mapPartitions({ iter =>
--- End diff --

Pretty sure we want to preserve the partitioner here (otherwise it gets wiped 
in the map operation).
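
As a toy illustration (not from the PR) of what the flag buys when the keys 
are untouched:

~~~
import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(Seq((1, "a"), (2, "b")), 2).partitionBy(new HashPartitioner(2))
// The keys are untouched, so keeping the partitioner is safe here.
val upper = pairs.mapPartitions(
  _.map { case (k, v) => (k, v.toUpperCase) },
  preservesPartitioning = true)
println(upper.partitioner)    // Some(HashPartitioner) instead of None
~~~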


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-16 Thread dorx
Github user dorx commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15028681
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmansCorrelation.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.Partitioner
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+
+/**
+ * Compute Spearman's correlation for two RDDs of the type RDD[Double] or 
the correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Spearman's correlation can be found at
+ * http://en.wikipedia.org/wiki/Spearman's_rank_correlation_coefficient
+ */
+object SpearmansCorrelation extends Correlation {
+
+  /**
+   * Compute Spearman's correlation for two datasets.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double 
= {
+computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute Spearman's correlation matrix S, for the input matrix, where 
S(i, j) is the
+   * correlation between column i and j.
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+val indexed = X.zipWithIndex()
+// Attempt to checkpoint the RDD before splitting it into numCols 
RDD[Double]s to avoid
+// computing the lineage prefix multiple times.
+// If checkpoint directory not set, cache the RDD instead.
+try {
+  indexed.checkpoint()
+} catch {
+  case e: Exception => indexed.cache()
+}
+
+val numCols = X.first.size
+val ranks = new Array[RDD[(Long, Double)]](numCols)
+
+// Note: we use a for loop here instead of a while loop with a single 
index variable
+// to avoid race condition caused by closure serialization
+for (k <- 0 until numCols) {
+  val column = indexed.map {case(vector, index) => {
+(vector(k), index)}
+  }
+  ranks(k) = getRanks(column)
+}
+
+val ranksMat: RDD[Vector] = makeRankMatrix(ranks)
+PearsonCorrelation.computeCorrelationMatrix(ranksMat)
+  }
+
+  /**
+   * Compute the ranks for elements in the input RDD, using the average 
method for ties.
+   *
+   * With the average method, elements with the same value receive the 
same rank that's computed
+   * by taking the average of their positions in the sorted list.
+   * e.g. ranks([2, 1, 0, 2]) = [3.5, 2.0, 1.0, 3.5]
+   */
+  private def getRanks(indexed: RDD[(Double, Long)]): RDD[(Long, Double)] 
= {
+// Get elements' positions in the sorted list for computing average 
rank for duplicate values
+val sorted = indexed.sortByKey().zipWithIndex()
+val groupedByValue = sorted.groupBy(_._1._1)
+val ranks = groupedByValue.flatMap[(Long, Double)] { item =>
+  val duplicates = item._2
+  if (duplicates.size > 1) {
+val averageRank = duplicates.foldLeft(0L) {_ + _._2 + 1} / 
duplicates.size.toDouble
+duplicates.map(entry => (entry._1._2, averageRank)).toSeq
+  } else {
+duplicates.map(entry => (entry._1._2, entry._2.toDouble + 1)).toSeq
+  }
+}
+ranks.sortByKey()
+  }
+
+  private def makeRankMatrix(ranks: Array[RDD[(Long, Double)]]): 
RDD[Vector] = {
+val partitioner = Partitioner.defaultPartitioner(ranks(0), ranks.tail: 
_*)
--- End diff --

Using the number of partitions in the input RDD is the better of the two 
choices, since defaultParallelism can be unset (in which case we'd need to 
copy-paste the second half of Partitioner.defaultPartitioner).
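
In code, the first choice amounts to something like this (a sketch, assuming 
`ranks` is the `Array[RDD[(Long, Double)]]` from the diff):

~~~
import org.apache.spark.HashPartitioner

// Size the partitioner by the input RDD's partition count rather than by
// sc.defaultParallelism, which may not be set.
val partitioner = new HashPartitioner(ranks(0).partitions.length)
~~~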


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-16 Thread dorx
Github user dorx commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15022756
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala 
---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.RDD
+
+/**
+ * New correlation algorithms should implement this trait
+ */
+trait Correlation {
+
+  /**
+   * Compute correlation for two datasets.
+   */
+  def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double
+
+  /**
+   * Compute the correlation matrix S, for the input matrix, where S(i, j) 
is the correlation
+   * between column i and j.
+   */
+  def computeCorrelationMatrix(X: RDD[Vector]): Matrix
+
+  /**
+   * Combine the two input RDD[Double]s into an RDD[Vector] and compute 
the correlation using the
+   * correlation implementation for RDD[Vector]
+   */
+  def computeCorrelationWithMatrixImpl(x: RDD[Double], y: RDD[Double]): 
Double = {
+val mat: RDD[Vector] = x.zip(y).mapPartitions({ iter =>
--- End diff --

Thanks for the example explaining what `preservesPartitioning` does. It seems 
like in this case it's a good idea to copy the partitioner into the pair RDD: 
zip preserves the partitioner, and if we don't preserve it here, we're 
unnecessarily dropping it even though we didn't need to shuffle the data at all.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-16 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15020475
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmansCorrelation.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.Partitioner
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+
+/**
+ * Compute Spearman's correlation for two RDDs of the type RDD[Double] or 
the correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Spearman's correlation can be found at
+ * http://en.wikipedia.org/wiki/Spearman's_rank_correlation_coefficient
+ */
+object SpearmansCorrelation extends Correlation {
+
+  /**
+   * Compute Spearman's correlation for two datasets.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double 
= {
+computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute Spearman's correlation matrix S, for the input matrix, where 
S(i, j) is the
+   * correlation between column i and j.
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+val indexed = X.zipWithIndex()
+// Attempt to checkpoint the RDD before splitting it into numCols 
RDD[Double]s to avoid
+// computing the lineage prefix multiple times.
+// If checkpoint directory not set, cache the RDD instead.
+try {
+  indexed.checkpoint()
+} catch {
+  case e: Exception => indexed.cache()
+}
+
+val numCols = X.first.size
+val ranks = new Array[RDD[(Long, Double)]](numCols)
+
+// Note: we use a for loop here instead of a while loop with a single 
index variable
+// to avoid race condition caused by closure serialization
+for (k <- 0 until numCols) {
+  val column = indexed.map {case(vector, index) => {
+(vector(k), index)}
+  }
+  ranks(k) = getRanks(column)
+}
+
+val ranksMat: RDD[Vector] = makeRankMatrix(ranks)
+PearsonCorrelation.computeCorrelationMatrix(ranksMat)
+  }
+
+  /**
+   * Compute the ranks for elements in the input RDD, using the average 
method for ties.
+   *
+   * With the average method, elements with the same value receive the 
same rank that's computed
+   * by taking the average of their positions in the sorted list.
+   * e.g. ranks([2, 1, 0, 2]) = [3.5, 2.0, 1.0, 3.5]
+   */
+  private def getRanks(indexed: RDD[(Double, Long)]): RDD[(Long, Double)] 
= {
+// Get elements' positions in the sorted list for computing average 
rank for duplicate values
+val sorted = indexed.sortByKey().zipWithIndex()
+val groupedByValue = sorted.groupBy(_._1._1)
+val ranks = groupedByValue.flatMap[(Long, Double)] { item =>
+  val duplicates = item._2
+  if (duplicates.size > 1) {
+val averageRank = duplicates.foldLeft(0L) {_ + _._2 + 1} / 
duplicates.size.toDouble
+duplicates.map(entry => (entry._1._2, averageRank)).toSeq
+  } else {
+duplicates.map(entry => (entry._1._2, entry._2.toDouble + 1)).toSeq
+  }
+}
+ranks.sortByKey()
+  }
+
+  private def makeRankMatrix(ranks: Array[RDD[(Long, Double)]]): 
RDD[Vector] = {
+val partitioner = Partitioner.defaultPartitioner(ranks(0), ranks.tail: 
_*)
+val cogrouped = new CoGroupedRDD[Long](ranks, partitioner)
+cogrouped.mapPartitions({ iter =>
--- End diff --

Please check my comment above about `preservePartitioning` before adding 
`preservePartitioning = true`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-16 Thread dorx
Github user dorx commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15020400
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmansCorrelation.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.Partitioner
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+
+/**
+ * Compute Spearman's correlation for two RDDs of the type RDD[Double] or 
the correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Spearman's correlation can be found at
+ * http://en.wikipedia.org/wiki/Spearman's_rank_correlation_coefficient
+ */
+object SpearmansCorrelation extends Correlation {
+
+  /**
+   * Compute Spearman's correlation for two datasets.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double 
= {
+computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute Spearman's correlation matrix S, for the input matrix, where 
S(i, j) is the
+   * correlation between column i and j.
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+val indexed = X.zipWithIndex()
--- End diff --

You're right, sorry, I was thinking about the 2nd zipWithIndex call (on the 
sorted RDD). That one absolutely cannot be replaced with zipWithUniqueId, or 
the results are all wrong.
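
To see the difference (a spark-shell sketch, not from the PR): zipWithIndex 
assigns consecutive ids 0, 1, 2, ... in the RDD's order, which the rank 
computation relies on, while zipWithUniqueId assigns k, n + k, 2n + k, ... 
within partition k, which bears no relation to the global sort position.

~~~
sc.parallelize(Seq(10.0, 20.0, 30.0, 40.0), 2).zipWithIndex().collect()
// Array((10.0,0), (20.0,1), (30.0,2), (40.0,3))
sc.parallelize(Seq(10.0, 20.0, 30.0, 40.0), 2).zipWithUniqueId().collect()
// Array((10.0,0), (20.0,2), (30.0,1), (40.0,3))
~~~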


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-16 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15020402
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmansCorrelation.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.Partitioner
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+
+/**
+ * Compute Spearman's correlation for two RDDs of the type RDD[Double] or 
the correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Spearman's correlation can be found at
+ * http://en.wikipedia.org/wiki/Spearman's_rank_correlation_coefficient
+ */
+object SpearmansCorrelation extends Correlation {
+
+  /**
+   * Compute Spearman's correlation for two datasets.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double 
= {
+computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute Spearman's correlation matrix S, for the input matrix, where 
S(i, j) is the
+   * correlation between column i and j.
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+val indexed = X.zipWithIndex()
+// Attempt to checkpoint the RDD before splitting it into numCols 
RDD[Double]s to avoid
+// computing the lineage prefix multiple times.
+// If checkpoint directory not set, cache the RDD instead.
+try {
+  indexed.checkpoint()
+} catch {
+  case e: Exception => indexed.cache()
+}
+
+val numCols = X.first.size
+val ranks = new Array[RDD[(Long, Double)]](numCols)
+
+// Note: we use a for loop here instead of a while loop with a single 
index variable
+// to avoid race condition caused by closure serialization
+for (k <- 0 until numCols) {
+  val column = indexed.map {case(vector, index) => {
+(vector(k), index)}
+  }
+  ranks(k) = getRanks(column)
+}
+
+val ranksMat: RDD[Vector] = makeRankMatrix(ranks)
+PearsonCorrelation.computeCorrelationMatrix(ranksMat)
+  }
+
+  /**
+   * Compute the ranks for elements in the input RDD, using the average 
method for ties.
+   *
+   * With the average method, elements with the same value receive the 
same rank that's computed
+   * by taking the average of their positions in the sorted list.
+   * e.g. ranks([2, 1, 0, 2]) = [3.5, 2.0, 1.0, 3.5]
+   */
+  private def getRanks(indexed: RDD[(Double, Long)]): RDD[(Long, Double)] 
= {
+// Get elements' positions in the sorted list for computing average 
rank for duplicate values
+val sorted = indexed.sortByKey().zipWithIndex()
+val groupedByValue = sorted.groupBy(_._1._1)
+val ranks = groupedByValue.flatMap[(Long, Double)] { item =>
+  val duplicates = item._2
+  if (duplicates.size > 1) {
+val averageRank = duplicates.foldLeft(0L) {_ + _._2 + 1} / 
duplicates.size.toDouble
+duplicates.map(entry => (entry._1._2, averageRank)).toSeq
+  } else {
+duplicates.map(entry => (entry._1._2, entry._2.toDouble + 1)).toSeq
+  }
+}
+ranks.sortByKey()
+  }
+
+  private def makeRankMatrix(ranks: Array[RDD[(Long, Double)]]): 
RDD[Vector] = {
+val partitioner = Partitioner.defaultPartitioner(ranks(0), ranks.tail: 
_*)
--- End diff --

The code is weird because the `ranks` RDDs don't share any partitioner. It is 
cleaner to use a HashPartitioner with the same number of partitions as the 
input RDD, or simply `defaultParallelism`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-16 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15020198
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmansCorrelation.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.Partitioner
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+
+/**
+ * Compute Spearman's correlation for two RDDs of the type RDD[Double] or 
the correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Spearman's correlation can be found at
+ * http://en.wikipedia.org/wiki/Spearman's_rank_correlation_coefficient
+ */
+object SpearmansCorrelation extends Correlation {
+
+  /**
+   * Compute Spearman's correlation for two datasets.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double 
= {
+computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute Spearman's correlation matrix S, for the input matrix, where 
S(i, j) is the
+   * correlation between column i and j.
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+val indexed = X.zipWithIndex()
+// Attempt to checkpoint the RDD before splitting it into numCols 
RDD[Double]s to avoid
+// computing the lineage prefix multiple times.
+// If checkpoint directory not set, cache the RDD instead.
+try {
+  indexed.checkpoint()
+} catch {
+  case e: Exception => indexed.cache()
+}
+
+val numCols = X.first.size
+val ranks = new Array[RDD[(Long, Double)]](numCols)
+
+// Note: we use a for loop here instead of a while loop with a single 
index variable
+// to avoid race condition caused by closure serialization
+for (k <- 0 until numCols) {
+  val column = indexed.map {case(vector, index) => {
+(vector(k), index)}
+  }
+  ranks(k) = getRanks(column)
+}
+
+val ranksMat: RDD[Vector] = makeRankMatrix(ranks)
+PearsonCorrelation.computeCorrelationMatrix(ranksMat)
+  }
+
+  /**
+   * Compute the ranks for elements in the input RDD, using the average 
method for ties.
+   *
+   * With the average method, elements with the same value receive the 
same rank that's computed
+   * by taking the average of their positions in the sorted list.
+   * e.g. ranks([2, 1, 0, 2]) = [3.5, 2.0, 1.0, 3.5]
+   */
+  private def getRanks(indexed: RDD[(Double, Long)]): RDD[(Long, Double)] 
= {
+// Get elements' positions in the sorted list for computing average 
rank for duplicate values
+val sorted = indexed.sortByKey().zipWithIndex()
+val groupedByValue = sorted.groupBy(_._1._1)
--- End diff --

The `RangePartitioner` guarantees that identical keys appear in the same 
partition. Then, inside each partition, you go through an iterator of 
(value: Double, rank: Long). Do not output a value and its rank directly; 
count records with the same value until a different value comes up, then 
flush out the value and the average rank.
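
A minimal sketch of that single-pass scheme (illustrative, not the PR's code). 
It assumes `sorted` is the range-partitioned `RDD[((Double, Long), Long)]` of 
((value, original id), 0-based global position) produced by 
`sortByKey().zipWithIndex()`, with all duplicates of a value in one partition:

~~~
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.rdd.RDD

val ranks: RDD[(Long, Double)] = sorted.mapPartitions { iter =>
  val out = ArrayBuffer.empty[(Long, Double)]
  val ids = ArrayBuffer.empty[Long]  // buffers ids only for the current run of ties
  var curValue = 0.0
  var firstPos = 0L
  var count = 0L

  // Emit the average of the 1-based ranks firstPos + 1 .. firstPos + count.
  def flush(): Unit = {
    val avgRank = firstPos + (count + 1) / 2.0
    ids.foreach(id => out += ((id, avgRank)))
    ids.clear()
  }

  iter.foreach { case ((value, id), pos) =>
    if (count > 0 && value != curValue) {
      flush()
      count = 0
    }
    if (count == 0) {
      curValue = value
      firstPos = pos
    }
    ids += id
    count += 1
  }
  if (count > 0) flush()
  out.iterator
}
~~~

On ranks([2, 1, 0, 2]) this gives 1.0 and 2.0 for the singletons and 
2 + 3/2 = 3.5 for the two tied values, matching the doc comment in the diff. 
Note it still buffers the ids of one run of ties at a time, which is the 
degenerate-duplicates concern raised earlier in the thread.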


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-16 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15019889
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmansCorrelation.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.Partitioner
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+
+/**
+ * Compute Spearman's correlation for two RDDs of the type RDD[Double] or 
the correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Spearman's correlation can be found at
+ * http://en.wikipedia.org/wiki/Spearman's_rank_correlation_coefficient
+ */
+object SpearmansCorrelation extends Correlation {
+
+  /**
+   * Compute Spearman's correlation for two datasets.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double 
= {
+computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute Spearman's correlation matrix S, for the input matrix, where 
S(i, j) is the
+   * correlation between column i and j.
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+val indexed = X.zipWithIndex()
+// Attempt to checkpoint the RDD before splitting it into numCols 
RDD[Double]s to avoid
+// computing the lineage prefix multiple times.
+// If checkpoint directory not set, cache the RDD instead.
+try {
+  indexed.checkpoint()
+} catch {
+  case e: Exception => indexed.cache()
--- End diff --

See my previous comment about mentioning the cost in the doc. The user should 
be responsible for caching the RDD. If the user has already cached the input 
RDD, we are duplicating the data here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-16 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15019642
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmansCorrelation.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.Partitioner
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+
+/**
+ * Compute Spearman's correlation for two RDDs of the type RDD[Double] or 
the correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Spearman's correlation can be found at
+ * http://en.wikipedia.org/wiki/Spearman's_rank_correlation_coefficient
+ */
+object SpearmansCorrelation extends Correlation {
+
+  /**
+   * Compute Spearman's correlation for two datasets.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double 
= {
+computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute Spearman's correlation matrix S, for the input matrix, where 
S(i, j) is the
+   * correlation between column i and j.
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+val indexed = X.zipWithIndex()
--- End diff --

The input `X` is not ordered. I think the purpose here is just to assign each 
row an id for joining the elements back together after we obtain the rank for 
each column.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-16 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15019612
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala 
---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.RDD
+
+/**
+ * New correlation algorithms should implement this trait
+ */
+trait Correlation {
+
+  /**
+   * Compute correlation for two datasets.
+   */
+  def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double
+
+  /**
+   * Compute the correlation matrix S, for the input matrix, where S(i, j) 
is the correlation
+   * between column i and j.
+   */
+  def computeCorrelationMatrix(X: RDD[Vector]): Matrix
+
+  /**
+   * Combine the two input RDD[Double]s into an RDD[Vector] and compute 
the correlation using the
+   * correlation implementation for RDD[Vector]
+   */
+  def computeCorrelationWithMatrixImpl(x: RDD[Double], y: RDD[Double]): 
Double = {
+val mat: RDD[Vector] = x.zip(y).mapPartitions({ iter =>
--- End diff --

Make sure you use `preservesPartitioning` correctly. My first understanding 
was wrong and I may have used it incorrectly somewhere. The argument should 
really be called `preservePartitioner`: it passes the partitioner to the 
derived RDD without checking, which can lead to wrong results. For example, 
the join in the following code block returns nothing, even though `c` and `d` 
have the same content.

~~~
val a = sc.makeRDD(Seq((0, 1), (2, 3), (4, 5), (6, 7)), 4)
val b = a.groupByKey()
val c = b.mapPartitions({ iter =>
  iter.flatMap { case (k, vv) =>
    vv.map { v => (v, k) }
  }
}, true)
c.collect()
val d = sc.makeRDD(Seq((1, 0), (3, 2), (5, 4), (7, 6)), 4)
c.join(d).collect()
~~~

(The keys change inside `mapPartitions`, so the inherited hash partitioner no 
longer describes where each new key actually lives; the join then looks up 
each key in the wrong partition and finds no matches.)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-16 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15019620
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala 
---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.RDD
+
+/**
+ * New correlation algorithms should implement this trait
+ */
+trait Correlation {
+
+  /**
+   * Compute correlation for two datasets.
+   */
+  def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double
+
+  /**
+   * Compute the correlation matrix S, for the input matrix, where S(i, j) 
is the correlation
+   * between column i and j.
+   */
+  def computeCorrelationMatrix(X: RDD[Vector]): Matrix
+
+  /**
+   * Combine the two input RDD[Double]s into an RDD[Vector] and compute 
the correlation using the
+   * correlation implementation for RDD[Vector]
+   */
+  def computeCorrelationWithMatrixImpl(x: RDD[Double], y: RDD[Double]): 
Double = {
+val mat: RDD[Vector] = x.zip(y).mapPartitions({ iter =>
+  iter.map {case(xi, yi) => new DenseVector(Array(xi, yi))}
+}, preservesPartitioning = true)
+computeCorrelationMatrix(mat)(0, 1)
+  }
+
+}
+
+/**
+ * Delegates computation to the specific correlation object based on the 
input method name
+ *
+ * Currently supported correlations: pearson, spearman.
+ * After new correlation algorithms are added, please update the 
documentation here and in
+ * Statistics.scala for the correlation APIs.
+ *
+ * Cases are ignored when doing method matching. We also allow initials, 
e.g. "P" for "pearson", as
+ * long as initials are unique in the supported set of correlation 
algorithms. In addition, a
+ * supported method name has to be a substring of the input method name 
for it to be matched (e.g.
+ * "spearmansrho" will be matched to "spearman")
+ *
+ * Maintains the default correlation type, pearson
+ */
+object Correlations {
--- End diff --

Sounds good.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-16 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15019630
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/PearsonCorrelation.scala
 ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import breeze.linalg.{DenseMatrix => BDM}
+
+import org.apache.spark.mllib.linalg.{Matrices, Matrix, Vector}
+import org.apache.spark.mllib.linalg.distributed.RowMatrix
+import org.apache.spark.rdd.RDD
+
+/**
+ * Compute Pearson correlation for two RDDs of the type RDD[Double] or the 
correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Pearson correlation can be found at
+ * 
http://en.wikipedia.org/wiki/Pearson_product-moment_correlation_coefficient
+ */
+object PearsonCorrelation extends Correlation {
+
+  /**
+   * Compute the Pearson correlation for two datasets.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double 
= {
+computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute the Pearson correlation matrix S, for the input matrix, where 
S(i, j) is the
+   * correlation between column i and j.
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+val rowMatrix = new RowMatrix(X)
+val cov = rowMatrix.computeCovariance()
+computeCorrelationMatrixFromCovariance(cov)
+  }
+
+  /**
+   * Compute the pearson correlation matrix from the covariance matrix
+   */
+  def computeCorrelationMatrixFromCovariance(covarianceMatrix: Matrix): 
Matrix = {
+val cov = covarianceMatrix.toBreeze.asInstanceOf[BDM[Double]]
+val n = cov.cols
+
+// Compute the standard deviation on the diagonals first
+var i = 0
+while (i < n) {
+  cov(i, i) = math.sqrt(cov(i, i))
+  i +=1
+}
+// or we could put the stddev in its own array to trade space for one 
less pass over the matrix
+
+// TODO: use blas.dspr instead to compute the correlation matrix
+// if the covariance matrix comes in the upper triangular form for free
+
+// Loop through columns since cov is column major
+var j = 0
+var sigma = 0.0
+while (j < n) {
+  sigma = cov(j, j)
+  i = 0
+  while (i < j) {
+val covariance = cov(i, j) / (sigma * cov(i, i))
--- End diff --

Agree with @srowen: set NaN and generate a warning message. Btw, R returns 
a warning message instead of an error.
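
A sketch of the suggested behavior inside the inner loop (illustrative; the 
variable names follow the diff, and `logWarning` is assumed available from 
Spark's `Logging` trait):

~~~
if (sigma == 0.0 || cov(i, i) == 0.0) {
  // Zero variance in column i or j makes the correlation undefined;
  // mirror R by emitting NaN plus a warning instead of failing.
  logWarning("Pearson correlation of a zero-variance column is undefined; returning NaN.")
  cov(i, j) = Double.NaN
} else {
  cov(i, j) = cov(i, j) / (sigma * cov(i, i))
}
~~~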


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-16 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15019606
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala 
---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.RDD
+
+/**
+ * New correlation algorithms should implement this trait
+ */
+trait Correlation {
+
+  /**
+   * Compute correlation for two datasets.
+   */
+  def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double
+
+  /**
+   * Compute the correlation matrix S, for the input matrix, where S(i, j) 
is the correlation
+   * between column i and j.
+   */
+  def computeCorrelationMatrix(X: RDD[Vector]): Matrix
+
+  /**
+   * Combine the two input RDD[Double]s into an RDD[Vector] and compute 
the correlation using the
+   * correlation implementation for RDD[Vector]
+   */
+  def computeCorrelationWithMatrixImpl(x: RDD[Double], y: RDD[Double]): 
Double = {
--- End diff --

It is okay to keep it as long as the `Correlation` trait is private.
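
For readers following along, one way to express that restriction; the exact
visibility scope below is an assumption, not the merged code:

    // Package-private: visible inside org.apache.spark.mllib.stat only.
    private[stat] trait Correlation {
      def computeCorrelation(x: org.apache.spark.rdd.RDD[Double],
                             y: org.apache.spark.rdd.RDD[Double]): Double
    }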




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-16 Thread dorx
Github user dorx commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15017975
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmansCorrelation.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.Partitioner
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+
+/**
+ * Compute Spearman's correlation for two RDDs of the type RDD[Double] or 
the correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Spearman's correlation can be found at
+ * http://en.wikipedia.org/wiki/Spearman's_rank_correlation_coefficient
+ */
+object SpearmansCorrelation extends Correlation {
+
+  /**
+   * Compute Spearman's correlation for two datasets.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double 
= {
+computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute Spearman's correlation matrix S, for the input matrix, where 
S(i, j) is the
+   * correlation between column i and j.
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+val indexed = X.zipWithIndex()
--- End diff --

FWIW I tried it with zipWithUniqueId and, as expected, the results were wrong
when more than one partition was used.
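
A quick spark-shell sketch of why (assuming `sc` is in scope): zipWithUniqueId
guarantees uniqueness, not positions, once there is more than one partition.

    val data = sc.parallelize(Seq(3.0, 1.0, 2.0, 4.0), 2)
    data.zipWithIndex().collect()
    // Array((3.0,0), (1.0,1), (2.0,2), (4.0,3))  -- true positional indices
    data.zipWithUniqueId().collect()
    // e.g. Array((3.0,0), (1.0,2), (2.0,1), (4.0,3)) -- unique ids, wrong order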




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-16 Thread dorx
Github user dorx commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15017783
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmansCorrelation.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.Partitioner
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+
+/**
+ * Compute Spearman's correlation for two RDDs of the type RDD[Double] or 
the correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Spearman's correlation can be found at
+ * http://en.wikipedia.org/wiki/Spearman's_rank_correlation_coefficient
+ */
+object SpearmansCorrelation extends Correlation {
+
+  /**
+   * Compute Spearman's correlation for two datasets.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double 
= {
+computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute Spearman's correlation matrix S, for the input matrix, where 
S(i, j) is the
+   * correlation between column i and j.
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+val indexed = X.zipWithIndex()
+// Attempt to checkpoint the RDD before splitting it into numCols 
RDD[Double]s to avoid
+// computing the lineage prefix multiple times.
+// If checkpoint directory not set, cache the RDD instead.
+try {
+  indexed.checkpoint()
+} catch {
+  case e: Exception => indexed.cache()
--- End diff --

FWIW I tried it with zipWithUniqueId and, as expected, the results were wrong
when more than one partition was used.




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-15 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14964217
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala 
---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.RDD
+
+/**
+ * New correlation algorithms should implement this trait
+ */
+trait Correlation {
+
+  /**
+   * Compute correlation for two datasets.
+   */
+  def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double
+
+  /**
+   * Compute the correlation matrix S, for the input matrix, where S(i, j) 
is the correlation
+   * between column i and j.
+   */
+  def computeCorrelationMatrix(X: RDD[Vector]): Matrix
+
+  /**
+   * Combine the two input RDD[Double]s into an RDD[Vector] and compute 
the correlation using the
+   * correlation implementation for RDD[Vector]
+   */
+  def computeCorrelationWithMatrixImpl(x: RDD[Double], y: RDD[Double]): 
Double = {
+val mat: RDD[Vector] = x.zip(y).mapPartitions({ iter =>
+  iter.map {case(xi, yi) => new DenseVector(Array(xi, yi))}
+}, preservesPartitioning = true)
+computeCorrelationMatrix(mat)(0, 1)
+  }
+
+}
+
+/**
+ * Delegates computation to the specific correlation object based on the 
input method name
+ *
+ * Currently supported correlations: pearson, spearman.
+ * After new correlation algorithms are added, please update the 
documentation here and in
+ * Statistics.scala for the correlation APIs.
+ *
+ * Cases are ignored when doing method matching. We also allow initials, 
e.g. "P" for "pearson", as
+ * long as initials are unique in the supported set of correlation 
algorithms. In addition, a
--- End diff --

No, plain string matching should be sufficient.
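
A sketch of what plain string matching could look like, reusing the PR's
`Correlation` objects; the helper name and error message are illustrative only:

    def getCorrelationFromName(method: String): Correlation = method match {
      case "pearson"  => PearsonCorrelation
      case "spearman" => SpearmansCorrelation
      case other => throw new IllegalArgumentException(
        s"Unrecognized method name: $other. Supported: pearson, spearman.")
    }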




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-14 Thread dorx
Github user dorx commented on the pull request:

https://github.com/apache/spark/pull/1367#issuecomment-48953501
  
@mengxr Thanks for the feedback. Can you respond to my follow-up questions
before I update my PR?




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-14 Thread dorx
Github user dorx commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14900429
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmansCorrelation.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.Partitioner
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+
+/**
+ * Compute Spearman's correlation for two RDDs of the type RDD[Double] or 
the correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Spearman's correlation can be found at
+ * http://en.wikipedia.org/wiki/Spearman's_rank_correlation_coefficient
+ */
+object SpearmansCorrelation extends Correlation {
+
+  /**
+   * Compute Spearman's correlation for two datasets.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double 
= {
+computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute Spearman's correlation matrix S, for the input matrix, where 
S(i, j) is the
+   * correlation between column i and j.
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+val indexed = X.zipWithIndex()
+// Attempt to checkpoint the RDD before splitting it into numCols 
RDD[Double]s to avoid
+// computing the lineage prefix multiple times.
+// If checkpoint directory not set, cache the RDD instead.
+try {
+  indexed.checkpoint()
+} catch {
+  case e: Exception => indexed.cache()
+}
+
+val numCols = X.first.size
+val ranks = new Array[RDD[(Long, Double)]](numCols)
+
+// Note: we use a for loop here instead of a while loop with a single 
index variable
+// to avoid race condition caused by closure serialization
+for (k <- 0 until numCols) {
+  val column = indexed.map {case(vector, index) => {
+(vector(k), index)}
+  }
+  ranks(k) = getRanks(column)
+}
+
+val ranksMat: RDD[Vector] = makeRankMatrix(ranks)
+PearsonCorrelation.computeCorrelationMatrix(ranksMat)
+  }
+
+  /**
+   * Compute the ranks for elements in the input RDD, using the average 
method for ties.
+   *
+   * With the average method, elements with the same value receive the 
same rank that's computed
+   * by taking the average of their positions in the sorted list.
+   * e.g. ranks([2, 1, 0, 2]) = [3.5, 2.0, 1.0, 3.5]
+   */
+  private def getRanks(indexed: RDD[(Double, Long)]): RDD[(Long, Double)] 
= {
+// Get elements' positions in the sorted list for computing average 
rank for duplicate values
+val sorted = indexed.sortByKey().zipWithIndex()
+val groupedByValue = sorted.groupBy(_._1._1)
+val ranks = groupedByValue.flatMap[(Long, Double)] { item =>
+  val duplicates = item._2
+  if (duplicates.size > 1) {
+val averageRank = duplicates.foldLeft(0L) {_ + _._2 + 1} / 
duplicates.size.toDouble
+duplicates.map(entry => (entry._1._2, averageRank)).toSeq
+  } else {
+duplicates.map(entry => (entry._1._2, entry._2.toDouble + 1)).toSeq
+  }
+}
+ranks.sortByKey()
+  }
+
+  private def makeRankMatrix(ranks: Array[RDD[(Long, Double)]]): 
RDD[Vector] = {
+val partitioner = Partitioner.defaultPartitioner(ranks(0), ranks.tail: 
_*)
+val cogrouped = new CoGroupedRDD[Long](ranks, partitioner)
+cogrouped.mapPartitions({ iter =>
--- End diff --

Missed the `preservesPartitioning = true`, which made the parentheses
necessary. Good catch.



[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-14 Thread dorx
Github user dorx commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14900326
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmansCorrelation.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.Partitioner
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+
+/**
+ * Compute Spearman's correlation for two RDDs of the type RDD[Double] or 
the correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Spearman's correlation can be found at
+ * http://en.wikipedia.org/wiki/Spearman's_rank_correlation_coefficient
+ */
+object SpearmansCorrelation extends Correlation {
+
+  /**
+   * Compute Spearman's correlation for two datasets.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double 
= {
+computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute Spearman's correlation matrix S, for the input matrix, where 
S(i, j) is the
+   * correlation between column i and j.
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+val indexed = X.zipWithIndex()
+// Attempt to checkpoint the RDD before splitting it into numCols 
RDD[Double]s to avoid
+// computing the lineage prefix multiple times.
+// If checkpoint directory not set, cache the RDD instead.
+try {
+  indexed.checkpoint()
+} catch {
+  case e: Exception => indexed.cache()
+}
+
+val numCols = X.first.size
+val ranks = new Array[RDD[(Long, Double)]](numCols)
+
+// Note: we use a for loop here instead of a while loop with a single 
index variable
+// to avoid race condition caused by closure serialization
+for (k <- 0 until numCols) {
+  val column = indexed.map {case(vector, index) => {
+(vector(k), index)}
+  }
+  ranks(k) = getRanks(column)
+}
+
+val ranksMat: RDD[Vector] = makeRankMatrix(ranks)
+PearsonCorrelation.computeCorrelationMatrix(ranksMat)
+  }
+
+  /**
+   * Compute the ranks for elements in the input RDD, using the average 
method for ties.
+   *
+   * With the average method, elements with the same value receive the 
same rank that's computed
+   * by taking the average of their positions in the sorted list.
+   * e.g. ranks([2, 1, 0, 2]) = [3.5, 2.0, 1.0, 3.5]
+   */
+  private def getRanks(indexed: RDD[(Double, Long)]): RDD[(Long, Double)] 
= {
+// Get elements' positions in the sorted list for computing average 
rank for duplicate values
+val sorted = indexed.sortByKey().zipWithIndex()
+val groupedByValue = sorted.groupBy(_._1._1)
+val ranks = groupedByValue.flatMap[(Long, Double)] { item =>
+  val duplicates = item._2
+  if (duplicates.size > 1) {
+val averageRank = duplicates.foldLeft(0L) {_ + _._2 + 1} / 
duplicates.size.toDouble
+duplicates.map(entry => (entry._1._2, averageRank)).toSeq
+  } else {
+duplicates.map(entry => (entry._1._2, entry._2.toDouble + 1)).toSeq
+  }
+}
+ranks.sortByKey()
+  }
+
+  private def makeRankMatrix(ranks: Array[RDD[(Long, Double)]]): 
RDD[Vector] = {
+val partitioner = Partitioner.defaultPartitioner(ranks(0), ranks.tail: 
_*)
--- End diff --

That reminds me: I changed `val column` to be created using `mapPartitions`
instead, preserving the original partitioning to avoid shuffling, so the rank
RDDs should inherit their partitioner from `indexed`. In either case, I think
it's actually cleaner to use `Partitioner.defaultPartitioner`, since it's
intended exactly for creating a partitioner given a bunch of RDDs. (It does
one more check i

[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-14 Thread dorx
Github user dorx commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14899116
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmansCorrelation.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.Partitioner
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+
+/**
+ * Compute Spearman's correlation for two RDDs of the type RDD[Double] or 
the correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Spearman's correlation can be found at
+ * http://en.wikipedia.org/wiki/Spearman's_rank_correlation_coefficient
+ */
+object SpearmansCorrelation extends Correlation {
+
+  /**
+   * Compute Spearman's correlation for two datasets.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double 
= {
+computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute Spearman's correlation matrix S, for the input matrix, where 
S(i, j) is the
+   * correlation between column i and j.
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+val indexed = X.zipWithIndex()
+// Attempt to checkpoint the RDD before splitting it into numCols 
RDD[Double]s to avoid
+// computing the lineage prefix multiple times.
+// If checkpoint directory not set, cache the RDD instead.
+try {
+  indexed.checkpoint()
+} catch {
+  case e: Exception => indexed.cache()
+}
+
+val numCols = X.first.size
+val ranks = new Array[RDD[(Long, Double)]](numCols)
+
+// Note: we use a for loop here instead of a while loop with a single 
index variable
+// to avoid race condition caused by closure serialization
+for (k <- 0 until numCols) {
+  val column = indexed.map {case(vector, index) => {
+(vector(k), index)}
+  }
+  ranks(k) = getRanks(column)
+}
+
+val ranksMat: RDD[Vector] = makeRankMatrix(ranks)
+PearsonCorrelation.computeCorrelationMatrix(ranksMat)
+  }
+
+  /**
+   * Compute the ranks for elements in the input RDD, using the average 
method for ties.
+   *
+   * With the average method, elements with the same value receive the 
same rank that's computed
+   * by taking the average of their positions in the sorted list.
+   * e.g. ranks([2, 1, 0, 2]) = [3.5, 2.0, 1.0, 3.5]
+   */
+  private def getRanks(indexed: RDD[(Double, Long)]): RDD[(Long, Double)] 
= {
+// Get elements' positions in the sorted list for computing average 
rank for duplicate values
+val sorted = indexed.sortByKey().zipWithIndex()
+val groupedByValue = sorted.groupBy(_._1._1)
--- End diff --

"Breaking ties" is a loose description of what happens. I actually need all 
items of the same value in the same partition in order to take the average of 
their positions in the sorted list. I'm open to suggestions on how to make it 
work with mapPartitions though.
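
A local, non-RDD sketch of the average-rank rule being described, matching the
ranks([2, 1, 0, 2]) = [3.5, 2.0, 1.0, 3.5] example from the diff:

    def ranks(xs: Seq[Double]): Seq[Double] = {
      val rankByOriginalIndex = xs.zipWithIndex        // (value, original index)
        .sortBy(_._1)                                  // sort by value
        .zipWithIndex                                  // attach sorted position
        .groupBy(_._1._1)                              // group equal values
        .flatMap { case (_, dups) =>
          // Average of the 1-based positions of all duplicates.
          val avg = dups.map(_._2 + 1).sum.toDouble / dups.size
          dups.map { case ((_, origIdx), _) => origIdx -> avg }
        }
      xs.indices.map(rankByOriginalIndex)
    }

    ranks(Seq(2.0, 1.0, 0.0, 2.0))  // Vector(3.5, 2.0, 1.0, 3.5)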




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-14 Thread dorx
Github user dorx commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14899055
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmansCorrelation.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.Partitioner
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+
+/**
+ * Compute Spearman's correlation for two RDDs of the type RDD[Double] or 
the correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Spearman's correlation can be found at
+ * http://en.wikipedia.org/wiki/Spearman's_rank_correlation_coefficient
+ */
+object SpearmansCorrelation extends Correlation {
+
+  /**
+   * Compute Spearman's correlation for two datasets.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double 
= {
+computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute Spearman's correlation matrix S, for the input matrix, where 
S(i, j) is the
+   * correlation between column i and j.
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+val indexed = X.zipWithIndex()
--- End diff --

I need the exact index of each entry in order to compute accurate ranks in the
case of duplicate values. It doesn't seem like zipWithUniqueId allows me to do
that, especially when duplicate values fall into different partitions.
Suggestions on making it work with zipWithUniqueId?




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-14 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14898523
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/PearsonCorrelation.scala
 ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import breeze.linalg.{DenseMatrix => BDM}
+
+import org.apache.spark.mllib.linalg.{Matrices, Matrix, Vector}
+import org.apache.spark.mllib.linalg.distributed.RowMatrix
+import org.apache.spark.rdd.RDD
+
+/**
+ * Compute Pearson correlation for two RDDs of the type RDD[Double] or the 
correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Pearson correlation can be found at
+ * 
http://en.wikipedia.org/wiki/Pearson_product-moment_correlation_coefficient
+ */
+object PearsonCorrelation extends Correlation {
+
+  /**
+   * Compute the Pearson correlation for two datasets.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double 
= {
+computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute the Pearson correlation matrix S, for the input matrix, where 
S(i, j) is the
+   * correlation between column i and j.
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+val rowMatrix = new RowMatrix(X)
+val cov = rowMatrix.computeCovariance()
+computeCorrelationMatrixFromCovariance(cov)
+  }
+
+  /**
+   * Compute the pearson correlation matrix from the covariance matrix
+   */
+  def computeCorrelationMatrixFromCovariance(covarianceMatrix: Matrix): 
Matrix = {
+val cov = covarianceMatrix.toBreeze.asInstanceOf[BDM[Double]]
+val n = cov.cols
+
+// Compute the standard deviation on the diagonals first
+var i = 0
+while (i < n) {
+  cov(i, i) = math.sqrt(cov(i, i))
+  i +=1
+}
+// or we could put the stddev in its own array to trade space for one 
less pass over the matrix
+
+// TODO: use blas.dspr instead to compute the correlation matrix
+// if the covariance matrix comes in the upper triangular form for free
+
+// Loop through columns since cov is column major
+var j = 0
+var sigma = 0.0
+while (j < n) {
+  sigma = cov(j, j)
+  i = 0
+  while (i < j) {
+val covariance = cov(i, j) / (sigma * cov(i, i))
--- End diff --

I think the most honest result is NaN. R, for example, will return an error.
You will get NaN already as the result of 0.0 / 0.0 in the JVM. It's worth
documenting!
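
A three-line demonstration of the JVM semantics being relied on here:

    val corr = 0.0 / 0.0   // Double division never throws
    corr.isNaN             // true
    corr == corr           // false: NaN compares unequal even to itself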




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-14 Thread dorx
Github user dorx commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14897906
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmansCorrelation.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.Partitioner
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+
+/**
+ * Compute Spearman's correlation for two RDDs of the type RDD[Double] or 
the correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Spearman's correlation can be found at
+ * http://en.wikipedia.org/wiki/Spearman's_rank_correlation_coefficient
+ */
+object SpearmansCorrelation extends Correlation {
+
+  /**
+   * Compute Spearman's correlation for two datasets.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double 
= {
+computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute Spearman's correlation matrix S, for the input matrix, where 
S(i, j) is the
+   * correlation between column i and j.
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+val indexed = X.zipWithIndex()
+// Attempt to checkpoint the RDD before splitting it into numCols 
RDD[Double]s to avoid
+// computing the lineage prefix multiple times.
+// If checkpoint directory not set, cache the RDD instead.
+try {
+  indexed.checkpoint()
+} catch {
+  case e: Exception => indexed.cache()
--- End diff --

You would still branch the lineage into numCols RDDs, which then get joined
back into a single RDD. Does the joining back prevent recalculation of the
common lineage prefix once it's branched?




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-14 Thread dorx
Github user dorx commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14897552
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/PearsonCorrelation.scala
 ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import breeze.linalg.{DenseMatrix => BDM}
+
+import org.apache.spark.mllib.linalg.{Matrices, Matrix, Vector}
+import org.apache.spark.mllib.linalg.distributed.RowMatrix
+import org.apache.spark.rdd.RDD
+
+/**
+ * Compute Pearson correlation for two RDDs of the type RDD[Double] or the 
correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Pearson correlation can be found at
+ * 
http://en.wikipedia.org/wiki/Pearson_product-moment_correlation_coefficient
+ */
+object PearsonCorrelation extends Correlation {
+
+  /**
+   * Compute the Pearson correlation for two datasets.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double 
= {
+computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute the Pearson correlation matrix S, for the input matrix, where 
S(i, j) is the
+   * correlation between column i and j.
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+val rowMatrix = new RowMatrix(X)
+val cov = rowMatrix.computeCovariance()
+computeCorrelationMatrixFromCovariance(cov)
+  }
+
+  /**
+   * Compute the pearson correlation matrix from the covariance matrix
+   */
+  def computeCorrelationMatrixFromCovariance(covarianceMatrix: Matrix): 
Matrix = {
+val cov = covarianceMatrix.toBreeze.asInstanceOf[BDM[Double]]
+val n = cov.cols
+
+// Compute the standard deviation on the diagonals first
+var i = 0
+while (i < n) {
+  cov(i, i) = math.sqrt(cov(i, i))
+  i +=1
+}
+// or we could put the stddev in its own array to trade space for one 
less pass over the matrix
+
+// TODO: use blas.dspr instead to compute the correlation matrix
+// if the covariance matrix comes in the upper triangular form for free
+
+// Loop through columns since cov is column major
+var j = 0
+var sigma = 0.0
+while (j < n) {
+  sigma = cov(j, j)
+  i = 0
+  while (i < j) {
+val covariance = cov(i, j) / (sigma * cov(i, i))
--- End diff --

What do you want returned when cov(i, i) is zero? Double.NaN? 0.0? 




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-14 Thread dorx
Github user dorx commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14896912
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala 
---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.RDD
+
+/**
+ * New correlation algorithms should implement this trait
+ */
+trait Correlation {
+
+  /**
+   * Compute correlation for two datasets.
+   */
+  def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double
+
+  /**
+   * Compute the correlation matrix S, for the input matrix, where S(i, j) 
is the correlation
+   * between column i and j.
+   */
+  def computeCorrelationMatrix(X: RDD[Vector]): Matrix
+
+  /**
+   * Combine the two input RDD[Double]s into an RDD[Vector] and compute 
the correlation using the
+   * correlation implementation for RDD[Vector]
+   */
+  def computeCorrelationWithMatrixImpl(x: RDD[Double], y: RDD[Double]): 
Double = {
+val mat: RDD[Vector] = x.zip(y).mapPartitions({ iter =>
+  iter.map {case(xi, yi) => new DenseVector(Array(xi, yi))}
+}, preservesPartitioning = true)
+computeCorrelationMatrix(mat)(0, 1)
+  }
+
+}
+
+/**
+ * Delegates computation to the specific correlation object based on the 
input method name
+ *
+ * Currently supported correlations: pearson, spearman.
+ * After new correlation algorithms are added, please update the 
documentation here and in
+ * Statistics.scala for the correlation APIs.
+ *
+ * Cases are ignored when doing method matching. We also allow initials, 
e.g. "P" for "pearson", as
+ * long as initials are unique in the supported set of correlation 
algorithms. In addition, a
--- End diff --

Does that mean no toLowerCase either?




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-14 Thread dorx
Github user dorx commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14896742
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala 
---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.RDD
+
+/**
+ * New correlation algorithms should implement this trait
+ */
+trait Correlation {
+
+  /**
+   * Compute correlation for two datasets.
+   */
+  def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double
+
+  /**
+   * Compute the correlation matrix S, for the input matrix, where S(i, j) 
is the correlation
+   * between column i and j.
+   */
+  def computeCorrelationMatrix(X: RDD[Vector]): Matrix
+
+  /**
+   * Combine the two input RDD[Double]s into an RDD[Vector] and compute 
the correlation using the
+   * correlation implementation for RDD[Vector]
+   */
+  def computeCorrelationWithMatrixImpl(x: RDD[Double], y: RDD[Double]): 
Double = {
+val mat: RDD[Vector] = x.zip(y).mapPartitions({ iter =>
--- End diff --

No can do here, since I have a second argument (`preservesPartitioning`).
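
For context, a spark-shell sketch of the syntax constraint (`sc` assumed):
braces alone only work when the function literal is the sole argument list.

    val rdd = sc.parallelize(1 to 10)
    val a = rdd.mapPartitions { iter => iter.map(_ * 2) }       // compiles
    val b = rdd.mapPartitions({ iter => iter.map(_ * 2) },      // parens required
      preservesPartitioning = true)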




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-12 Thread mengxr
Github user mengxr commented on the pull request:

https://github.com/apache/spark/pull/1367#issuecomment-48827327
  
@dorx Thanks for implementing correlations! I just made one pass. Besides the
method name matching, please make sure only the `Statistics.corr` methods are
public and that the docs there are accurate.




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-12 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14855922
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/mllib/stat/CorrelationSuite.scala ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat
+
+import org.scalatest.FunSuite
+
+import breeze.linalg.{DenseMatrix => BDM, Matrix => BM}
+
+import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.stat.correlation.{Correlations, 
PearsonCorrelation,
+SpearmansCorrelation}
+import org.apache.spark.mllib.util.LocalSparkContext
+
+class CorrelationSuite extends FunSuite with LocalSparkContext {
+
+  // test input data
+  val xData = Array(1.0, 0.0, -2.0)
+  val yData = Array(4.0, 5.0, 3.0)
+  val data = Seq(
--- End diff --

Add a column with all `0`s or `1`s for 0 covariance.
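
A sketch of the suggested fixture, extending the suite's existing `data`; the
third column below is the assumed addition, not the test file's actual values:

    val data = Seq(
      Vectors.dense(1.0, 4.0, 1.0),
      Vectors.dense(0.0, 5.0, 1.0),
      Vectors.dense(-2.0, 3.0, 1.0)  // constant column: zero variance, zero covariance
    )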




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-12 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14855912
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmansCorrelation.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.Partitioner
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+
+/**
+ * Compute Spearman's correlation for two RDDs of the type RDD[Double] or 
the correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Spearman's correlation can be found at
+ * http://en.wikipedia.org/wiki/Spearman's_rank_correlation_coefficient
+ */
+object SpearmansCorrelation extends Correlation {
+
+  /**
+   * Compute Spearman's correlation for two datasets.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double 
= {
+computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute Spearman's correlation matrix S, for the input matrix, where 
S(i, j) is the
+   * correlation between column i and j.
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+val indexed = X.zipWithIndex()
+// Attempt to checkpoint the RDD before splitting it into numCols 
RDD[Double]s to avoid
+// computing the lineage prefix multiple times.
+// If checkpoint directory not set, cache the RDD instead.
+try {
+  indexed.checkpoint()
+} catch {
+  case e: Exception => indexed.cache()
+}
+
+val numCols = X.first.size
+val ranks = new Array[RDD[(Long, Double)]](numCols)
+
+// Note: we use a for loop here instead of a while loop with a single 
index variable
+// to avoid race condition caused by closure serialization
+for (k <- 0 until numCols) {
+  val column = indexed.map {case(vector, index) => {
+(vector(k), index)}
+  }
+  ranks(k) = getRanks(column)
+}
+
+val ranksMat: RDD[Vector] = makeRankMatrix(ranks)
+PearsonCorrelation.computeCorrelationMatrix(ranksMat)
+  }
+
+  /**
+   * Compute the ranks for elements in the input RDD, using the average 
method for ties.
+   *
+   * With the average method, elements with the same value receive the 
same rank that's computed
+   * by taking the average of their positions in the sorted list.
+   * e.g. ranks([2, 1, 0, 2]) = [3.5, 2.0, 1.0, 3.5]
+   */
+  private def getRanks(indexed: RDD[(Double, Long)]): RDD[(Long, Double)] 
= {
+// Get elements' positions in the sorted list for computing average 
rank for duplicate values
+val sorted = indexed.sortByKey().zipWithIndex()
+val groupedByValue = sorted.groupBy(_._1._1)
+val ranks = groupedByValue.flatMap[(Long, Double)] { item =>
+  val duplicates = item._2
+  if (duplicates.size > 1) {
+val averageRank = duplicates.foldLeft(0L) {_ + _._2 + 1} / 
duplicates.size.toDouble
+duplicates.map(entry => (entry._1._2, averageRank)).toSeq
+  } else {
+duplicates.map(entry => (entry._1._2, entry._2.toDouble + 1)).toSeq
+  }
+}
+ranks.sortByKey()
+  }
+
+  private def makeRankMatrix(ranks: Array[RDD[(Long, Double)]]): 
RDD[Vector] = {
+val partitioner = Partitioner.defaultPartitioner(ranks(0), ranks.tail: 
_*)
+val cogrouped = new CoGroupedRDD[Long](ranks, partitioner)
+cogrouped.mapPartitions({ iter =>
+  iter.map {case (index, values:Seq[Seq[Double]]) => new 
DenseVector(values.flatten.toArray)}
--- End diff --

Add a space after `{`. If it hits the line width limit, move
`DenseVector(values.flatten.toArray)` to the next line.



[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-12 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14855908
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmansCorrelation.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.Partitioner
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+
+/**
+ * Compute Spearman's correlation for two RDDs of the type RDD[Double] or 
the correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Spearman's correlation can be found at
+ * http://en.wikipedia.org/wiki/Spearman's_rank_correlation_coefficient
+ */
+object SpearmansCorrelation extends Correlation {
+
+  /**
+   * Compute Spearman's correlation for two datasets.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double 
= {
+computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute Spearman's correlation matrix S, for the input matrix, where 
S(i, j) is the
+   * correlation between column i and j.
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+val indexed = X.zipWithIndex()
+// Attempt to checkpoint the RDD before splitting it into numCols 
RDD[Double]s to avoid
+// computing the lineage prefix multiple times.
+// If checkpoint directory not set, cache the RDD instead.
+try {
+  indexed.checkpoint()
+} catch {
+  case e: Exception => indexed.cache()
+}
+
+val numCols = X.first.size
+val ranks = new Array[RDD[(Long, Double)]](numCols)
+
+// Note: we use a for loop here instead of a while loop with a single 
index variable
+// to avoid race condition caused by closure serialization
+for (k <- 0 until numCols) {
+  val column = indexed.map {case(vector, index) => {
+(vector(k), index)}
+  }
+  ranks(k) = getRanks(column)
+}
+
+val ranksMat: RDD[Vector] = makeRankMatrix(ranks)
+PearsonCorrelation.computeCorrelationMatrix(ranksMat)
+  }
+
+  /**
+   * Compute the ranks for elements in the input RDD, using the average 
method for ties.
+   *
+   * With the average method, elements with the same value receive the 
same rank that's computed
+   * by taking the average of their positions in the sorted list.
+   * e.g. ranks([2, 1, 0, 2]) = [3.5, 2.0, 1.0, 3.5]
+   */
+  private def getRanks(indexed: RDD[(Double, Long)]): RDD[(Long, Double)] 
= {
+// Get elements' positions in the sorted list for computing average 
rank for duplicate values
+val sorted = indexed.sortByKey().zipWithIndex()
+val groupedByValue = sorted.groupBy(_._1._1)
+val ranks = groupedByValue.flatMap[(Long, Double)] { item =>
+  val duplicates = item._2
+  if (duplicates.size > 1) {
+val averageRank = duplicates.foldLeft(0L) {_ + _._2 + 1} / 
duplicates.size.toDouble
+duplicates.map(entry => (entry._1._2, averageRank)).toSeq
+  } else {
+duplicates.map(entry => (entry._1._2, entry._2.toDouble + 1)).toSeq
+  }
+}
+ranks.sortByKey()
+  }
+
+  private def makeRankMatrix(ranks: Array[RDD[(Long, Double)]]): 
RDD[Vector] = {
+val partitioner = Partitioner.defaultPartitioner(ranks(0), ranks.tail: 
_*)
+val cogrouped = new CoGroupedRDD[Long](ranks, partitioner)
+cogrouped.mapPartitions({ iter =>
--- End diff --

Replace `(` with a space.



[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-12 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14855906
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmansCorrelation.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.Partitioner
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+
+/**
+ * Compute Spearman's correlation for two RDDs of the type RDD[Double] or 
the correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Spearman's correlation can be found at
+ * http://en.wikipedia.org/wiki/Spearman's_rank_correlation_coefficient
+ */
+object SpearmansCorrelation extends Correlation {
+
+  /**
+   * Compute Spearman's correlation for two datasets.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double 
= {
+computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute Spearman's correlation matrix S, for the input matrix, where 
S(i, j) is the
+   * correlation between column i and j.
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+val indexed = X.zipWithIndex()
+// Attempt to checkpoint the RDD before splitting it into numCols 
RDD[Double]s to avoid
+// computing the lineage prefix multiple times.
+// If checkpoint directory not set, cache the RDD instead.
+try {
+  indexed.checkpoint()
+} catch {
+  case e: Exception => indexed.cache()
+}
+
+val numCols = X.first.size
+val ranks = new Array[RDD[(Long, Double)]](numCols)
+
+// Note: we use a for loop here instead of a while loop with a single 
index variable
+// to avoid race condition caused by closure serialization
+for (k <- 0 until numCols) {
+  val column = indexed.map {case(vector, index) => {
+(vector(k), index)}
+  }
+  ranks(k) = getRanks(column)
+}
+
+val ranksMat: RDD[Vector] = makeRankMatrix(ranks)
+PearsonCorrelation.computeCorrelationMatrix(ranksMat)
+  }
+
+  /**
+   * Compute the ranks for elements in the input RDD, using the average 
method for ties.
+   *
+   * With the average method, elements with the same value receive the 
same rank that's computed
+   * by taking the average of their positions in the sorted list.
+   * e.g. ranks([2, 1, 0, 2]) = [3.5, 2.0, 1.0, 3.5]
+   */
+  private def getRanks(indexed: RDD[(Double, Long)]): RDD[(Long, Double)] 
= {
+// Get elements' positions in the sorted list for computing average 
rank for duplicate values
+val sorted = indexed.sortByKey().zipWithIndex()
+val groupedByValue = sorted.groupBy(_._1._1)
+val ranks = groupedByValue.flatMap[(Long, Double)] { item =>
+  val duplicates = item._2
+  if (duplicates.size > 1) {
+val averageRank = duplicates.foldLeft(0L) {_ + _._2 + 1} / 
duplicates.size.toDouble
+duplicates.map(entry => (entry._1._2, averageRank)).toSeq
+  } else {
+duplicates.map(entry => (entry._1._2, entry._2.toDouble + 1)).toSeq
+  }
+}
+ranks.sortByKey()
+  }
+
+  private def makeRankMatrix(ranks: Array[RDD[(Long, Double)]]): 
RDD[Vector] = {
+val partitioner = Partitioner.defaultPartitioner(ranks(0), ranks.tail: 
_*)
--- End diff --

`ranks(i)` doesn't have a partitioner. So this should fall back to a 
default `HashPartitioner` with defaultParallelism. If that is the case, please 
use `HashPartitioner` directly to avoid confusion.
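
A minimal sketch of the suggested change, reusing the `ranks` array from the diff above; the parallelism value mirrors what `defaultPartitioner` would pick when no input has a partitioner:

~~~
import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.CoGroupedRDD

// None of the rank RDDs carries a partitioner, so defaultPartitioner would
// fall back to hash partitioning anyway; constructing it explicitly makes
// that behavior obvious to the reader.
val partitioner = new HashPartitioner(ranks.head.context.defaultParallelism)
val cogrouped = new CoGroupedRDD[Long](ranks, partitioner)
~~~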



[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-12 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14855891
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmansCorrelation.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.Partitioner
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+
+/**
+ * Compute Spearman's correlation for two RDDs of the type RDD[Double] or 
the correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Spearman's correlation can be found at
+ * http://en.wikipedia.org/wiki/Spearman's_rank_correlation_coefficient
+ */
+object SpearmansCorrelation extends Correlation {
+
+  /**
+   * Compute Spearman's correlation for two datasets.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double 
= {
+computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute Spearman's correlation matrix S, for the input matrix, where 
S(i, j) is the
+   * correlation between column i and j.
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+val indexed = X.zipWithIndex()
+// Attempt to checkpoint the RDD before splitting it into numCols 
RDD[Double]s to avoid
+// computing the lineage prefix multiple times.
+// If checkpoint directory not set, cache the RDD instead.
+try {
+  indexed.checkpoint()
+} catch {
+  case e: Exception => indexed.cache()
+}
+
+val numCols = X.first.size
+val ranks = new Array[RDD[(Long, Double)]](numCols)
+
+// Note: we use a for loop here instead of a while loop with a single 
index variable
+// to avoid race condition caused by closure serialization
+for (k <- 0 until numCols) {
+  val column = indexed.map {case(vector, index) => {
+(vector(k), index)}
+  }
+  ranks(k) = getRanks(column)
+}
+
+val ranksMat: RDD[Vector] = makeRankMatrix(ranks)
+PearsonCorrelation.computeCorrelationMatrix(ranksMat)
+  }
+
+  /**
+   * Compute the ranks for elements in the input RDD, using the average 
method for ties.
+   *
+   * With the average method, elements with the same value receive the 
same rank that's computed
+   * by taking the average of their positions in the sorted list.
+   * e.g. ranks([2, 1, 0, 2]) = [3.5, 2.0, 1.0, 3.5]
+   */
+  private def getRanks(indexed: RDD[(Double, Long)]): RDD[(Long, Double)] 
= {
+// Get elements' positions in the sorted list for computing average 
rank for duplicate values
+val sorted = indexed.sortByKey().zipWithIndex()
+val groupedByValue = sorted.groupBy(_._1._1)
--- End diff --

`sortByKey` is done through a range partitioner, which means records having 
the same key will be inside the same partition (please check the code and 
confirm). So `mapPartitions` should be sufficient to break ties.
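
A rough sketch of per-partition tie averaging along those lines, valid only under the reviewer's premise that the range partitioner co-locates equal keys; names follow the diff above, and materializing each partition with `toSeq` is a simplification:

~~~
val sorted = indexed.sortByKey().zipWithIndex()
val ranks: RDD[(Long, Double)] = sorted.mapPartitions { iter =>
  // group entries that share a value; safe per partition only if equal
  // keys are never split across partition boundaries
  iter.toSeq.groupBy { case ((value, _), _) => value }.iterator.flatMap {
    case (_, group) =>
      // the 1-based average of the tied global positions is the shared rank
      val avgRank = group.map { case (_, pos) => pos + 1 }.sum.toDouble / group.size
      group.map { case ((_, id), _) => (id, avgRank) }
  }
}
~~~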




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-12 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14855883
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmansCorrelation.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.Partitioner
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+
+/**
+ * Compute Spearman's correlation for two RDDs of the type RDD[Double] or 
the correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Spearman's correlation can be found at
+ * http://en.wikipedia.org/wiki/Spearman's_rank_correlation_coefficient
+ */
+object SpearmansCorrelation extends Correlation {
+
+  /**
+   * Compute Spearman's correlation for two datasets.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double 
= {
+computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute Spearman's correlation matrix S, for the input matrix, where 
S(i, j) is the
+   * correlation between column i and j.
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+val indexed = X.zipWithIndex()
+// Attempt to checkpoint the RDD before splitting it into numCols 
RDD[Double]s to avoid
+// computing the lineage prefix multiple times.
+// If checkpoint directory not set, cache the RDD instead.
+try {
+  indexed.checkpoint()
+} catch {
+  case e: Exception => indexed.cache()
+}
+
+val numCols = X.first.size
+val ranks = new Array[RDD[(Long, Double)]](numCols)
+
+// Note: we use a for loop here instead of a while loop with a single 
index variable
+// to avoid race condition caused by closure serialization
+for (k <- 0 until numCols) {
+  val column = indexed.map {case(vector, index) => {
+(vector(k), index)}
+  }
+  ranks(k) = getRanks(column)
+}
+
+val ranksMat: RDD[Vector] = makeRankMatrix(ranks)
+PearsonCorrelation.computeCorrelationMatrix(ranksMat)
+  }
+
+  /**
+   * Compute the ranks for elements in the input RDD, using the average 
method for ties.
--- End diff --

Add doc explaining the return type.
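
One possible wording, inferred from the diff above rather than taken from the merged code:

~~~
/**
 * @return an RDD[(Long, Double)] of (original element index, rank), where
 *         ranks are 1-based and tied values share the average of their positions.
 */
~~~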




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-12 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14855879
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmansCorrelation.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.Partitioner
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+
+/**
+ * Compute Spearman's correlation for two RDDs of the type RDD[Double] or 
the correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Spearman's correlation can be found at
+ * http://en.wikipedia.org/wiki/Spearman's_rank_correlation_coefficient
+ */
+object SpearmansCorrelation extends Correlation {
+
+  /**
+   * Compute Spearman's correlation for two datasets.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double 
= {
+computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute Spearman's correlation matrix S, for the input matrix, where 
S(i, j) is the
+   * correlation between column i and j.
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+val indexed = X.zipWithIndex()
+// Attempt to checkpoint the RDD before splitting it into numCols 
RDD[Double]s to avoid
+// computing the lineage prefix multiple times.
+// If checkpoint directory not set, cache the RDD instead.
+try {
+  indexed.checkpoint()
+} catch {
+  case e: Exception => indexed.cache()
+}
+
+val numCols = X.first.size
+val ranks = new Array[RDD[(Long, Double)]](numCols)
+
+// Note: we use a for loop here instead of a while loop with a single 
index variable
+// to avoid race condition caused by closure serialization
+for (k <- 0 until numCols) {
+  val column = indexed.map {case(vector, index) => {
--- End diff --

The code fits on a single line: `val column = indexed.map { case (vector, index) => (vector(k), index) }`




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-12 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14855867
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmansCorrelation.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.Partitioner
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+
+/**
+ * Compute Spearman's correlation for two RDDs of the type RDD[Double] or 
the correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Spearman's correlation can be found at
+ * http://en.wikipedia.org/wiki/Spearman's_rank_correlation_coefficient
+ */
+object SpearmansCorrelation extends Correlation {
+
+  /**
+   * Compute Spearman's correlation for two datasets.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double 
= {
+computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute Spearman's correlation matrix S, for the input matrix, where 
S(i, j) is the
+   * correlation between column i and j.
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+val indexed = X.zipWithIndex()
+// Attempt to checkpoint the RDD before splitting it into numCols 
RDD[Double]s to avoid
+// computing the lineage prefix multiple times.
+// If checkpoint directory not set, cache the RDD instead.
+try {
+  indexed.checkpoint()
+} catch {
+  case e: Exception => indexed.cache()
+}
+
+val numCols = X.first.size
--- End diff --

Please add a warning if `numCols` is large, say, > 100. I'm not sure whether this is a good threshold. It would be great if you could try some examples and set a reasonable one.
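
A minimal sketch of such a guard, assuming the enclosing object mixes in Spark's `Logging` trait; the 100-column cutoff is the reviewer's tentative number, not a measured threshold:

~~~
val numCols = X.first.size
if (numCols > 100) {
  logWarning(s"Computing the correlation matrix for $numCols columns requires " +
    "one sort per column for Spearman's correlation and may be slow.")
}
~~~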




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-12 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14855861
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmansCorrelation.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.Partitioner
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+
+/**
+ * Compute Spearman's correlation for two RDDs of the type RDD[Double] or 
the correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Spearman's correlation can be found at
+ * http://en.wikipedia.org/wiki/Spearman's_rank_correlation_coefficient
+ */
+object SpearmansCorrelation extends Correlation {
+
+  /**
+   * Compute Spearman's correlation for two datasets.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double 
= {
+computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute Spearman's correlation matrix S, for the input matrix, where 
S(i, j) is the
+   * correlation between column i and j.
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+val indexed = X.zipWithIndex()
+// Attempt to checkpoint the RDD before splitting it into numCols 
RDD[Double]s to avoid
+// computing the lineage prefix multiple times.
+// If checkpoint directory not set, cache the RDD instead.
+try {
+  indexed.checkpoint()
+} catch {
+  case e: Exception => indexed.cache()
--- End diff --

If `zipWithUniqueId` is used, this block becomes unnecessary.




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-12 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14855852
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmansCorrelation.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.Partitioner
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+
+/**
+ * Compute Spearman's correlation for two RDDs of the type RDD[Double] or 
the correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Spearman's correlation can be found at
+ * http://en.wikipedia.org/wiki/Spearman's_rank_correlation_coefficient
+ */
+object SpearmansCorrelation extends Correlation {
+
+  /**
+   * Compute Spearman's correlation for two datasets.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double 
= {
+computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute Spearman's correlation matrix S, for the input matrix, where 
S(i, j) is the
+   * correlation between column i and j.
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+val indexed = X.zipWithIndex()
--- End diff --

Use `zipWithUniqueId`, which doesn't trigger a job.
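
The difference in a nutshell: `zipWithIndex` must first count the elements of every partition to compute global offsets, which launches a job, while `zipWithUniqueId` derives each id from the partition index alone and stays lazy. A minimal sketch of the swap, which also makes the checkpoint-or-cache workaround above unnecessary:

~~~
// ids are unique but not consecutive; that is enough here, since the id is
// only used to join the per-column ranks back together
val indexed: RDD[(Vector, Long)] = X.zipWithUniqueId()
~~~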




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-12 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14855845
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/PearsonCorrelation.scala
 ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import breeze.linalg.{DenseMatrix => BDM}
+
+import org.apache.spark.mllib.linalg.{Matrices, Matrix, Vector}
+import org.apache.spark.mllib.linalg.distributed.RowMatrix
+import org.apache.spark.rdd.RDD
+
+/**
+ * Compute Pearson correlation for two RDDs of the type RDD[Double] or the 
correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Pearson correlation can be found at
+ * 
http://en.wikipedia.org/wiki/Pearson_product-moment_correlation_coefficient
+ */
+object PearsonCorrelation extends Correlation {
+
+  /**
+   * Compute the Pearson correlation for two datasets.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double 
= {
+computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute the Pearson correlation matrix S, for the input matrix, where 
S(i, j) is the
+   * correlation between column i and j.
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+val rowMatrix = new RowMatrix(X)
+val cov = rowMatrix.computeCovariance()
+computeCorrelationMatrixFromCovariance(cov)
+  }
+
+  /**
+   * Compute the pearson correlation matrix from the covariance matrix
+   */
+  def computeCorrelationMatrixFromCovariance(covarianceMatrix: Matrix): 
Matrix = {
+val cov = covarianceMatrix.toBreeze.asInstanceOf[BDM[Double]]
+val n = cov.cols
+
+// Compute the standard deviation on the diagonals first
+var i = 0
+while (i < n) {
+  cov(i, i) = math.sqrt(cov(i, i))
+  i +=1
+}
+// or we could put the stddev in its own array to trade space for one 
less pass over the matrix
+
+// TODO: use blas.dspr instead to compute the correlation matrix
--- End diff --

dspr is for a rank-1 symmetric update, which is irrelevant to the operation here.
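
For reference, BLAS `dspr` computes the packed symmetric rank-1 update `A := alpha * x * x^T + A`, whereas turning a covariance matrix into a correlation matrix is a diagonal rescaling, `corr(i, j) = cov(i, j) / (stddev(i) * stddev(j))`, not a rank-1 update.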




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-12 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14855846
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/PearsonCorrelation.scala
 ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import breeze.linalg.{DenseMatrix => BDM}
+
+import org.apache.spark.mllib.linalg.{Matrices, Matrix, Vector}
+import org.apache.spark.mllib.linalg.distributed.RowMatrix
+import org.apache.spark.rdd.RDD
+
+/**
+ * Compute Pearson correlation for two RDDs of the type RDD[Double] or the 
correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Pearson correlation can be found at
+ * 
http://en.wikipedia.org/wiki/Pearson_product-moment_correlation_coefficient
+ */
+object PearsonCorrelation extends Correlation {
+
+  /**
+   * Compute the Pearson correlation for two datasets.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double 
= {
+computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute the Pearson correlation matrix S, for the input matrix, where 
S(i, j) is the
+   * correlation between column i and j.
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+val rowMatrix = new RowMatrix(X)
+val cov = rowMatrix.computeCovariance()
+computeCorrelationMatrixFromCovariance(cov)
+  }
+
+  /**
+   * Compute the pearson correlation matrix from the covariance matrix
+   */
+  def computeCorrelationMatrixFromCovariance(covarianceMatrix: Matrix): 
Matrix = {
+val cov = covarianceMatrix.toBreeze.asInstanceOf[BDM[Double]]
+val n = cov.cols
+
+// Compute the standard deviation on the diagonals first
+var i = 0
+while (i < n) {
+  cov(i, i) = math.sqrt(cov(i, i))
+  i +=1
+}
+// or we could put the stddev in its own array to trade space for one 
less pass over the matrix
+
+// TODO: use blas.dspr instead to compute the correlation matrix
+// if the covariance matrix comes in the upper triangular form for free
+
+// Loop through columns since cov is column major
+var j = 0
+var sigma = 0.0
+while (j < n) {
+  sigma = cov(j, j)
+  i = 0
+  while (i < j) {
+val covariance = cov(i, j) / (sigma * cov(i, i))
--- End diff --

`covariance` -> `corr`

Note that `cov(i, i)` could be zero.
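
A hedged sketch of the rename together with a zero-variance guard; emitting NaN for a constant column is an assumption here, not necessarily the merged behavior. Note that at this point in the loop the diagonal already holds standard deviations:

~~~
val corr = if (sigma == 0.0 || cov(i, i) == 0.0) {
  Double.NaN  // at least one of the two columns is constant
} else {
  cov(i, j) / (sigma * cov(i, i))
}
cov(i, j) = corr
~~~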




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-12 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14855844
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/PearsonCorrelation.scala
 ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import breeze.linalg.{DenseMatrix => BDM}
+
+import org.apache.spark.mllib.linalg.{Matrices, Matrix, Vector}
+import org.apache.spark.mllib.linalg.distributed.RowMatrix
+import org.apache.spark.rdd.RDD
+
+/**
+ * Compute Pearson correlation for two RDDs of the type RDD[Double] or the 
correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Pearson correlation can be found at
+ * 
http://en.wikipedia.org/wiki/Pearson_product-moment_correlation_coefficient
+ */
+object PearsonCorrelation extends Correlation {
+
+  /**
+   * Compute the Pearson correlation for two datasets.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double 
= {
+computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute the Pearson correlation matrix S, for the input matrix, where 
S(i, j) is the
+   * correlation between column i and j.
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+val rowMatrix = new RowMatrix(X)
+val cov = rowMatrix.computeCovariance()
+computeCorrelationMatrixFromCovariance(cov)
+  }
+
+  /**
+   * Compute the pearson correlation matrix from the covariance matrix
+   */
+  def computeCorrelationMatrixFromCovariance(covarianceMatrix: Matrix): 
Matrix = {
+val cov = covarianceMatrix.toBreeze.asInstanceOf[BDM[Double]]
+val n = cov.cols
+
+// Compute the standard deviation on the diagonals first
+var i = 0
+while (i < n) {
+  cov(i, i) = math.sqrt(cov(i, i))
+  i +=1
+}
+// or we could put the stddev in its own array to trade space for one 
less pass over the matrix
--- End diff --

`cov` is local and has random access. Updating its diagonal is cheap.




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-12 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14855841
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala 
---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.RDD
+
+/**
+ * New correlation algorithms should implement this trait
+ */
+trait Correlation {
+
+  /**
+   * Compute correlation for two datasets.
+   */
+  def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double
+
+  /**
+   * Compute the correlation matrix S, for the input matrix, where S(i, j) 
is the correlation
+   * between column i and j.
+   */
+  def computeCorrelationMatrix(X: RDD[Vector]): Matrix
+
+  /**
+   * Combine the two input RDD[Double]s into an RDD[Vector] and compute 
the correlation using the
+   * correlation implementation for RDD[Vector]
+   */
+  def computeCorrelationWithMatrixImpl(x: RDD[Double], y: RDD[Double]): 
Double = {
+val mat: RDD[Vector] = x.zip(y).mapPartitions({ iter =>
+  iter.map {case(xi, yi) => new DenseVector(Array(xi, yi))}
+}, preservesPartitioning = true)
+computeCorrelationMatrix(mat)(0, 1)
+  }
+
+}
+
+/**
+ * Delegates computation to the specific correlation object based on the 
input method name
+ *
+ * Currently supported correlations: pearson, spearman.
+ * After new correlation algorithms are added, please update the 
documentation here and in
+ * Statistics.scala for the correlation APIs.
+ *
+ * Cases are ignored when doing method matching. We also allow initials, 
e.g. "P" for "pearson", as
+ * long as initials are unique in the supported set of correlation 
algorithms. In addition, a
--- End diff --

It is not just about low maintenance cost. A strict contract also helps users understand exactly what to input and what to expect. As for fault tolerance, there are many ways to break Spark by putting an extra "s" somewhere; let's not worry too much about that in this PR. If we find a mismatch at runtime, throw an error and list the available methods.

Btw, I like git's solution. You cannot use "git commits" for "git commit" 
but git will suggest:
~~~
Did you mean this?
commit
~~~
Anyway, let's not spend time on this auto-suggestion feature either. Please 
update the PR with strict string matching. Thanks!
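
A minimal sketch of strict matching with a helpful failure message, in that spirit; the map contents mirror the two methods in this PR, but the structure and names are illustrative, not the merged code:

~~~
val nameToObject: Map[String, Correlation] = Map(
  "pearson" -> PearsonCorrelation,
  "spearman" -> SpearmansCorrelation)

def getCorrelationFromName(method: String): Correlation =
  nameToObject.getOrElse(method, throw new IllegalArgumentException(
    s"Unrecognized method name. Available methods: ${nameToObject.keys.mkString(", ")}"))
~~~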




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-12 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14855843
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/PearsonCorrelation.scala
 ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import breeze.linalg.{DenseMatrix => BDM}
+
+import org.apache.spark.mllib.linalg.{Matrices, Matrix, Vector}
+import org.apache.spark.mllib.linalg.distributed.RowMatrix
+import org.apache.spark.rdd.RDD
+
+/**
+ * Compute Pearson correlation for two RDDs of the type RDD[Double] or the 
correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Pearson correlation can be found at
+ * 
http://en.wikipedia.org/wiki/Pearson_product-moment_correlation_coefficient
+ */
+object PearsonCorrelation extends Correlation {
--- End diff --

private[mllib] or [stat]




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-11 Thread dorx
Github user dorx commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14846509
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala 
---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.RDD
+
+/**
+ * New correlation algorithms should implement this trait
+ */
+trait Correlation {
+
+  /**
+   * Compute correlation for two datasets.
+   */
+  def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double
+
+  /**
+   * Compute the correlation matrix S, for the input matrix, where S(i, j) 
is the correlation
+   * between column i and j.
+   */
+  def computeCorrelationMatrix(X: RDD[Vector]): Matrix
+
+  /**
+   * Combine the two input RDD[Double]s into an RDD[Vector] and compute 
the correlation using the
+   * correlation implementation for RDD[Vector]
+   */
+  def computeCorrelationWithMatrixImpl(x: RDD[Double], y: RDD[Double]): 
Double = {
+val mat: RDD[Vector] = x.zip(y).mapPartitions({ iter =>
+  iter.map {case(xi, yi) => new DenseVector(Array(xi, yi))}
+}, preservesPartitioning = true)
+computeCorrelationMatrix(mat)(0, 1)
+  }
+
+}
+
+/**
+ * Delegates computation to the specific correlation object based on the 
input method name
+ *
+ * Currently supported correlations: pearson, spearman.
+ * After new correlation algorithms are added, please update the 
documentation here and in
+ * Statistics.scala for the correlation APIs.
+ *
+ * Cases are ignored when doing method matching. We also allow initials, 
e.g. "P" for "pearson", as
+ * long as initials are unique in the supported set of correlation 
algorithms. In addition, a
--- End diff --

While I agree we should definitely anticipate cases for generalization, I feel ambivalent about generalizing based on fairly hypothetical use cases (especially since there haven't been issues of name collision among well-known correlation algorithms for many years in almost all major stats packages).
In any case, maybe fuzzy string matching isn't the optimal solution here, but the consideration was user friendliness and the fact that the method name isn't something that gets checked at compile time (unless we find a mechanism to do so). Since Spark attracts users with its fault tolerance and user friendliness, it seems silly to me to have something fail at runtime (potentially after a lot of other data processing) because of an extra "s" in "spearmans" (and both "spearman" and "spearmans" seem to be popular options). The Correlation interface idea sounds interesting, and I can see people who need things to be fault tolerant going the extra mile of implementing it (but of course by default we wouldn't require it and would use exact string matching instead).
On the other hand, I understand that developers can only do so much to tolerate faults in user behavior, and maintainability seems much more important in comparison.
At any rate, if it comes down to exact string matching or no deal, I'll gladly go with exact string matching only.




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-11 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14842983
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala 
---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.RDD
+
+/**
+ * New correlation algorithms should implement this trait
+ */
+trait Correlation {
+
+  /**
+   * Compute correlation for two datasets.
+   */
+  def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double
+
+  /**
+   * Compute the correlation matrix S, for the input matrix, where S(i, j) 
is the correlation
+   * between column i and j.
+   */
+  def computeCorrelationMatrix(X: RDD[Vector]): Matrix
+
+  /**
+   * Combine the two input RDD[Double]s into an RDD[Vector] and compute 
the correlation using the
+   * correlation implementation for RDD[Vector]
+   */
+  def computeCorrelationWithMatrixImpl(x: RDD[Double], y: RDD[Double]): 
Double = {
+val mat: RDD[Vector] = x.zip(y).mapPartitions({ iter =>
+  iter.map {case(xi, yi) => new DenseVector(Array(xi, yi))}
+}, preservesPartitioning = true)
+computeCorrelationMatrix(mat)(0, 1)
+  }
+
+}
+
+/**
+ * Delegates computation to the specific correlation object based on the 
input method name
+ *
+ * Currently supported correlations: pearson, spearman.
+ * After new correlation algorithms are added, please update the 
documentation here and in
+ * Statistics.scala for the correlation APIs.
+ *
+ * Cases are ignored when doing method matching. We also allow initials, 
e.g. "P" for "pearson", as
+ * long as initials are unique in the supported set of correlation 
algorithms. In addition, a
--- End diff --

FWIW I tend to strongly agree that the algorithm should be specified by 
exactly one complete string. R isn't necessarily something to emulate. Even 
using Strings feels a bit old-school. I understand possible cross-platform 
concerns but what about passing in implementations of a Correlation interface, 
which are available as singleton instances for convenience from somewhere? Food 
for thought.
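
A sketch of what that alternative could look like, assuming the `Correlation` trait and objects from the diffs above are in scope; this is purely illustrative and not the API this PR exposes:

~~~
object Statistics {
  // callers pass a Correlation singleton instead of a string; a string
  // overload could remain for cross-language callers
  def corr(X: RDD[Vector], method: Correlation = PearsonCorrelation): Matrix =
    method.computeCorrelationMatrix(X)
}
~~~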




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-11 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14842782
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala 
---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.RDD
+
+/**
+ * New correlation algorithms should implement this trait
+ */
+trait Correlation {
+
+  /**
+   * Compute correlation for two datasets.
+   */
+  def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double
+
+  /**
+   * Compute the correlation matrix S, for the input matrix, where S(i, j) 
is the correlation
+   * between column i and j.
+   */
+  def computeCorrelationMatrix(X: RDD[Vector]): Matrix
+
+  /**
+   * Combine the two input RDD[Double]s into an RDD[Vector] and compute 
the correlation using the
+   * correlation implementation for RDD[Vector]
+   */
+  def computeCorrelationWithMatrixImpl(x: RDD[Double], y: RDD[Double]): 
Double = {
+val mat: RDD[Vector] = x.zip(y).mapPartitions({ iter =>
+  iter.map {case(xi, yi) => new DenseVector(Array(xi, yi))}
+}, preservesPartitioning = true)
+computeCorrelationMatrix(mat)(0, 1)
+  }
+
+}
+
+/**
+ * Delegates computation to the specific correlation object based on the 
input method name
+ *
+ * Currently supported correlations: pearson, spearman.
+ * After new correlation algorithms are added, please update the 
documentation here and in
+ * Statistics.scala for the correlation APIs.
+ *
+ * Cases are ignored when doing method matching. We also allow initials, 
e.g. "P" for "pearson", as
+ * long as initials are unique in the supported set of correlation 
algorithms. In addition, a
--- End diff --

R only supports 3 correlations and they happen not to have name collisions, which is fine because there was no plan to support more correlations in R on a single machine. But if a popular correlation called "kuma" comes along, the only ways R can handle it are renaming it to something like "zuma" or explaining to users why "ku" is required for "kuma" but "k" suffices for "kendall". That was a bad decision by R's developers.

The implementation in this PR suggests that developers can add their own implementations, but the name resolution isn't really expandable. Besides Kendall's tau, we might add approximate algorithms in the future, which could have names like "approx_spearman" and "approx_pearson". Then we face collisions on the first character, or non-unique substring matches.

Also, we want to keep the behavior consistent across MLlib; fuzzy matching adds a burden to future development. The best solution to me is exact string matching: simple implementation and low maintenance cost.




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-11 Thread dorx
Github user dorx commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14836922
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala 
---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.RDD
+
+/**
+ * New correlation algorithms should implement this trait
+ */
+trait Correlation {
+
+  /**
+   * Compute correlation for two datasets.
+   */
+  def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double
+
+  /**
+   * Compute the correlation matrix S, for the input matrix, where S(i, j) 
is the correlation
+   * between column i and j.
+   */
+  def computeCorrelationMatrix(X: RDD[Vector]): Matrix
+
+  /**
+   * Combine the two input RDD[Double]s into an RDD[Vector] and compute 
the correlation using the
+   * correlation implementation for RDD[Vector]
+   */
+  def computeCorrelationWithMatrixImpl(x: RDD[Double], y: RDD[Double]): 
Double = {
--- End diff --

I made it its own method on purpose to force future developers to think about whether this is a good option or whether custom code for vectors can be more performant.




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-11 Thread dorx
Github user dorx commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14836715
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala 
---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.RDD
+
+/**
+ * New correlation algorithms should implement this trait
+ */
+trait Correlation {
+
+  /**
+   * Compute correlation for two datasets.
+   */
+  def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double
+
+  /**
+   * Compute the correlation matrix S, for the input matrix, where S(i, j) 
is the correlation
+   * between column i and j.
+   */
+  def computeCorrelationMatrix(X: RDD[Vector]): Matrix
+
+  /**
+   * Combine the two input RDD[Double]s into an RDD[Vector] and compute 
the correlation using the
+   * correlation implementation for RDD[Vector]
+   */
+  def computeCorrelationWithMatrixImpl(x: RDD[Double], y: RDD[Double]): 
Double = {
+val mat: RDD[Vector] = x.zip(y).mapPartitions({ iter =>
+  iter.map {case(xi, yi) => new DenseVector(Array(xi, yi))}
+}, preservesPartitioning = true)
+computeCorrelationMatrix(mat)(0, 1)
+  }
+
+}
+
+/**
+ * Delegates computation to the specific correlation object based on the 
input method name
+ *
+ * Currently supported correlations: pearson, spearman.
+ * After new correlation algorithms are added, please update the 
documentation here and in
+ * Statistics.scala for the correlation APIs.
+ *
+ * Cases are ignored when doing method matching. We also allow initials, 
e.g. "P" for "pearson", as
+ * long as initials are unique in the supported set of correlation 
algorithms. In addition, a
+ * supported method name has to be a substring of the input method name 
for it to be matched (e.g.
+ * "spearmansrho" will be matched to "spearman")
+ *
+ * Maintains the default correlation type, pearson
+ */
+object Correlations {
--- End diff --

The reason I named it Correlations isn't to copy Vectors but to signify the 
fact that this is more of a delegator object that maintains all supported 
correlations.




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-11 Thread dorx
Github user dorx commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14836617
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala 
---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.RDD
+
+/**
+ * New correlation algorithms should implement this trait
+ */
+trait Correlation {
+
+  /**
+   * Compute correlation for two datasets.
+   */
+  def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double
+
+  /**
+   * Compute the correlation matrix S, for the input matrix, where S(i, j) 
is the correlation
+   * between column i and j.
+   */
+  def computeCorrelationMatrix(X: RDD[Vector]): Matrix
+
+  /**
+   * Combine the two input RDD[Double]s into an RDD[Vector] and compute 
the correlation using the
+   * correlation implementation for RDD[Vector]
+   */
+  def computeCorrelationWithMatrixImpl(x: RDD[Double], y: RDD[Double]): 
Double = {
+val mat: RDD[Vector] = x.zip(y).mapPartitions({ iter =>
+  iter.map {case(xi, yi) => new DenseVector(Array(xi, yi))}
+}, preservesPartitioning = true)
+computeCorrelationMatrix(mat)(0, 1)
+  }
+
+}
+
+/**
+ * Delegates computation to the specific correlation object based on the 
input method name
+ *
+ * Currently supported correlations: pearson, spearman.
+ * After new correlation algorithms are added, please update the 
documentation here and in
+ * Statistics.scala for the correlation APIs.
+ *
+ * Cases are ignored when doing method matching. We also allow initials, 
e.g. "P" for "pearson", as
+ * long as initials are unique in the supported set of correlation 
algorithms. In addition, a
--- End diff --

So the fact that R has supported this feature for many years (and still 
hasn't deprecated it) is good precedent that, among the well-known 
correlations, there aren't really any name collisions. Since we require that 
only well-known and widely adopted algorithms be added to MLlib, the 
likelihood of a collision is even smaller. (If someone is adding something in 
their own version of Spark, presumably they have already looked closely at 
the docs and been warned.)
Because we decided to go with strings for method specification instead of 
enums, for cross-language uniformity, the method is only checked (and 
possibly invalidated) at run time. We therefore do want to provide some fault 
tolerance and allow abbreviated method names. To address a related comment, 
"spearman" and "spearmans" are both really common ways of referring to 
Spearman's correlation (imagine the heartbreak of having your program fail 
because of an extra "s" after hours of data manipulation and computation!)
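
For concreteness, a rough sketch of the lenient matching being described, 
assuming the supported set is just pearson and spearman (the helper name and 
structure are illustrative, not the PR's exact code):

```scala
object MethodMatchSketch {
  private val supported = Seq("pearson", "spearman")

  // Case-insensitive matching that accepts initials ("P" -> "pearson") and
  // longer variants ("spearmansrho" -> "spearman"); it errors out when the
  // input is unrecognized or matches more than one supported method.
  def matchMethod(input: String): String = {
    val normalized = input.toLowerCase
    supported.filter(m => normalized.contains(m) || m.startsWith(normalized)) match {
      case Seq(only) => only
      case Seq() =>
        throw new IllegalArgumentException(s"Unrecognized method name: $input")
      case many =>
        throw new IllegalArgumentException(s"Ambiguous method name $input: matches $many")
    }
  }
}
```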




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-10 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14808860
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala 
---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.RDD
+
+/**
+ * New correlation algorithms should implement this trait
+ */
+trait Correlation {
+
+  /**
+   * Compute correlation for two datasets.
+   */
+  def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double
+
+  /**
+   * Compute the correlation matrix S, for the input matrix, where S(i, j) 
is the correlation
+   * between column i and j.
+   */
+  def computeCorrelationMatrix(X: RDD[Vector]): Matrix
+
+  /**
+   * Combine the two input RDD[Double]s into an RDD[Vector] and compute 
the correlation using the
+   * correlation implementation for RDD[Vector]
+   */
+  def computeCorrelationWithMatrixImpl(x: RDD[Double], y: RDD[Double]): 
Double = {
+val mat: RDD[Vector] = x.zip(y).mapPartitions({ iter =>
+  iter.map {case(xi, yi) => new DenseVector(Array(xi, yi))}
+}, preservesPartitioning = true)
+computeCorrelationMatrix(mat)(0, 1)
+  }
+
+}
+
+/**
+ * Delegates computation to the specific correlation object based on the 
input method name
+ *
+ * Currently supported correlations: pearson, spearman.
+ * After new correlation algorithms are added, please update the 
documentation here and in
+ * Statistics.scala for the correlation APIs.
+ *
+ * Cases are ignored when doing method matching. We also allow initials, 
e.g. "P" for "pearson", as
+ * long as initials are unique in the supported set of correlation 
algorithms. In addition, a
+ * supported method name has to be a substring of the input method name 
for it to be matched (e.g.
+ * "spearmansrho" will be matched to "spearman")
--- End diff --

This feature is not necessary.




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-10 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14808804
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala 
---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.RDD
+
+/**
+ * New correlation algorithms should implement this trait
+ */
+trait Correlation {
+
+  /**
+   * Compute correlation for two datasets.
+   */
+  def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double
+
+  /**
+   * Compute the correlation matrix S, for the input matrix, where S(i, j) 
is the correlation
+   * between column i and j.
+   */
+  def computeCorrelationMatrix(X: RDD[Vector]): Matrix
+
+  /**
+   * Combine the two input RDD[Double]s into an RDD[Vector] and compute 
the correlation using the
+   * correlation implementation for RDD[Vector]
+   */
+  def computeCorrelationWithMatrixImpl(x: RDD[Double], y: RDD[Double]): 
Double = {
+val mat: RDD[Vector] = x.zip(y).mapPartitions({ iter =>
+  iter.map {case(xi, yi) => new DenseVector(Array(xi, yi))}
+}, preservesPartitioning = true)
+computeCorrelationMatrix(mat)(0, 1)
+  }
+
+}
+
+/**
+ * Delegates computation to the specific correlation object based on the 
input method name
+ *
+ * Currently supported correlations: pearson, spearman.
+ * After new correlation algorithms are added, please update the 
documentation here and in
+ * Statistics.scala for the correlation APIs.
+ *
+ * Cases are ignored when doing method matching. We also allow initials, 
e.g. "P" for "pearson", as
+ * long as initials are unique in the supported set of correlation 
algorithms. In addition, a
--- End diff --

This feature actually makes it error-prone to add new correlations. For 
example, suppose I use `corr(x, y, "p")` in my code, and in v1.2 someone adds 
another correlation whose name starts with "p"; that breaks existing code. I 
see that R implements this feature, but R simply limits the methods to 
pearson, spearman, and kendall. If they ever add a new correlation with a 
prefix collision, they will face the same problem.
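
To spell out the failure mode with a hypothetical future method name: if v1.2 
added, say, "pointbiserial", then `corr(x, y, "p")` written against v1.1 
would suddenly become ambiguous and fail at run time. A sketch of the 
stricter exact-match alternative:

```scala
object ExactMatchSketch {
  // Only an exact (case-insensitive) method name is accepted, so adding a
  // new method can never change the meaning of existing calls.
  def resolve(input: String, supported: Set[String]): String = {
    val normalized = input.toLowerCase
    if (supported.contains(normalized)) normalized
    else throw new IllegalArgumentException(
      s"Unrecognized method: $input. Supported: ${supported.mkString(", ")}")
  }
}
```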




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-10 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14808714
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala 
---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.RDD
+
+/**
+ * New correlation algorithms should implement this trait
+ */
+trait Correlation {
+
+  /**
+   * Compute correlation for two datasets.
+   */
+  def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double
+
+  /**
+   * Compute the correlation matrix S, for the input matrix, where S(i, j) 
is the correlation
+   * between column i and j.
+   */
+  def computeCorrelationMatrix(X: RDD[Vector]): Matrix
+
+  /**
+   * Combine the two input RDD[Double]s into an RDD[Vector] and compute 
the correlation using the
+   * correlation implementation for RDD[Vector]
+   */
+  def computeCorrelationWithMatrixImpl(x: RDD[Double], y: RDD[Double]): 
Double = {
+val mat: RDD[Vector] = x.zip(y).mapPartitions({ iter =>
+  iter.map {case(xi, yi) => new DenseVector(Array(xi, yi))}
+}, preservesPartitioning = true)
+computeCorrelationMatrix(mat)(0, 1)
+  }
+
+}
+
+/**
+ * Delegates computation to the specific correlation object based on the 
input method name
+ *
+ * Currently supported correlations: pearson, spearman.
+ * After new correlation algorithms are added, please update the 
documentation here and in
+ * Statistics.scala for the correlation APIs.
+ *
+ * Cases are ignored when doing method matching. We also allow initials, 
e.g. "P" for "pearson", as
+ * long as initials are unique in the supported set of correlation 
algorithms. In addition, a
+ * supported method name has to be a substring of the input method name 
for it to be matched (e.g.
+ * "spearmansrho" will be matched to "spearman")
+ *
+ * Maintains the default correlation type, pearson
+ */
+object Correlations {
--- End diff --

Change to `Correlation` and mark it `private[stat]`. We used `Vectors` 
because `scala.Vector` is imported by default.
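
That is, roughly:

```scala
// Suggested shape: the delegator shares the trait's name (legal in Scala,
// like a companion object) and is package-private so it never leaks into
// the public API. Body shown only as a placeholder.
private[stat] object Correlation {
  // name -> implementation dispatch, plus the default method, as before
}
```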




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-10 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14808705
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala 
---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.RDD
+
+/**
+ * New correlation algorithms should implement this trait
+ */
+trait Correlation {
+
+  /**
+   * Compute correlation for two datasets.
+   */
+  def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double
+
+  /**
+   * Compute the correlation matrix S, for the input matrix, where S(i, j) 
is the correlation
+   * between column i and j.
+   */
+  def computeCorrelationMatrix(X: RDD[Vector]): Matrix
+
+  /**
+   * Combine the two input RDD[Double]s into an RDD[Vector] and compute 
the correlation using the
+   * correlation implementation for RDD[Vector]
+   */
+  def computeCorrelationWithMatrixImpl(x: RDD[Double], y: RDD[Double]): 
Double = {
+val mat: RDD[Vector] = x.zip(y).mapPartitions({ iter =>
--- End diff --

`({` -> ` {`




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-10 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14808703
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala 
---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.RDD
+
+/**
+ * New correlation algorithms should implement this trait
+ */
+trait Correlation {
+
+  /**
+   * Compute correlation for two datasets.
+   */
+  def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double
+
+  /**
+   * Compute the correlation matrix S, for the input matrix, where S(i, j) 
is the correlation
+   * between column i and j.
+   */
+  def computeCorrelationMatrix(X: RDD[Vector]): Matrix
+
+  /**
+   * Combine the two input RDD[Double]s into an RDD[Vector] and compute 
the correlation using the
+   * correlation implementation for RDD[Vector]
+   */
+  def computeCorrelationWithMatrixImpl(x: RDD[Double], y: RDD[Double]): 
Double = {
--- End diff --

We could make this the default implementation of `computeCorrelation(x, y)`.
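
i.e., something along these lines (a sketch of the suggestion only; imports 
shown for completeness):

```scala
import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
import org.apache.spark.rdd.RDD

// computeCorrelation gets a default body that zips the two RDDs into an
// RDD[Vector] and delegates to the matrix implementation, so concrete
// algorithms only need to provide computeCorrelationMatrix (and may
// override the pairwise case when a cheaper path exists).
trait Correlation {
  def computeCorrelationMatrix(X: RDD[Vector]): Matrix

  def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double = {
    val mat: RDD[Vector] = x.zip(y).mapPartitions(
      iter => iter.map { case (xi, yi) => new DenseVector(Array(xi, yi)) },
      preservesPartitioning = true)
    computeCorrelationMatrix(mat)(0, 1)
  }
}
```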




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-10 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14808707
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala 
---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.RDD
+
+/**
+ * New correlation algorithms should implement this trait
+ */
+trait Correlation {
+
+  /**
+   * Compute correlation for two datasets.
+   */
+  def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double
+
+  /**
+   * Compute the correlation matrix S, for the input matrix, where S(i, j) 
is the correlation
+   * between column i and j.
+   */
+  def computeCorrelationMatrix(X: RDD[Vector]): Matrix
+
+  /**
+   * Combine the two input RDD[Double]s into an RDD[Vector] and compute 
the correlation using the
+   * correlation implementation for RDD[Vector]
+   */
+  def computeCorrelationWithMatrixImpl(x: RDD[Double], y: RDD[Double]): 
Double = {
+val mat: RDD[Vector] = x.zip(y).mapPartitions({ iter =>
+  iter.map {case(xi, yi) => new DenseVector(Array(xi, yi))}
--- End diff --

spaces around `case` and add one space before `}`
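
i.e. the line becomes `iter.map { case (xi, yi) => new DenseVector(Array(xi, yi)) }`.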




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-10 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14808688
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala 
---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.RDD
+
+/**
+ * New correlation algorithms should implement this trait
+ */
+trait Correlation {
--- End diff --

`private[stat]`?




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-10 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14808671
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala 
---
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat
+
+import org.apache.spark.mllib.linalg.{Matrix, Vector}
+import org.apache.spark.mllib.stat.correlation.Correlations
+import org.apache.spark.rdd.RDD
+
+object Statistics {
--- End diff --

Need doc for public classes. Please mark it as @Experimental for the 
initial version.
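
For example (a sketch; the `:: Experimental ::` doc marker follows Spark's 
convention for experimental APIs, and the exact wording is the author's call):

```scala
import org.apache.spark.annotation.Experimental
import org.apache.spark.mllib.linalg.{Matrix, Vector}
import org.apache.spark.mllib.stat.correlation.Correlations
import org.apache.spark.rdd.RDD

/**
 * :: Experimental ::
 * API for statistical functions in MLlib.
 */
@Experimental
object Statistics {

  /**
   * Compute the Pearson correlation matrix for the input RDD of Vectors.
   */
  def corr(X: RDD[Vector]): Matrix = Correlations.corrMatrix(X)
}
```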




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-10 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14808679
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala 
---
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat
+
+import org.apache.spark.mllib.linalg.{Matrix, Vector}
+import org.apache.spark.mllib.stat.correlation.Correlations
+import org.apache.spark.rdd.RDD
+
+object Statistics {
+
+  /**
+   * Compute the Pearson correlation matrix for the input RDD of Vectors.
+   */
+  def corr(X: RDD[Vector]): Matrix = Correlations.corrMatrix(X)
+
+  /**
+   * Compute the correlation matrix for the input RDD of Vectors using the 
specified method.
+   *
+   * Methods currently supported: pearson (default), spearman
+   */
+  def corr(X: RDD[Vector], method: String): Matrix = 
Correlations.corrMatrix(X, method)
+
+  /**
+   * Compute the Pearson correlation for the input RDDs.
+   */
+  def corr(x: RDD[Double], y: RDD[Double]): Double = Correlations.corr(x, 
y)
+
+  /**
+   * Compute the correlation for the input RDDs using the specified method.
+   *
+   * Methods currently supported: pearson (default), spearman
+   */
+  def corr(x: RDD[Double], y: RDD[Double], method: String): Double = 
Correlations.corr(x, y, method)
+
--- End diff --

remove empty lines




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-10 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14808684
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala 
---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.RDD
+
+/**
+ * New correlation algorithms should implement this trait
--- End diff --

Trait for correlation algorithms.




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-10 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14808675
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala 
---
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat
+
+import org.apache.spark.mllib.linalg.{Matrix, Vector}
+import org.apache.spark.mllib.stat.correlation.Correlations
+import org.apache.spark.rdd.RDD
+
+object Statistics {
+
+  /**
+   * Compute the Pearson correlation matrix for the input RDD of Vectors.
+   */
+  def corr(X: RDD[Vector]): Matrix = Correlations.corrMatrix(X)
+
+  /**
+   * Compute the correlation matrix for the input RDD of Vectors using the 
specified method.
+   *
+   * Methods currently supported: pearson (default), spearman
--- End diff --

Follow ScalaDoc conventions for methods. It is worth noting the cost of 
spearman.
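
For instance, the matrix variant's doc might look roughly like this (wording 
illustrative; the Spearman cost note reflects that ranking each column 
requires a sort):

```scala
import org.apache.spark.mllib.linalg.{Matrix, Vector}
import org.apache.spark.rdd.RDD

object StatisticsDocSketch {
  /**
   * Compute the correlation matrix for the input RDD of Vectors using the
   * specified method.
   *
   * Note: Spearman is substantially more expensive than Pearson, since it
   * must rank each column first and ranking requires sorting the data.
   *
   * @param X RDD of Vectors whose column correlations are computed.
   * @param method String specifying the method. Supported: "pearson"
   *               (default), "spearman".
   * @return Correlation matrix comparing the columns of X.
   */
  def corr(X: RDD[Vector], method: String): Matrix =
    throw new UnsupportedOperationException("doc sketch only")
}
```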




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-10 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14808250
  
--- Diff: python/pyspark/join.py ---
@@ -1,35 +1,19 @@
-"""
-Copyright (c) 2011, Douban Inc. 
-All rights reserved.
-
-Redistribution and use in source and binary forms, with or without
-modification, are permitted provided that the following conditions are
-met:
-
-* Redistributions of source code must retain the above copyright
-notice, this list of conditions and the following disclaimer.
-
-* Redistributions in binary form must reproduce the above
-copyright notice, this list of conditions and the following disclaimer
-in the documentation and/or other materials provided with the
-distribution.
-
-* Neither the name of the Douban Inc. nor the names of its
-contributors may be used to endorse or promote products derived from
-this software without specific prior written permission.
-
-THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-"""
--- End diff --

This was discussed in https://github.com/apache/spark/pull/592 , and the 
conclusion was to keep the Douban BSD license. Also, this is irrelevant to this 
PR. So please remove this change.



