Github user rezazadeh commented on a diff in the pull request:
https://github.com/apache/incubator-spark/pull/564#discussion_r9974545
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/linalg/PCA.scala ---
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+import org.apache.spark.mllib.util._
+
+
+/**
+ * Class used to obtain principal components
+ */
+class PCA {
+ private var k: Int = 1
+
+ /**
+ * Set the number of top-k principal components to return
+ */
+ def setK(k: Int): PCA = {
+ this.k = k
+ this
+ }
+
+ /**
+ * Compute PCA using the currently set parameters
+ */
+ def compute(matrix: DenseMatrix): DenseMatrix = {
+ computePCA(matrix, k)
+ }
+
+ /**
+ * Principal Component Analysis.
+ * Computes the top k principal component coefficients for the m-by-n
+ * data matrix X. Rows of X correspond to observations and columns
+ * correspond to variables. The coefficient matrix is n-by-k. Each column
+ * of coeff contains the coefficients for one principal component, and
+ * the columns are in descending order of component variance.
+ * This function centers the data and uses the
+ * singular value decomposition (SVD) algorithm.
+ *
+ * All input and output are expected in DenseMatrix format.
+ *
+ * @param matrix dense matrix to perform PCA on
+ * @param k Recover k principal components
+ * @return An n-by-k matrix of principal components
+ */
+ def computePCA(matrix: DenseMatrix, k: Int): DenseMatrix = {
+ val m = matrix.m
+ val n = matrix.n
+
+ if (m <= 0 || n <= 0) {
+ throw new IllegalArgumentException("Expecting a well-formed matrix")
+ }
+
+ // compute column sums and normalize matrix
+ val rawData = matrix.rows.flatMap { x =>
+ Array.tabulate(x.data.size)(idx => MatrixEntry(x.i, idx, x.data(idx)))
+ }
+ val colSums = rawData.map(entry => (entry.j, entry.mval)).reduceByKey(_ + _)
+ val data = rawData.map(entry => (entry.j, (entry.i, entry.mval))).join(colSums).map {
+ case (col, ((row, mval), colsum)) =>
+ MatrixEntry(row, col, (mval - colsum / m.toDouble) / Math.sqrt(n - 1))
+ }
--- End diff --
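For readers following the diff: the centering-and-scaling step maps each entry x_ij to (x_ij - colSum_j / m) / sqrt(n - 1). A minimal local sketch of that same transformation, using plain Scala arrays instead of the distributed MatrixEntry representation (the helper name is illustrative, not MLlib API):

```scala
// Local sketch of the centering step from the diff above.
// `centerAndScale` is an illustrative name, not part of MLlib.
def centerAndScale(matrix: Array[Array[Double]]): Array[Array[Double]] = {
  val m = matrix.length       // rows = observations
  val n = matrix(0).length    // columns = variables
  // column means correspond to colSums(j) / m in the diff
  val colMeans = Array.tabulate(n)(j => matrix.map(_(j)).sum / m)
  matrix.map { row =>
    Array.tabulate(n)(j => (row(j) - colMeans(j)) / math.sqrt(n - 1))
  }
}
```

Callers would then obtain the top components through the builder methods in the diff, e.g. `new PCA().setK(2).compute(matrix)`.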
@mengxr Note that the two maps done on lines 73 and 76 won't cause any
shuffling of the data. The reduce on line 76 will create a shuffle, but it
is map-side combined, so each node outputs nothing more than one small
vector of column -> sum pairs; only those few vectors get shuffled.
Building a custom combiner for Array[Double] and summing those isn't going
to buy us anything.
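A standalone sketch of that column-sum step, just to make the combiner behavior concrete (local-mode Spark; the object name and data values are made up for illustration):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._   // brings reduceByKey onto pair RDDs

object ColSumSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "ColSumSketch")
    // (column, value) pairs, standing in for the (entry.j, entry.mval) map
    val entries = sc.parallelize(Seq((0, 1.0), (1, 2.0), (0, 3.0), (1, 4.0)))
    // reduceByKey pre-aggregates on each partition (map-side combine),
    // so only one partial sum per column leaves each node in the shuffle
    val colSums = entries.reduceByKey(_ + _)
    colSums.collect().foreach(println)   // (0,4.0) and (1,6.0)
    sc.stop()
  }
}
```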
Building a dense SVD warrants a separate PR, since it will need a new
implementation of SVD, new tests, documentation, and examples. I will do
that in another PR, maybe at the same time as dense square SVD, depending
on which algorithm we decide on.