Repository: spark Updated Branches: refs/heads/master fa0524fd0 -> 9689b663a
http://git-wip-us.apache.org/repos/asf/spark/blob/9689b663/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala deleted file mode 100644 index 20e2b0f..0000000 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala +++ /dev/null @@ -1,194 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.mllib.linalg - -import scala.util.Random - -import org.scalatest.BeforeAndAfterAll -import org.scalatest.FunSuite - -import org.jblas.{DoubleMatrix, Singular, MatrixFunctions} - -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ -import org.apache.spark.rdd.RDD - -import org.apache.spark.mllib.util._ - -import org.jblas._ - -class SVDSuite extends FunSuite with BeforeAndAfterAll { - @transient private var sc: SparkContext = _ - - override def beforeAll() { - sc = new SparkContext("local", "test") - } - - override def afterAll() { - sc.stop() - System.clearProperty("spark.driver.port") - } - - val EPSILON = 1e-4 - - // Return jblas matrix from sparse matrix RDD - def getDenseMatrix(matrix: SparseMatrix) : DoubleMatrix = { - val data = matrix.data - val m = matrix.m - val n = matrix.n - val ret = DoubleMatrix.zeros(m, n) - matrix.data.collect().map(x => ret.put(x.i, x.j, x.mval)) - ret - } - - def assertMatrixApproximatelyEquals(a: DoubleMatrix, b: DoubleMatrix) { - assert(a.rows == b.rows && a.columns == b.columns, - "dimension mismatch: $a.rows vs $b.rows and $a.columns vs $b.columns") - for (i <- 0 until a.columns) { - val aCol = a.getColumn(i) - val bCol = b.getColumn(i) - val diff = Math.min(aCol.sub(bCol).norm1, aCol.add(bCol).norm1) - assert(diff < EPSILON, "matrix mismatch: " + diff) - } - } - - test("full rank matrix svd") { - val m = 10 - val n = 3 - val datarr = Array.tabulate(m,n){ (a, b) => - MatrixEntry(a, b, (a + 2).toDouble * (b + 1) / (1 + a + b)) }.flatten - val data = sc.makeRDD(datarr, 3) - - val a = SparseMatrix(data, m, n) - - val decomposed = new SVD().setK(n).compute(a) - val u = decomposed.U - val v = decomposed.V - val s = decomposed.S - - val denseA = getDenseMatrix(a) - val svd = Singular.sparseSVD(denseA) - - val retu = getDenseMatrix(u) - val rets = getDenseMatrix(s) - val retv = getDenseMatrix(v) - - - // check individual decomposition - assertMatrixApproximatelyEquals(retu, svd(0)) - assertMatrixApproximatelyEquals(rets, DoubleMatrix.diag(svd(1))) - assertMatrixApproximatelyEquals(retv, svd(2)) - - // check multiplication guarantee - assertMatrixApproximatelyEquals(retu.mmul(rets).mmul(retv.transpose), denseA) - } - - test("dense full rank matrix svd") { - val m = 10 - val n = 3 - val datarr = Array.tabulate(m,n){ (a, b) => - MatrixEntry(a, b, (a + 2).toDouble * (b + 1) / (1 + a + b)) }.flatten - val data = sc.makeRDD(datarr, 3) - - val a = LAUtils.sparseToTallSkinnyDense(SparseMatrix(data, m, n)) - - val decomposed = new SVD().setK(n).setComputeU(true).compute(a) - val u = LAUtils.denseToSparse(decomposed.U) - val v = decomposed.V - val s = decomposed.S - - val denseA = getDenseMatrix(LAUtils.denseToSparse(a)) - val svd = Singular.sparseSVD(denseA) - - val retu = getDenseMatrix(u) - val rets = DoubleMatrix.diag(new DoubleMatrix(s)) - val retv = new DoubleMatrix(v) - - - // check individual decomposition - assertMatrixApproximatelyEquals(retu, svd(0)) - assertMatrixApproximatelyEquals(rets, DoubleMatrix.diag(svd(1))) - assertMatrixApproximatelyEquals(retv, svd(2)) - - // check multiplication guarantee - assertMatrixApproximatelyEquals(retu.mmul(rets).mmul(retv.transpose), denseA) - } - - test("rank one matrix svd") { - val m = 10 - val n = 3 - val data = sc.makeRDD(Array.tabulate(m, n){ (a,b) => - MatrixEntry(a, b, 1.0) }.flatten ) - val k = 1 - - val a = SparseMatrix(data, m, n) - - val decomposed = new SVD().setK(k).compute(a) - val u = decomposed.U - val s = decomposed.S - val v = decomposed.V - val retrank = s.data.collect().length - - assert(retrank == 1, "rank returned not one") - - val denseA = getDenseMatrix(a) - val svd = Singular.sparseSVD(denseA) - - val retu = getDenseMatrix(u) - val rets = getDenseMatrix(s) - val retv = getDenseMatrix(v) - - // check individual decomposition - assertMatrixApproximatelyEquals(retu, svd(0).getColumn(0)) - assertMatrixApproximatelyEquals(rets, DoubleMatrix.diag(svd(1).getRow(0))) - assertMatrixApproximatelyEquals(retv, svd(2).getColumn(0)) - - // check multiplication guarantee - assertMatrixApproximatelyEquals(retu.mmul(rets).mmul(retv.transpose), denseA) - } - - test("truncated with k") { - val m = 10 - val n = 3 - val data = sc.makeRDD(Array.tabulate(m,n){ (a, b) => - MatrixEntry(a, b, (a + 2).toDouble * (b + 1)/(1 + a + b)) }.flatten ) - val a = SparseMatrix(data, m, n) - - val k = 1 // only one svalue above this - - val decomposed = new SVD().setK(k).compute(a) - val u = decomposed.U - val s = decomposed.S - val v = decomposed.V - val retrank = s.data.collect().length - - val denseA = getDenseMatrix(a) - val svd = Singular.sparseSVD(denseA) - - val retu = getDenseMatrix(u) - val rets = getDenseMatrix(s) - val retv = getDenseMatrix(v) - - assert(retrank == 1, "rank returned not one") - - // check individual decomposition - assertMatrixApproximatelyEquals(retu, svd(0).getColumn(0)) - assertMatrixApproximatelyEquals(rets, DoubleMatrix.diag(svd(1).getRow(0))) - assertMatrixApproximatelyEquals(retv, svd(2).getColumn(0)) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/9689b663/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrixSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrixSuite.scala new file mode 100644 index 0000000..cd45438 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrixSuite.scala @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.linalg.distributed + +import org.scalatest.FunSuite + +import breeze.linalg.{DenseMatrix => BDM} + +import org.apache.spark.mllib.util.LocalSparkContext +import org.apache.spark.mllib.linalg.Vectors + +class CoordinateMatrixSuite extends FunSuite with LocalSparkContext { + + val m = 5 + val n = 4 + var mat: CoordinateMatrix = _ + + override def beforeAll() { + super.beforeAll() + val entries = sc.parallelize(Seq( + (0, 0, 1.0), + (0, 1, 2.0), + (1, 1, 3.0), + (1, 2, 4.0), + (2, 2, 5.0), + (2, 3, 6.0), + (3, 0, 7.0), + (3, 3, 8.0), + (4, 1, 9.0)), 3).map { case (i, j, value) => + MatrixEntry(i, j, value) + } + mat = new CoordinateMatrix(entries) + } + + test("size") { + assert(mat.numRows() === m) + assert(mat.numCols() === n) + } + + test("empty entries") { + val entries = sc.parallelize(Seq[MatrixEntry](), 1) + val emptyMat = new CoordinateMatrix(entries) + intercept[RuntimeException] { + emptyMat.numCols() + } + intercept[RuntimeException] { + emptyMat.numRows() + } + } + + test("toBreeze") { + val expected = BDM( + (1.0, 2.0, 0.0, 0.0), + (0.0, 3.0, 4.0, 0.0), + (0.0, 0.0, 5.0, 6.0), + (7.0, 0.0, 0.0, 8.0), + (0.0, 9.0, 0.0, 0.0)) + assert(mat.toBreeze() === expected) + } + + test("toIndexedRowMatrix") { + val indexedRowMatrix = mat.toIndexedRowMatrix() + val expected = BDM( + (1.0, 2.0, 0.0, 0.0), + (0.0, 3.0, 4.0, 0.0), + (0.0, 0.0, 5.0, 6.0), + (7.0, 0.0, 0.0, 8.0), + (0.0, 9.0, 0.0, 0.0)) + assert(indexedRowMatrix.toBreeze() === expected) + } + + test("toRowMatrix") { + val rowMatrix = mat.toRowMatrix() + val rows = rowMatrix.rows.collect().toSet + val expected = Set( + Vectors.dense(1.0, 2.0, 0.0, 0.0), + Vectors.dense(0.0, 3.0, 4.0, 0.0), + Vectors.dense(0.0, 0.0, 5.0, 6.0), + Vectors.dense(7.0, 0.0, 0.0, 8.0), + Vectors.dense(0.0, 9.0, 0.0, 0.0)) + assert(rows === expected) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/9689b663/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrixSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrixSuite.scala new file mode 100644 index 0000000..f7c46f2 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrixSuite.scala @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.linalg.distributed + +import org.scalatest.FunSuite + +import breeze.linalg.{diag => brzDiag, DenseMatrix => BDM, DenseVector => BDV} + +import org.apache.spark.mllib.util.LocalSparkContext +import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.linalg.{Matrices, Vectors} + +class IndexedRowMatrixSuite extends FunSuite with LocalSparkContext { + + val m = 4 + val n = 3 + val data = Seq( + (0L, Vectors.dense(0.0, 1.0, 2.0)), + (1L, Vectors.dense(3.0, 4.0, 5.0)), + (3L, Vectors.dense(9.0, 0.0, 1.0)) + ).map(x => IndexedRow(x._1, x._2)) + var indexedRows: RDD[IndexedRow] = _ + + override def beforeAll() { + super.beforeAll() + indexedRows = sc.parallelize(data, 2) + } + + test("size") { + val mat1 = new IndexedRowMatrix(indexedRows) + assert(mat1.numRows() === m) + assert(mat1.numCols() === n) + + val mat2 = new IndexedRowMatrix(indexedRows, 5, 0) + assert(mat2.numRows() === 5) + assert(mat2.numCols() === n) + } + + test("empty rows") { + val rows = sc.parallelize(Seq[IndexedRow](), 1) + val mat = new IndexedRowMatrix(rows) + intercept[RuntimeException] { + mat.numRows() + } + intercept[RuntimeException] { + mat.numCols() + } + } + + test("toBreeze") { + val mat = new IndexedRowMatrix(indexedRows) + val expected = BDM( + (0.0, 1.0, 2.0), + (3.0, 4.0, 5.0), + (0.0, 0.0, 0.0), + (9.0, 0.0, 1.0)) + assert(mat.toBreeze() === expected) + } + + test("toRowMatrix") { + val idxRowMat = new IndexedRowMatrix(indexedRows) + val rowMat = idxRowMat.toRowMatrix() + assert(rowMat.numCols() === n) + assert(rowMat.numRows() === 3, "should drop empty rows") + assert(rowMat.rows.collect().toSeq === data.map(_.vector).toSeq) + } + + test("multiply a local matrix") { + val A = new IndexedRowMatrix(indexedRows) + val B = Matrices.dense(3, 2, Array(0.0, 1.0, 2.0, 3.0, 4.0, 5.0)) + val C = A.multiply(B) + val localA = A.toBreeze() + val localC = C.toBreeze() + val expected = localA * B.toBreeze.asInstanceOf[BDM[Double]] + assert(localC === expected) + } + + test("gram") { + val A = new IndexedRowMatrix(indexedRows) + val G = A.computeGramianMatrix() + val expected = BDM( + (90.0, 12.0, 24.0), + (12.0, 17.0, 22.0), + (24.0, 22.0, 30.0)) + assert(G.toBreeze === expected) + } + + test("svd") { + val A = new IndexedRowMatrix(indexedRows) + val svd = A.computeSVD(n, computeU = true) + assert(svd.U.isInstanceOf[IndexedRowMatrix]) + val localA = A.toBreeze() + val U = svd.U.toBreeze() + val s = svd.s.toBreeze.asInstanceOf[BDV[Double]] + val V = svd.V.toBreeze.asInstanceOf[BDM[Double]] + assert(closeToZero(U.t * U - BDM.eye[Double](n))) + assert(closeToZero(V.t * V - BDM.eye[Double](n))) + assert(closeToZero(U * brzDiag(s) * V.t - localA)) + } + + def closeToZero(G: BDM[Double]): Boolean = { + G.valuesIterator.map(math.abs).sum < 1e-6 + } +} + http://git-wip-us.apache.org/repos/asf/spark/blob/9689b663/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala new file mode 100644 index 0000000..71ee8e8 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.linalg.distributed + +import org.scalatest.FunSuite + +import breeze.linalg.{DenseVector => BDV, DenseMatrix => BDM, norm => brzNorm, svd => brzSvd} + +import org.apache.spark.mllib.util.LocalSparkContext +import org.apache.spark.mllib.linalg.{Matrices, Vectors, Vector} + +class RowMatrixSuite extends FunSuite with LocalSparkContext { + + val m = 4 + val n = 3 + val arr = Array(0.0, 3.0, 6.0, 9.0, 1.0, 4.0, 7.0, 0.0, 2.0, 5.0, 8.0, 1.0) + val denseData = Seq( + Vectors.dense(0.0, 1.0, 2.0), + Vectors.dense(3.0, 4.0, 5.0), + Vectors.dense(6.0, 7.0, 8.0), + Vectors.dense(9.0, 0.0, 1.0) + ) + val sparseData = Seq( + Vectors.sparse(3, Seq((1, 1.0), (2, 2.0))), + Vectors.sparse(3, Seq((0, 3.0), (1, 4.0), (2, 5.0))), + Vectors.sparse(3, Seq((0, 6.0), (1, 7.0), (2, 8.0))), + Vectors.sparse(3, Seq((0, 9.0), (2, 1.0))) + ) + + val principalComponents = BDM( + (0.0, 1.0, 0.0), + (math.sqrt(2.0) / 2.0, 0.0, math.sqrt(2.0) / 2.0), + (math.sqrt(2.0) / 2.0, 0.0, - math.sqrt(2.0) / 2.0)) + + var denseMat: RowMatrix = _ + var sparseMat: RowMatrix = _ + + override def beforeAll() { + super.beforeAll() + denseMat = new RowMatrix(sc.parallelize(denseData, 2)) + sparseMat = new RowMatrix(sc.parallelize(sparseData, 2)) + } + + test("size") { + assert(denseMat.numRows() === m) + assert(denseMat.numCols() === n) + assert(sparseMat.numRows() === m) + assert(sparseMat.numCols() === n) + } + + test("empty rows") { + val rows = sc.parallelize(Seq[Vector](), 1) + val emptyMat = new RowMatrix(rows) + intercept[RuntimeException] { + emptyMat.numCols() + } + intercept[RuntimeException] { + emptyMat.numRows() + } + } + + test("toBreeze") { + val expected = BDM( + (0.0, 1.0, 2.0), + (3.0, 4.0, 5.0), + (6.0, 7.0, 8.0), + (9.0, 0.0, 1.0)) + for (mat <- Seq(denseMat, sparseMat)) { + assert(mat.toBreeze() === expected) + } + } + + test("gram") { + val expected = + Matrices.dense(n, n, Array(126.0, 54.0, 72.0, 54.0, 66.0, 78.0, 72.0, 78.0, 94.0)) + for (mat <- Seq(denseMat, sparseMat)) { + val G = mat.computeGramianMatrix() + assert(G.toBreeze === expected.toBreeze) + } + } + + test("svd of a full-rank matrix") { + for (mat <- Seq(denseMat, sparseMat)) { + val localMat = mat.toBreeze() + val (localU, localSigma, localVt) = brzSvd(localMat) + val localV: BDM[Double] = localVt.t.toDenseMatrix + for (k <- 1 to n) { + val svd = mat.computeSVD(k, computeU = true) + val U = svd.U + val s = svd.s + val V = svd.V + assert(U.numRows() === m) + assert(U.numCols() === k) + assert(s.size === k) + assert(V.numRows === n) + assert(V.numCols === k) + assertColumnEqualUpToSign(U.toBreeze(), localU, k) + assertColumnEqualUpToSign(V.toBreeze.asInstanceOf[BDM[Double]], localV, k) + assert(closeToZero(s.toBreeze.asInstanceOf[BDV[Double]] - localSigma(0 until k))) + } + val svdWithoutU = mat.computeSVD(n) + assert(svdWithoutU.U === null) + } + } + + test("svd of a low-rank matrix") { + val rows = sc.parallelize(Array.fill(4)(Vectors.dense(1.0, 1.0)), 2) + val mat = new RowMatrix(rows, 4, 2) + val svd = mat.computeSVD(2, computeU = true) + assert(svd.s.size === 1, "should not return zero singular values") + assert(svd.U.numRows() === 4) + assert(svd.U.numCols() === 1) + assert(svd.V.numRows === 2) + assert(svd.V.numCols === 1) + } + + def closeToZero(G: BDM[Double]): Boolean = { + G.valuesIterator.map(math.abs).sum < 1e-6 + } + + def closeToZero(v: BDV[Double]): Boolean = { + brzNorm(v, 1.0) < 1e-6 + } + + def assertColumnEqualUpToSign(A: BDM[Double], B: BDM[Double], k: Int) { + assert(A.rows === B.rows) + for (j <- 0 until k) { + val aj = A(::, j) + val bj = B(::, j) + assert(closeToZero(aj - bj) || closeToZero(aj + bj), + s"The $j-th columns mismatch: $aj and $bj") + } + } + + test("pca") { + for (mat <- Seq(denseMat, sparseMat); k <- 1 to n) { + val pc = denseMat.computePrincipalComponents(k) + assert(pc.numRows === n) + assert(pc.numCols === k) + assertColumnEqualUpToSign(pc.toBreeze.asInstanceOf[BDM[Double]], principalComponents, k) + } + } + + test("multiply a local matrix") { + val B = Matrices.dense(n, 2, Array(0.0, 1.0, 2.0, 3.0, 4.0, 5.0)) + for (mat <- Seq(denseMat, sparseMat)) { + val AB = mat.multiply(B) + assert(AB.numRows() === m) + assert(AB.numCols() === 2) + assert(AB.rows.collect().toSeq === Seq( + Vectors.dense(5.0, 14.0), + Vectors.dense(14.0, 50.0), + Vectors.dense(23.0, 86.0), + Vectors.dense(2.0, 32.0) + )) + } + } +}