http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/main/scala/org/apache/mahout/math/scalabindings/VectorOps.scala
----------------------------------------------------------------------
diff --git a/samsara/src/main/scala/org/apache/mahout/math/scalabindings/VectorOps.scala b/samsara/src/main/scala/org/apache/mahout/math/scalabindings/VectorOps.scala
new file mode 100644
index 0000000..c20354d
--- /dev/null
+++ b/samsara/src/main/scala/org/apache/mahout/math/scalabindings/VectorOps.scala
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.scalabindings
+
+import org.apache.mahout.math._
+import scala.collection.JavaConversions._
+import org.apache.mahout.math.function.Functions
+
+/**
+ * Syntactic sugar for mahout vectors
+ * @param v Mahout vector
+ */
+class VectorOps(private[scalabindings] val v: Vector) {
+
+  import RLikeOps._
+
+  def apply(i: Int) = v.get(i)
+
+  def update(i: Int, that: Double) = v.setQuick(i, that)
+
+  /**
+   * View of this vector over the index range `r`, or the vector itself for the `::` (all) range.
+   *
+   * Warning: only consecutive views are supported. A range whose step is not 1 is rejected
+   * instead of silently yielding a view over the wrong elements.
+   */
+  def apply(r: Range) = if (r == ::) v else {
+    require(r.step == 1, "Only consecutive (step 1) ranges are supported for vector views")
+    v.viewPart(r.start, r.length)
+  }
+
+  def update(r: Range, that: Vector) = apply(r) := that
+
+  def sum = v.zSum()
+
+  /**
+   * In-place assignment of `that` into this vector.
+   *
+   * Mahout's own `assign` requires both vectors to have the same cardinality. Here that is
+   * relaxed: this vector must have _at least_ as large a cardinality as `that`. When `that`
+   * is shorter, this vector is zeroed first and then the non-zero elements of `that` are copied.
+   *
+   * @throws IllegalArgumentException if `that` has a larger cardinality than this vector
+   */
+  def :=(that: Vector): Vector = {
+    if (that.length == v.size())
+      v.assign(that)
+    else if (that.length < v.size) {
+      v.assign(0.0)
+      that.nonZeroes().foreach(t => v.setQuick(t.index, t.get))
+      v
+    } else throw new IllegalArgumentException("Assigner's cardinality greater than assignee's")
+  }
+
+  def :=(that: Double): Vector = v.assign(that)
+
+  /** Sets every element to `f(index, currentValue)`. */
+  def :=(f: (Int, Double) => Double): Vector = {
+    for (i <- 0 until length) v(i) = f(i, v(i))
+    v
+  }
+
+  def equiv(that: Vector) =
+    length == that.length &&
+      v.all.view.zip(that.all).forall(t => t._1.get == t._2.get)
+
+  def ===(that: Vector) = equiv(that)
+
+  def !==(that: Vector) = nequiv(that)
+
+  def nequiv(that: Vector) = !equiv(that)
+
+  def unary_- = cloned.assign(Functions.NEGATE)
+
+  def +=(that: Vector) = v.assign(that, Functions.PLUS)
+
+  def -=(that: Vector) = v.assign(that, Functions.MINUS)
+
+  def +=(that: Double) = v.assign(Functions.PLUS, that)
+
+  def -=(that: Double) = +=(-that)
+
+  /** Right-associative in-place `v := that - v`. */
+  def -=:(that: Vector) = v.assign(Functions.NEGATE).assign(that, Functions.PLUS)
+
+  /** Right-associative in-place `v := that - v`. */
+  def -=:(that: Double) = v.assign(Functions.NEGATE).assign(Functions.PLUS, that)
+
+  def +(that: Vector) = cloned += that
+
+  def -(that: Vector) = cloned -= that
+
+  def -:(that: Vector) = that.cloned -= v
+
+  def +(that: Double) = cloned += that
+
+  def +:(that: Double) = cloned += that
+
+  def -(that: Double) = cloned -= that
+
+  def -:(that: Double) = that -=: v.cloned
+
+  def length = v.size()
+
+  def cloned: Vector = v.like := v
+
+  def sqrt = v.cloned.assign(Functions.SQRT)
+
+  /** Convert to a single column matrix */
+  def toColMatrix: Matrix = {
+    import RLikeOps._
+    v match {
+
+      case vd: Vector if (vd.isDense) => dense(vd).t
+      case srsv: RandomAccessSparseVector => new SparseColumnMatrix(srsv.length, 1, Array(srsv))
+      case _ => sparse(v).t
+    }
+  }
+
+}
+
+class ElementOps(private[scalabindings] val el: Vector.Element) {
+
+  def apply = el.get()
+
+  def update(v: Double) = el.set(v)
+
+  def :=(v: Double) = el.set(v)
+
+  def +(that: Double) = el.get() + that
+
+  def -(that: Double) = el.get() - that
+
+  def :-(that: Double) = that - el.get()
+
+  def /(that: Double) = el.get() / that
+
+  def :/(that: Double) = that / el.get()
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/main/scala/org/apache/mahout/math/scalabindings/package.scala
----------------------------------------------------------------------
diff --git a/samsara/src/main/scala/org/apache/mahout/math/scalabindings/package.scala b/samsara/src/main/scala/org/apache/mahout/math/scalabindings/package.scala
new file mode 100644
index 0000000..36f5103
--- /dev/null
+++ b/samsara/src/main/scala/org/apache/mahout/math/scalabindings/package.scala
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math
+
+import org.apache.mahout.math.solver.EigenDecomposition
+
+/**
+ * Mahout matrices and vectors' scala syntactic sugar
+ */
+package object scalabindings {
+
+  // Reserved "ALL" range
+  final val `::`: Range = null
+
+  implicit def seq2Vector(s: TraversableOnce[AnyVal]) =
+    new DenseVector(s.map(_.asInstanceOf[Number].doubleValue()).toArray)
+
+  implicit def tuple2TravOnce2svec[V <: AnyVal](sdata: TraversableOnce[(Int, V)]) = svec(sdata)
+
+  implicit def t1vec(s: Tuple1[AnyVal]): Vector = prod2Vec(s)
+
+  implicit def t2vec(s: Tuple2[AnyVal, AnyVal]): Vector = prod2Vec(s)
+
+  implicit def t3vec(s: Tuple3[AnyVal, AnyVal, AnyVal]): Vector = prod2Vec(s)
+
+  implicit def t4vec(s: Tuple4[AnyVal, AnyVal, AnyVal, AnyVal]): Vector = prod2Vec(s)
+
+  implicit def t5vec(s: Tuple5[AnyVal, AnyVal, AnyVal, AnyVal, AnyVal]): Vector = prod2Vec(s)
+
+  implicit def t6vec(s: Tuple6[AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal]): Vector = prod2Vec(s)
+
+  implicit def t7vec(s: Tuple7[AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal]): Vector = prod2Vec(s)
+
+  implicit def t8vec(s: Tuple8[AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal]): Vector = prod2Vec(s)
+
+  implicit def t9vec(s: Tuple9[AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal]): Vector =
+    prod2Vec(s)
+
+  implicit def t10vec(s: Tuple10[AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal])
+  : Vector = prod2Vec(s)
+
+  implicit def t11vec(s: Tuple11[AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal
+    , AnyVal])
+  : Vector = prod2Vec(s)
+
+  implicit def t12vec(s: Tuple12[AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal
+    , AnyVal, AnyVal])
+  : Vector = prod2Vec(s)
+
+  implicit def t13vec(s: Tuple13[AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal
+    , AnyVal, AnyVal, AnyVal])
+  : Vector = prod2Vec(s)
+
+  implicit def t14vec(s: Tuple14[AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal
+    , AnyVal, AnyVal, AnyVal, AnyVal])
+  : Vector = prod2Vec(s)
+
+  implicit def t15vec(s: Tuple15[AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal
+    , AnyVal, AnyVal, AnyVal, AnyVal, AnyVal])
+  : Vector = prod2Vec(s)
+
+  implicit def t16vec(s: Tuple16[AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal
+    , AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal])
+  : Vector = prod2Vec(s)
+
+  implicit def t17vec(s: Tuple17[AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal
+    , AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal])
+  : Vector = prod2Vec(s)
+
+  implicit def t18vec(s: Tuple18[AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal
+    , AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal])
+  : Vector = prod2Vec(s)
+
+  implicit def t19vec(s: Tuple19[AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal
+    , AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal])
+  : Vector = prod2Vec(s)
+
+  implicit def t20vec(s: Tuple20[AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal
+    , AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal])
+  : Vector = prod2Vec(s)
+
+  implicit def t21vec(s: Tuple21[AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal
+    , AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal])
+  : Vector = prod2Vec(s)
+
+  implicit def t22vec(s: Tuple22[AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal
+    , AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal, AnyVal])
+  : Vector = prod2Vec(s)
+
+
+  def prod2Vec(s: Product) = new DenseVector(s.productIterator.
+    map(_.asInstanceOf[Number].doubleValue()).toArray)
+
+  def diagv(v: Vector): DiagonalMatrix = new DiagonalMatrix(v)
+
+  def diag(v: Double, size: Int): DiagonalMatrix =
+    new DiagonalMatrix(new DenseVector(Array.fill(size)(v)))
+
+  def eye(size: Int) = new DiagonalMatrix(1.0, size)
+
+  /**
+   * Create dense matrix out of inline arguments -- rows -- which can be tuples,
+   * iterables of Double, or just single Number (for columnar vectors).
+   *
+   * An `Iterable` or `Array` of Mahout vectors is taken as the whole matrix (one vector per
+   * row), and a `double[][]` array must be the only argument.
+   *
+   * Note: `Iterable` rows are matched before `Product` rows on purpose -- Scala lists are
+   * products too (a `::` cell's product elements are its head and tail), so matching `Product`
+   * first would misread a list row as a tuple and fail with a `ClassCastException`.
+   *
+   * @param rows row initializers
+   * @tparam R row initializer type
+   * @return the assembled dense matrix
+   * @throws IllegalArgumentException for unsupported row types
+   */
+  def dense[R](rows: R*): DenseMatrix = {
+    import RLikeOps._
+    val data = for (r <- rows) yield {
+      r match {
+        case n: Number => Array(n.doubleValue())
+        case t: Vector => Array.tabulate(t.length)(t(_))
+        case t: Array[Double] => t
+        case t: Iterable[_] if t.isEmpty => Array.empty[Double] // e.g. Nil: an empty row
+        case t: Iterable[_] =>
+          t.head match {
+            case ss: Double => t.asInstanceOf[Iterable[Double]].toArray
+            case vv: Vector =>
+              val m = new DenseMatrix(t.size, t.head.asInstanceOf[Vector].length)
+              t.asInstanceOf[Iterable[Vector]].view.zipWithIndex.foreach {
+                case (v, idx) => m(idx, ::) := v
+              }
+              return m
+            case _ =>
+              throw new IllegalArgumentException("unsupported type in the inline Matrix initializer")
+          }
+        case t: Product => t.productIterator.map(_.asInstanceOf[Number].doubleValue()).toArray
+        case t: Array[Array[Double]] => if (rows.size == 1)
+          return new DenseMatrix(t)
+        else
+          throw new IllegalArgumentException(
+            "double[][] data parameter can be the only argument for dense()")
+        case t: Array[Vector] =>
+          val m = new DenseMatrix(t.size, t.head.length)
+          t.view.zipWithIndex.foreach {
+            case (v, idx) => m(idx, ::) := v
+          }
+          return m
+        case _ => throw new IllegalArgumentException("unsupported type in the inline Matrix initializer")
+      }
+    }
+    new DenseMatrix(data.toArray)
+  }
+
+  /**
+   * Create a sparse row matrix. Initializers are always row-wise; the number of columns is
+   * the largest cardinality among the rows.
+   * e.g.
+   * {{{
+   *
+   * m = sparse(
+   *   (0,5)::(9,3)::Nil,
+   *   (2,3.5)::(7,8)::Nil
+   * )
+   *
+   * }}}
+   *
+   * @param rows row vectors
+   * @return
+   */
+
+  def sparse(rows: Vector*): SparseRowMatrix = {
+    import MatrixOps._
+    val nrow = rows.size
+    val ncol = rows.map(_.size()).max
+    val m = new SparseRowMatrix(nrow, ncol)
+    m := rows
+    m
+
+  }
+
+  /**
+   * create a sparse vector out of list of tuple2's
+   *
+   * The cardinality is one more than the largest index present (0 for empty input).
+   *
+   * @param sdata (index, value) pairs; values must be numeric
+   * @return
+   */
+  def svec(sdata: TraversableOnce[(Int, AnyVal)]) = {
+    // Materialize once: `sdata` may be a one-shot Iterator, yet it is traversed several times
+    // below (emptiness, max index, size, fill). Re-traversing an exhausted iterator would fail
+    // on `max` of an empty collection.
+    val data = sdata.toIndexedSeq
+    val cardinality = if (data.nonEmpty) data.map(_._1).max + 1 else 0
+    val initialCapacity = data.size
+    val sv = new RandomAccessSparseVector(cardinality, initialCapacity)
+    data.foreach(t => sv.setQuick(t._1, t._2.asInstanceOf[Number].doubleValue()))
+    sv
+  }
+
+  def dvec(fromV: Vector) = new DenseVector(fromV)
+
+  def dvec(ddata: TraversableOnce[Double]) = new DenseVector(ddata.toArray)
+
+  def dvec(numbers: Number*) = new DenseVector(numbers.map(_.doubleValue()).toArray)
+
+  def chol(m: Matrix, pivoting: Boolean = false) = new CholeskyDecomposition(m, pivoting)
+
+  /**
+   * computes SVD
+   * @param m svd input
+   * @return (U,V, singular-values-vector)
+   */
+  def svd(m: Matrix) = {
+    val svdObj = new SingularValueDecomposition(m)
+    (svdObj.getU, svdObj.getV, new DenseVector(svdObj.getSingularValues))
+  }
+
+  /**
+   * Computes Eigendecomposition of a symmetric matrix
+   * @param m symmetric input matrix
+   * @return (V, eigen-values-vector)
+   */
+  def eigen(m: Matrix) = {
+    val ed = new EigenDecomposition(m, true)
+    (ed.getV, ed.getRealEigenvalues)
+  }
+
+
+  /**
+   * More general version of eigen decomposition
+   * @param m input matrix
+   * @param symmetric whether `m` is symmetric (passed through to `EigenDecomposition`)
+   * @return (V, eigenvalues-real-vector, eigenvalues-imaginary-vector)
+   */
+  def eigenFull(m: Matrix, symmetric: Boolean = true) = {
+    // `=` is required here: with procedure syntax (no `=`) the tuple below would be
+    // discarded and callers would receive Unit.
+    val ed = new EigenDecomposition(m, symmetric)
+    (ed.getV, ed.getRealEigenvalues, ed.getImagEigenvalues)
+  }
+
+  /**
+   * QR.
+   *
+   * Right now Mahout's QR seems to be using argument for in-place transformations,
+   * so the matrix context gets messed after this. Hence we force cloning of the
+   * argument before passing it to Mahout's QR so to keep expected semantics.
+   * @param m
+   * @return (Q,R)
+   */
+  def qr(m: Matrix) = {
+    import MatrixOps._
+    val qrdec = new QRDecomposition(m cloned)
+    (qrdec.getQ, qrdec.getR)
+  }
+
+  /**
+   * Solution <tt>X</tt> of <tt>A*X = B</tt> using QR-Decomposition, where <tt>A</tt> is a square, non-singular matrix.
+   *
+   * @param a
+   * @param b
+   * @return (X)
+   */
+  def solve(a: Matrix, b: Matrix): Matrix = {
+    import MatrixOps._
+    if (a.nrow != a.ncol) {
+      throw new IllegalArgumentException("supplied matrix A is not square")
+    }
+    val qr = new QRDecomposition(a cloned)
+    if (!qr.hasFullRank) {
+      throw new IllegalArgumentException("supplied matrix A is singular")
+    }
+    qr.solve(b)
+  }
+
+  /**
+   * Solution <tt>A^{-1}</tt> of <tt>A*A^{-1} = I</tt> using QR-Decomposition, where <tt>A</tt> is a square,
+   * non-singular matrix. Here only for compatibility with R semantics.
+   *
+   * @param a
+   * @return (A^{-1})
+   */
+  def solve(a: Matrix): Matrix = {
+    import MatrixOps._
+    solve(a, eye(a.nrow))
+  }
+
+  /**
+   * Solution <tt>x</tt> of <tt>A*x = b</tt> using QR-Decomposition, where <tt>A</tt> is a square, non-singular matrix.
+   *
+   * @param a
+   * @param b
+   * @return (x)
+   */
+  def solve(a: Matrix, b: Vector): Vector = {
+    import RLikeOps._
+    val x = solve(a, b.toColMatrix)
+    x(::, 0)
+  }
+
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/main/scala/org/apache/mahout/nlp/tfidf/TFIDF.scala
----------------------------------------------------------------------
diff --git a/samsara/src/main/scala/org/apache/mahout/nlp/tfidf/TFIDF.scala b/samsara/src/main/scala/org/apache/mahout/nlp/tfidf/TFIDF.scala
new file mode 100644
index 0000000..c75ff20
--- /dev/null
+++ b/samsara/src/main/scala/org/apache/mahout/nlp/tfidf/TFIDF.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.nlp.tfidf
+
+trait TermWeight {
+
+  /**
+   * @param tf term freq
+   * @param df doc freq
+   * @param length Length of the document
+   * @param numDocs the total number of docs
+   */
+  def calculate(tf: Int, df: Int, length: Int, numDocs: Int): Double
+}
+
+
+class TFIDF extends TermWeight {
+
+  /**
+   * Calculate TF-IDF weight.
+   *
+   * Lucene 4.6's DefaultSimilarity TF-IDF calculation uses the formula:
+   *
+   * sqrt(termFreq) * (log(numDocs / (docFreq + 1)) + 1.0)
+   *
+   * Note: this is consistent with the MapReduce seq2sparse implementation of TF-IDF weights
+   * and is slightly different from Spark MLlib's TF-IDF calculation which is implemented as:
+   *
+   * termFreq * log((numDocs + 1.0) / (docFreq + 1.0))
+   *
+   * @param tf term freq
+   * @param df doc freq
+   * @param length Length of the document - UNUSED
+   * @param numDocs the total number of docs
+   * @return The TF-IDF weight as calculated by Lucene 4.6's DefaultSimilarity
+   */
+  def calculate(tf: Int, df: Int, length: Int, numDocs: Int): Double = {
+
+    // Lucene 4.6 DefaultSimilarity's TF-IDF is implemented as:
+    // sqrt(tf) * (log(numDocs / (df + 1)) + 1)
+    math.sqrt(tf) * (math.log(numDocs / (df + 1).toDouble) + 1.0)
+  }
+}
+
+class MLlibTFIDF extends TermWeight {
+
+  /**
+   * Calculate TF-IDF weight with IDF formula used by Spark MLlib's IDF:
+   *
+   * termFreq * log((numDocs + 1.0) / (docFreq + 1.0))
+   *
+   * Use this weight if working with MLLib vectorized documents.
+   *
+   * Note: this is not consistent with the MapReduce seq2sparse implementation of TF-IDF weights
+   * which is implemented using Lucene DefaultSimilarity's TF-IDF calculation:
+   *
+   * sqrt(termFreq) * (log(numDocs / (docFreq + 1)) + 1.0)
+   *
+   * @param tf term freq
+   * @param df doc freq
+   * @param length Length of the document - UNUSED
+   * @param numDocs the total number of docs
+   * @return The TF-IDF weight as calculated by Spark MLlib's IDF
+   */
+  def calculate(tf: Int, df: Int, length: Int, numDocs: Int): Double = {
+
+    // Spark MLLib's TF-IDF weight is implemented as:
+    // termFreq * log((numDocs + 1.0) / (docFreq + 1.0))
+    tf * math.log((numDocs + 1.0) / (df + 1).toDouble)
+  }
+}
+
+class TF extends TermWeight {
+
+  /**
+   * For TF Weight simply return the absolute TF.
+   *
+   * Note: We do not use Lucene 4.6's DefaultSimilarity's TF calculation here
+   * which returns:
+   *
+   * sqrt(tf)
+   *
+   * this is consistent with the MapReduce seq2sparse implementation of TF weights.
+   *
+   * @param tf term freq
+   * @param df doc freq - UNUSED
+   * @param length Length of the document - UNUSED
+   * @param numDocs the total number of docs - UNUSED
+   * @return The weight = tf param
+   */
+  def calculate(tf: Int, df: Int = -Int.MaxValue, length: Int = -Int.MaxValue, numDocs: Int = -Int.MaxValue): Double = {
+    tf
+  }
+}
+
+
http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/test/scala/org/apache/mahout/classifier/naivebayes/NBTestBase.scala
----------------------------------------------------------------------
diff --git a/samsara/src/test/scala/org/apache/mahout/classifier/naivebayes/NBTestBase.scala b/samsara/src/test/scala/org/apache/mahout/classifier/naivebayes/NBTestBase.scala
new file mode 100644
index 0000000..c8f8a90
--- /dev/null
+++ b/samsara/src/test/scala/org/apache/mahout/classifier/naivebayes/NBTestBase.scala
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.classifier.naivebayes
+
+import org.apache.mahout.math._
+import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.test.DistributedMahoutSuite
+import org.apache.mahout.test.MahoutSuite
+import org.scalatest.{FunSuite, Matchers}
+import collection._
+import JavaConversions._
+import collection.JavaConversions
+
+/**
+ * Naive Bayes test cases (training, aggregation, DFS serialization, self-test); mix into a `FunSuite`.
+ */
+trait NBTestBase extends DistributedMahoutSuite with Matchers { this:FunSuite =>
+
+  val epsilon = 1E-6 // absolute tolerance, used as `actual should be (expected +- epsilon)`
+
+  test("Simple Standard NB Model") {
+
+    // test from simulated sparse TF-IDF data
+    val inCoreTFIDF = sparse(
+      (0, 0.7) ::(1, 0.1) ::(2, 0.1) ::(3, 0.3) :: Nil,
+      (0, 0.4) ::(1, 0.4) ::(2, 0.1) ::(3, 0.1) :: Nil,
+      (0, 0.1) ::(1, 0.0) ::(2, 0.8) ::(3, 0.1) :: Nil,
+      (0, 0.1) ::(1, 0.1) ::(2, 0.1) ::(3, 0.7) :: Nil
+    )
+
+    val TFIDFDrm = drm.drmParallelize(m = inCoreTFIDF, numPartitions = 2)
+
+    val labelIndex = new java.util.HashMap[String,Integer]()
+    labelIndex.put("Cat1", 3)
+    labelIndex.put("Cat2", 2)
+    labelIndex.put("Cat3", 1)
+    labelIndex.put("Cat4", 0)
+
+    // train a Standard NB Model
+    val model = NaiveBayes.train(TFIDFDrm, labelIndex, false)
+
+    // validate the model- will throw an exception if model is invalid
+    model.validate()
+
+    // check the labelWeights (two-sided tolerance: over- and under-shoots both fail)
+    model.labelWeight(0) should be (1.2 +- epsilon)
+    model.labelWeight(1) should be (1.0 +- epsilon)
+    model.labelWeight(2) should be (1.0 +- epsilon)
+    model.labelWeight(3) should be (1.0 +- epsilon)
+
+    // check the Feature weights
+    model.featureWeight(0) should be (1.3 +- epsilon)
+    model.featureWeight(1) should be (0.6 +- epsilon)
+    model.featureWeight(2) should be (1.1 +- epsilon)
+    model.featureWeight(3) should be (1.2 +- epsilon)
+  }
+
+  test("NB Aggregator") {
+
+    val rowBindings = new java.util.HashMap[String,Integer]()
+    rowBindings.put("/Cat1/doc_a/", 0)
+    rowBindings.put("/Cat2/doc_b/", 1)
+    rowBindings.put("/Cat1/doc_c/", 2)
+    rowBindings.put("/Cat2/doc_d/", 3)
+    rowBindings.put("/Cat1/doc_e/", 4)
+
+    val matrixSetup = sparse(
+      (0, 0.1) ::(1, 0.0) ::(2, 0.1) ::(3, 0.0) :: Nil,
+      (0, 0.0) ::(1, 0.1) ::(2, 0.0) ::(3, 0.1) :: Nil,
+      (0, 0.1) ::(1, 0.0) ::(2, 0.1) ::(3, 0.0) :: Nil,
+      (0, 0.0) ::(1, 0.1) ::(2, 0.0) ::(3, 0.1) :: Nil,
+      (0, 0.1) ::(1, 0.0) ::(2, 0.1) ::(3, 0.0) :: Nil
+    )
+
+    matrixSetup.setRowLabelBindings(rowBindings)
+
+    val TFIDFDrm = drm.drmParallelizeWithRowLabels(m = matrixSetup, numPartitions = 2)
+
+    val (labelIndex, aggregatedTFIDFDrm) = NaiveBayes.extractLabelsAndAggregateObservations(TFIDFDrm)
+
+    labelIndex.size should be (2)
+
+    val cat1=labelIndex("Cat1")
+    val cat2=labelIndex("Cat2")
+
+    cat1 should be (0)
+    cat2 should be (1)
+
+    val aggregatedTFIDFInCore = aggregatedTFIDFDrm.collect
+    aggregatedTFIDFInCore.numCols should be (4)
+    aggregatedTFIDFInCore.numRows should be (2)
+
+    aggregatedTFIDFInCore.get(cat1, 0) should be (0.3 +- epsilon)
+    aggregatedTFIDFInCore.get(cat1, 1) should be (0.0 +- epsilon)
+    aggregatedTFIDFInCore.get(cat1, 2) should be (0.3 +- epsilon)
+    aggregatedTFIDFInCore.get(cat1, 3) should be (0.0 +- epsilon)
+    aggregatedTFIDFInCore.get(cat2, 0) should be (0.0 +- epsilon)
+    aggregatedTFIDFInCore.get(cat2, 1) should be (0.2 +- epsilon)
+    aggregatedTFIDFInCore.get(cat2, 2) should be (0.0 +- epsilon)
+    aggregatedTFIDFInCore.get(cat2, 3) should be (0.2 +- epsilon)
+
+  }
+
+  test("Model DFS Serialization") {
+
+    // test from simulated sparse TF-IDF data
+    val inCoreTFIDF = sparse(
+      (0, 0.7) ::(1, 0.1) ::(2, 0.1) ::(3, 0.3) :: Nil,
+      (0, 0.4) ::(1, 0.4) ::(2, 0.1) ::(3, 0.1) :: Nil,
+      (0, 0.1) ::(1, 0.0) ::(2, 0.8) ::(3, 0.1) :: Nil,
+      (0, 0.1) ::(1, 0.1) ::(2, 0.1) ::(3, 0.7) :: Nil
+    )
+
+    val labelIndex = new java.util.HashMap[String,Integer]()
+    labelIndex.put("Cat1", 0)
+    labelIndex.put("Cat2", 1)
+    labelIndex.put("Cat3", 2)
+    labelIndex.put("Cat4", 3)
+
+    val TFIDFDrm = drm.drmParallelize(m = inCoreTFIDF, numPartitions = 2)
+
+    // train a Standard NB Model- no label index here
+    val model = NaiveBayes.train(TFIDFDrm, labelIndex, false)
+
+    // validate the model- will throw an exception if model is invalid
+    model.validate()
+
+    // save the model
+    model.dfsWrite(TmpDir)
+
+    // reload a new model which should be equal to the original
+    // this will automatically trigger a validate() call
+    val materializedModel= NBModel.dfsRead(TmpDir)
+
+    // check the labelWeights
+    model.labelWeight(0) should be (materializedModel.labelWeight(0) +- epsilon) //1.2
+    model.labelWeight(1) should be (materializedModel.labelWeight(1) +- epsilon) //1.0
+    model.labelWeight(2) should be (materializedModel.labelWeight(2) +- epsilon) //1.0
+    model.labelWeight(3) should be (materializedModel.labelWeight(3) +- epsilon) //1.0
+
+    // check the Feature weights
+    model.featureWeight(0) should be (materializedModel.featureWeight(0) +- epsilon) //1.3
+    model.featureWeight(1) should be (materializedModel.featureWeight(1) +- epsilon) //0.6
+    model.featureWeight(2) should be (materializedModel.featureWeight(2) +- epsilon) //1.1
+    model.featureWeight(3) should be (materializedModel.featureWeight(3) +- epsilon) //1.2
+
+    // check to see if the new model is complementary
+    materializedModel.isComplementary should be (model.isComplementary)
+
+    // check the label indexMaps
+    for(elem <- model.labelIndex){
+      materializedModel.labelIndex(elem._1) should be (elem._2)
+    }
+  }
+
+  test("train and test a model") {
+
+    // test from simulated sparse TF-IDF data
+    val inCoreTFIDF = sparse(
+      (0, 0.7) ::(1, 0.1) ::(2, 0.1) ::(3, 0.3) :: Nil,
+      (0, 0.4) ::(1, 0.4) ::(2, 0.1) ::(3, 0.1) :: Nil,
+      (0, 0.1) ::(1, 0.0) ::(2, 0.8) ::(3, 0.1) :: Nil,
+      (0, 0.1) ::(1, 0.1) ::(2, 0.1) ::(3, 0.7) :: Nil
+    )
+
+    val labelIndex = new java.util.HashMap[String,Integer]()
+    labelIndex.put("/Cat1/", 0)
+    labelIndex.put("/Cat2/", 1)
+    labelIndex.put("/Cat3/", 2)
+    labelIndex.put("/Cat4/", 3)
+
+    val TFIDFDrm = drm.drmParallelize(m = inCoreTFIDF, numPartitions = 2)
+
+    // train a Standard NB Model- no label index here
+    val model = NaiveBayes.train(TFIDFDrm, labelIndex, false)
+
+    // validate the model- will throw an exception if model is invalid
+    model.validate()
+
+    // save the model
+    model.dfsWrite(TmpDir)
+
+    // reload a new model which should be equal to the original
+    // this will automatically trigger a validate() call
+    val materializedModel= NBModel.dfsRead(TmpDir)
+
+
+    // check to see if the new model is complementary
+    materializedModel.isComplementary should be (model.isComplementary)
+
+    // check the label indexMaps
+    for(elem <- model.labelIndex){
+      materializedModel.labelIndex(elem._1) should be (elem._2)
+    }
+
+
+    //self test on the original set
+    val inCoreTFIDFWithLabels = inCoreTFIDF.clone()
+    inCoreTFIDFWithLabels.setRowLabelBindings(labelIndex)
+    val TFIDFDrmWithLabels = drm.drmParallelizeWithRowLabels(m = inCoreTFIDFWithLabels, numPartitions = 2)
+
+    NaiveBayes.test(materializedModel,TFIDFDrmWithLabels , false)
+
+  }
+
+  test("train and test a model with the confusion matrix") {
+
+    val rowBindings = new java.util.HashMap[String,Integer]()
+    rowBindings.put("/Cat1/doc_a/", 0)
+    rowBindings.put("/Cat2/doc_b/", 1)
+    rowBindings.put("/Cat1/doc_c/", 2)
+    rowBindings.put("/Cat2/doc_d/", 3)
+    rowBindings.put("/Cat1/doc_e/", 4)
+    rowBindings.put("/Cat2/doc_f/", 5)
+    rowBindings.put("/Cat1/doc_g/", 6)
+    rowBindings.put("/Cat2/doc_h/", 7)
+    rowBindings.put("/Cat1/doc_i/", 8)
+    rowBindings.put("/Cat2/doc_j/", 9)
+
+    val seed = 1
+
+    val matrixSetup = Matrices.uniformView(10, 50 , seed)
+
+    println("TFIDF matrix")
+    println(matrixSetup)
+
+    matrixSetup.setRowLabelBindings(rowBindings)
+
+    val TFIDFDrm = drm.drmParallelizeWithRowLabels(matrixSetup)
+
+    //  println("Parallelized and Collected")
+    //  println(TFIDFDrm.collect)
+
+    val (labelIndex, aggregatedTFIDFDrm) = NaiveBayes.extractLabelsAndAggregateObservations(TFIDFDrm)
+
+    println("Aggregated by key")
+    println(aggregatedTFIDFDrm.collect)
+    println(labelIndex)
+
+
+    // train a Standard NB Model- no label index here
+    val model = NaiveBayes.train(aggregatedTFIDFDrm, labelIndex, false)
+
+    // validate the model- will throw an exception if model is invalid
+    model.validate()
+
+    // save the model
+    model.dfsWrite(TmpDir)
+
+    // reload a new model which should be equal to the original
+    // this will automatically trigger a validate() call
+    val materializedModel= NBModel.dfsRead(TmpDir)
+
+    // check to see if the new model is complementary
+    materializedModel.isComplementary should be (model.isComplementary)
+
+    // check the label indexMaps
+    for(elem <- model.labelIndex){
+      materializedModel.labelIndex(elem._1) should be (elem._2)
+    }
+
+    // val testTFIDFDrm = drm.drmParallelizeWithRowLabels(m = matrixSetup, numPartitions = 2)
+
+    // self test on this model
+    val result = NaiveBayes.test(materializedModel, TFIDFDrm , false)
+
+    println(result)
+
+    result.getConfusionMatrix.getMatrix.getQuick(0, 0) should be(5)
+    result.getConfusionMatrix.getMatrix.getQuick(0, 1) should be(0)
+    result.getConfusionMatrix.getMatrix.getQuick(1, 0) should be(0)
+    result.getConfusionMatrix.getMatrix.getQuick(1, 1) should be(5)
+
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/test/scala/org/apache/mahout/classifier/stats/ClassifierStatsTestBase.scala
----------------------------------------------------------------------
diff --git a/samsara/src/test/scala/org/apache/mahout/classifier/stats/ClassifierStatsTestBase.scala b/samsara/src/test/scala/org/apache/mahout/classifier/stats/ClassifierStatsTestBase.scala
new file mode 100644
index 0000000..eafde11
--- /dev/null
+++ b/samsara/src/test/scala/org/apache/mahout/classifier/stats/ClassifierStatsTestBase.scala
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.classifier.stats + +import java.lang.Double +import java.util.Random +import java.util.Arrays + +import org.apache.mahout.common.RandomUtils +import org.apache.mahout.math.Matrix +import org.apache.mahout.test.DistributedMahoutSuite +import org.scalatest.{FunSuite, Matchers} + + + +trait ClassifierStatsTestBase extends DistributedMahoutSuite with Matchers { this: FunSuite => + + val epsilon = 1E-6 + + val smallEpsilon = 1.0 + + // FullRunningAverageAndStdDev tests + test("testFullRunningAverageAndStdDev") { + val average: RunningAverageAndStdDev = new FullRunningAverageAndStdDev + assert(0 == average.getCount) + assert(true == Double.isNaN(average.getAverage)) + assert(true == Double.isNaN(average.getStandardDeviation)) + average.addDatum(6.0) + assert(1 == average.getCount) + assert((6.0 - average.getAverage).abs < epsilon) + assert(true == Double.isNaN(average.getStandardDeviation)) + average.addDatum(6.0) + assert(2 == average.getCount) + assert((6.0 - average.getAverage).abs < epsilon) + assert((0.0 - average.getStandardDeviation).abs < epsilon) + average.removeDatum(6.0) + assert(1 == average.getCount) + assert((6.0 - average.getAverage).abs < epsilon) + assert(true == Double.isNaN(average.getStandardDeviation)) + average.addDatum(-4.0) + assert(2 == average.getCount) + assert((1.0 - average.getAverage).abs < epsilon) + assert(((5.0 * 
1.4142135623730951) - average.getStandardDeviation).abs < epsilon) + average.removeDatum(4.0) + assert(1 == average.getCount) + assert((2.0 + average.getAverage).abs < epsilon) + assert(true == Double.isNaN(average.getStandardDeviation)) + } + + test("testBigFullRunningAverageAndStdDev") { + val average: RunningAverageAndStdDev = new FullRunningAverageAndStdDev + RandomUtils.useTestSeed() + val r: Random = RandomUtils.getRandom + + for (i <- 0 until 100000) { + average.addDatum(r.nextDouble() * 1000.0) + } + + assert((500.0 - average.getAverage).abs < smallEpsilon) + assert(((1000.0 / Math.sqrt(12.0)) - average.getStandardDeviation).abs < smallEpsilon) + } + + test("testStddevFullRunningAverageAndStdDev") { + val runningAverage: RunningAverageAndStdDev = new FullRunningAverageAndStdDev + assert(0 == runningAverage.getCount) + assert(true == Double.isNaN(runningAverage.getAverage)) + runningAverage.addDatum(1.0) + assert(1 == runningAverage.getCount) + assert((1.0 - runningAverage.getAverage).abs < epsilon) + assert(true == Double.isNaN(runningAverage.getStandardDeviation)) + runningAverage.addDatum(1.0) + assert(2 == runningAverage.getCount) + assert((1.0 - runningAverage.getAverage).abs < epsilon) + assert((0.0 -runningAverage.getStandardDeviation).abs < epsilon) + runningAverage.addDatum(7.0) + assert(3 == runningAverage.getCount) + assert((3.0 - runningAverage.getAverage).abs < epsilon) + assert((3.464101552963257 - runningAverage.getStandardDeviation).abs < epsilon) + runningAverage.addDatum(5.0) + assert(4 == runningAverage.getCount) + assert((3.5 - runningAverage.getAverage) < epsilon) + assert((3.0- runningAverage.getStandardDeviation).abs < epsilon) + } + + + + // FullRunningAverage tests + test("testFullRunningAverage"){ + val runningAverage: RunningAverage = new FullRunningAverage + assert(0 == runningAverage.getCount) + assert(true == Double.isNaN(runningAverage.getAverage)) + runningAverage.addDatum(1.0) + assert(1 == runningAverage.getCount) + 
assert((1.0 - runningAverage.getAverage).abs < epsilon) + runningAverage.addDatum(1.0) + assert(2 == runningAverage.getCount) + assert((1.0 - runningAverage.getAverage).abs < epsilon) + runningAverage.addDatum(4.0) + assert(3 == runningAverage.getCount) + assert((2.0 - runningAverage.getAverage) < epsilon) + runningAverage.addDatum(-4.0) + assert(4 == runningAverage.getCount) + assert((0.5 - runningAverage.getAverage).abs < epsilon) + runningAverage.removeDatum(-4.0) + assert(3 == runningAverage.getCount) + assert((2.0 - runningAverage.getAverage).abs < epsilon) + runningAverage.removeDatum(4.0) + assert(2 == runningAverage.getCount) + assert((1.0 - runningAverage.getAverage).abs < epsilon) + runningAverage.changeDatum(0.0) + assert(2 == runningAverage.getCount) + assert((1.0 - runningAverage.getAverage).abs < epsilon) + runningAverage.changeDatum(2.0) + assert(2 == runningAverage.getCount) + assert((2.0 - runningAverage.getAverage).abs < epsilon) + } + + + test("testFullRunningAveragCopyConstructor") { + val runningAverage: RunningAverage = new FullRunningAverage + runningAverage.addDatum(1.0) + runningAverage.addDatum(1.0) + assert(2 == runningAverage.getCount) + assert(1.0 - runningAverage.getAverage < epsilon) + val copy: RunningAverage = new FullRunningAverage(runningAverage.getCount, runningAverage.getAverage) + assert(2 == copy.getCount) + assert(1.0 - copy.getAverage < epsilon) + } + + + + // Inverted Running Average tests + test("testInvertedRunningAverage") { + val avg: RunningAverage = new FullRunningAverage + val inverted: RunningAverage = new InvertedRunningAverage(avg) + assert(0 == inverted.getCount) + avg.addDatum(1.0) + assert(1 == inverted.getCount) + assert((1.0 + inverted.getAverage).abs < epsilon) // inverted.getAverage == -1.0 + avg.addDatum(2.0) + assert(2 == inverted.getCount) + assert((1.5 + inverted.getAverage).abs < epsilon) // inverted.getAverage == -1.5 + } + + test ("testInvertedRunningAverageAndStdDev") { + val avg: 
RunningAverageAndStdDev = new FullRunningAverageAndStdDev + val inverted: RunningAverageAndStdDev = new InvertedRunningAverageAndStdDev(avg) + assert(0 == inverted.getCount) + avg.addDatum(1.0) + assert(1 == inverted.getCount) + assert(((1.0 + inverted.getAverage).abs < epsilon)) // inverted.getAverage == -1.0 + avg.addDatum(2.0) + assert(2 == inverted.getCount) + assert((1.5 + inverted.getAverage).abs < epsilon) // inverted.getAverage == -1.5 + assert(((Math.sqrt(2.0) / 2.0) - inverted.getStandardDeviation).abs < epsilon) + } + + + // confusion Matrix tests + val VALUES: Array[Array[Int]] = Array(Array(2, 3), Array(10, 20)) + val LABELS: Array[String] = Array("Label1", "Label2") + val OTHER: Array[Int] = Array(3, 6) + val DEFAULT_LABEL: String = "other" + + def fillConfusionMatrix(values: Array[Array[Int]], labels: Array[String], defaultLabel: String): ConfusionMatrix = { + val labelList = Arrays.asList(labels(0),labels(1)) + val confusionMatrix: ConfusionMatrix = new ConfusionMatrix(labelList, defaultLabel) + confusionMatrix.putCount("Label1", "Label1", values(0)(0)) + confusionMatrix.putCount("Label1", "Label2", values(0)(1)) + confusionMatrix.putCount("Label2", "Label1", values(1)(0)) + confusionMatrix.putCount("Label2", "Label2", values(1)(1)) + confusionMatrix.putCount("Label1", DEFAULT_LABEL, OTHER(0)) + confusionMatrix.putCount("Label2", DEFAULT_LABEL, OTHER(1)) + + confusionMatrix + } + + private def checkAccuracy(cm: ConfusionMatrix) { + val labelstrs = cm.getLabels + assert(3 == labelstrs.size) + assert((25.0 - cm.getAccuracy("Label1")).abs < epsilon) + assert((55.5555555 - cm.getAccuracy("Label2")).abs < epsilon) + assert(true == Double.isNaN(cm.getAccuracy("other"))) + } + + private def checkValues(cm: ConfusionMatrix) { + val counts: Array[Array[Int]] = cm.getConfusionMatrix + cm.toString + assert(counts.length == counts(0).length) + assert(3 == counts.length) + assert(VALUES(0)(0) == counts(0)(0)) + assert(VALUES(0)(1) == counts(0)(1)) + 
assert(VALUES(1)(0) == counts(1)(0)) + assert(VALUES(1)(1) == counts(1)(1)) + assert(true == Arrays.equals(new Array[Int](3), counts(2))) + assert(OTHER(0) == counts(0)(2)) + assert(OTHER(1) == counts(1)(2)) + assert(3 == cm.getLabels.size) + assert(true == cm.getLabels.contains(LABELS(0))) + assert(true == cm.getLabels.contains(LABELS(1))) + assert(true == cm.getLabels.contains(DEFAULT_LABEL)) + } + + test("testBuild"){ + val confusionMatrix: ConfusionMatrix = fillConfusionMatrix(VALUES, LABELS, DEFAULT_LABEL) + checkValues(confusionMatrix) + checkAccuracy(confusionMatrix) + } + + test("GetMatrix") { + val confusionMatrix: ConfusionMatrix = fillConfusionMatrix(VALUES, LABELS, DEFAULT_LABEL) + val m: Matrix = confusionMatrix.getMatrix + val rowLabels = m.getRowLabelBindings + assert(confusionMatrix.getLabels.size == m.numCols) + assert(true == rowLabels.keySet.contains(LABELS(0))) + assert(true == rowLabels.keySet.contains(LABELS(1))) + assert(true == rowLabels.keySet.contains(DEFAULT_LABEL)) + assert(2 == confusionMatrix.getCorrect(LABELS(0))) + assert(20 == confusionMatrix.getCorrect(LABELS(1))) + assert(0 == confusionMatrix.getCorrect(DEFAULT_LABEL)) + } + + /** + * Example taken from + * http://scikit-learn.org/stable/modules/generated/sklearn.metrics.precision_recall_fscore_support.html + */ + test("testPrecisionRecallAndF1ScoreAsScikitLearn") { + val labelList = Arrays.asList("0", "1", "2") + val confusionMatrix: ConfusionMatrix = new ConfusionMatrix(labelList, "DEFAULT") + confusionMatrix.putCount("0", "0", 2) + confusionMatrix.putCount("1", "0", 1) + confusionMatrix.putCount("1", "2", 1) + confusionMatrix.putCount("2", "1", 2) + val delta: Double = 0.001 + assert((0.222 - confusionMatrix.getWeightedPrecision).abs < delta) + assert((0.333 - confusionMatrix.getWeightedRecall).abs < delta) + assert((0.266 - confusionMatrix.getWeightedF1score).abs < delta) + } + + + +} 
http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/test/scala/org/apache/mahout/math/decompositions/DecompositionsSuite.scala ---------------------------------------------------------------------- diff --git a/samsara/src/test/scala/org/apache/mahout/math/decompositions/DecompositionsSuite.scala b/samsara/src/test/scala/org/apache/mahout/math/decompositions/DecompositionsSuite.scala new file mode 100644 index 0000000..8f5ec99 --- /dev/null +++ b/samsara/src/test/scala/org/apache/mahout/math/decompositions/DecompositionsSuite.scala @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.math.decompositions + +import org.scalatest.FunSuite +import org.apache.mahout.test.MahoutSuite +import org.apache.mahout.common.RandomUtils +import org.apache.mahout.math._ +import scalabindings._ +import RLikeOps._ + +/** + * This suite tests only in-core decomposititions. + * <P> + * + * We moved distributed tests into mahout-spark module since they require a concrete distributed + * engine dependencies to run. + * <P> + */ +class DecompositionsSuite extends FunSuite with MahoutSuite { + + test("ssvd") { + + // Very naive, a full-rank only here. 
+ val a = dense( + (1, 2, 3), + (3, 4, 5), + (-2, 6, 7), + (-3, 8, 9) + ) + + val rank = 2 + val (u, v, s) = ssvd(a, k = rank, q = 1) + + val (uControl, vControl, sControl) = svd(a) + + printf("U:\n%s\n", u) + printf("U-control:\n%s\n", uControl) + printf("V:\n%s\n", v) + printf("V-control:\n%s\n", vControl) + printf("Sigma:\n%s\n", s) + printf("Sigma-control:\n%s\n", sControl) + + (s - sControl(0 until rank)).norm(2) should be < 1E-7 + + // Singular vectors may be equivalent down to a sign only. + (u.norm - uControl(::, 0 until rank).norm).abs should be < 1E-7 + (v.norm - vControl(::, 0 until rank).norm).abs should be < 1E-7 + } + + test("spca") { + + import math._ + + val rnd = RandomUtils.getRandom + + // Number of points + val m = 500 + // Length of actual spectrum + val spectrumLen = 40 + + val spectrum = dvec((0 until spectrumLen).map(x => 300.0 * exp(-x) max 1e-3)) + printf("spectrum:%s\n", spectrum) + + val (u, _) = qr(new SparseRowMatrix(m, spectrumLen) := + ((r, c, v) => if (rnd.nextDouble() < 0.2) 0 else rnd.nextDouble() + 5.0)) + + // PCA Rotation matrix -- should also be orthonormal. + val (tr, _) = qr(Matrices.symmetricUniformView(spectrumLen, spectrumLen, rnd.nextInt) - 10.0) + + val input = (u %*%: diagv(spectrum)) %*% tr.t + + // Calculate just first 10 principal factors and reduce dimensionality. + // Since we assert just validity of the s-pca, not stochastic error, we bump p parameter to + // ensure to zero stochastic error and assert only functional correctness of the method's pca- + // specific additions. + val k = 10 + var (pca, _, s) = spca(a = input, k = k, p = spectrumLen, q = 1) + printf("Svs:%s\n", s) + // Un-normalized pca data: + pca = pca %*%: diagv(s) + + // Of course, once we calculated the pca, the spectrum is going to be different since our originally + // generated input was not centered. 
So here, we'd just brute-solve pca to verify + val xi = input.colMeans() + for (r <- 0 until input.nrow) input(r, ::) -= xi + var (pcaControl, _, sControl) = svd(m = input) + + printf("Svs-control:%s\n", sControl) + pcaControl = (pcaControl %*%: diagv(sControl))(::, 0 until k) + + printf("pca:\n%s\n", pca(0 until 10, 0 until 10)) + printf("pcaControl:\n%s\n", pcaControl(0 until 10, 0 until 10)) + + (pca(0 until 10, 0 until 10).norm - pcaControl(0 until 10, 0 until 10).norm).abs should be < 1E-5 + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/test/scala/org/apache/mahout/math/decompositions/DistributedDecompositionsSuiteBase.scala ---------------------------------------------------------------------- diff --git a/samsara/src/test/scala/org/apache/mahout/math/decompositions/DistributedDecompositionsSuiteBase.scala b/samsara/src/test/scala/org/apache/mahout/math/decompositions/DistributedDecompositionsSuiteBase.scala new file mode 100644 index 0000000..b288c62 --- /dev/null +++ b/samsara/src/test/scala/org/apache/mahout/math/decompositions/DistributedDecompositionsSuiteBase.scala @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.math.decompositions + +import org.apache.mahout.test.DistributedMahoutSuite +import org.apache.mahout.math._ +import scalabindings._ +import RLikeOps._ +import drm._ +import RLikeDrmOps._ +import org.scalatest.{FunSuite, Matchers} +import org.apache.mahout.common.RandomUtils +import math._ + +/** + * ==Common distributed code to run against each distributed engine support.== + * + * Each distributed engine's decompositions package should have a suite that includes this feature + * as part of its distributed test suite. + * + */ +trait DistributedDecompositionsSuiteBase extends DistributedMahoutSuite with Matchers { this:FunSuite => + + + test("thin distributed qr") { + + val inCoreA = dense( + (1, 2, 3, 4), + (2, 3, 4, 5), + (3, -4, 5, 6), + (4, 5, 6, 7), + (8, 6, 7, 8) + ) + + val drmA = drmParallelize(inCoreA, numPartitions = 2) + val (drmQ, inCoreR) = dqrThin(drmA, checkRankDeficiency = false) + + // Assert optimizer still knows Q and A are identically partitioned + drmQ.partitioningTag should equal(drmA.partitioningTag) + +// drmQ.rdd.partitions.size should be(A.rdd.partitions.size) +// +// // Should also be zippable +// drmQ.rdd.zip(other = A.rdd) + + val inCoreQ = drmQ.collect + + printf("A=\n%s\n", inCoreA) + printf("Q=\n%s\n", inCoreQ) + printf("R=\n%s\n", inCoreR) + + val (qControl, rControl) = qr(inCoreA) + printf("qControl=\n%s\n", qControl) + printf("rControl=\n%s\n", rControl) + + // Validate with Cholesky + val ch = chol(inCoreA.t %*% inCoreA) + printf("A'A=\n%s\n", inCoreA.t %*% inCoreA) + printf("L:\n%s\n", ch.getL) + + val rControl2 = (ch.getL cloned).t + val qControl2 = ch.solveRight(inCoreA) + printf("qControl2=\n%s\n", qControl2) + printf("rControl2=\n%s\n", rControl2) + + // Housholder approach seems to be a little bit more stable + (rControl - inCoreR).norm should be < 1E-5 + (qControl - inCoreQ).norm should be < 1E-5 + + // Assert identicity with in-core Cholesky-based -- this should be tighter. 
+ (rControl2 - inCoreR).norm should be < 1E-10 + (qControl2 - inCoreQ).norm should be < 1E-10 + + // Assert orhtogonality: + // (a) Q[,j] dot Q[,j] == 1.0 for all j + // (b) Q[,i] dot Q[,j] == 0.0 for all i != j + for (col <- 0 until inCoreQ.ncol) + ((inCoreQ(::, col) dot inCoreQ(::, col)) - 1.0).abs should be < 1e-10 + for (col1 <- 0 until inCoreQ.ncol - 1; col2 <- col1 + 1 until inCoreQ.ncol) + (inCoreQ(::, col1) dot inCoreQ(::, col2)).abs should be < 1e-10 + + + } + + test("dssvd - the naive-est - q=0") { + dssvdNaive(q = 0) + } + + test("ddsvd - naive - q=1") { + dssvdNaive(q = 1) + } + + test("ddsvd - naive - q=2") { + dssvdNaive(q = 2) + } + + + def dssvdNaive(q: Int) { + val inCoreA = dense( + (1, 2, 3, 4), + (2, 3, 4, 5), + (3, -4, 5, 6), + (4, 5, 6, 7), + (8, 6, 7, 8) + ) + val drmA = drmParallelize(inCoreA, numPartitions = 2) + + val (drmU, drmV, s) = dssvd(drmA, k = 4, q = q) + val (inCoreU, inCoreV) = (drmU.collect, drmV.collect) + + printf("U:\n%s\n", inCoreU) + printf("V:\n%s\n", inCoreV) + printf("Sigma:\n%s\n", s) + + (inCoreA - (inCoreU %*%: diagv(s)) %*% inCoreV.t).norm should be < 1E-5 + } + + test("dspca") { + + val rnd = RandomUtils.getRandom + + // Number of points + val m = 500 + // Length of actual spectrum + val spectrumLen = 40 + + val spectrum = dvec((0 until spectrumLen).map(x => 300.0 * exp(-x) max 1e-3)) + printf("spectrum:%s\n", spectrum) + + val (u, _) = qr(new SparseRowMatrix(m, spectrumLen) := + ((r, c, v) => if (rnd.nextDouble() < 0.2) 0 else rnd.nextDouble() + 5.0)) + + // PCA Rotation matrix -- should also be orthonormal. + val (tr, _) = qr(Matrices.symmetricUniformView(spectrumLen, spectrumLen, rnd.nextInt) - 10.0) + + val input = (u %*%: diagv(spectrum)) %*% tr.t + val drmInput = drmParallelize(m = input, numPartitions = 2) + + // Calculate just first 10 principal factors and reduce dimensionality. 
+ // Since we assert just validity of the s-pca, not stochastic error, we bump p parameter to + // ensure to zero stochastic error and assert only functional correctness of the method's pca- + // specific additions. + val k = 10 + + // Calculate just first 10 principal factors and reduce dimensionality. + var (drmPCA, _, s) = dspca(drmA = drmInput, k = 10, p = spectrumLen, q = 1) + // Un-normalized pca data: + drmPCA = drmPCA %*% diagv(s) + + val pca = drmPCA.checkpoint(CacheHint.NONE).collect + + // Of course, once we calculated the pca, the spectrum is going to be different since our originally + // generated input was not centered. So here, we'd just brute-solve pca to verify + val xi = input.colMeans() + for (r <- 0 until input.nrow) input(r, ::) -= xi + var (pcaControl, _, sControl) = svd(m = input) + pcaControl = (pcaControl %*%: diagv(sControl))(::, 0 until k) + + printf("pca:\n%s\n", pca(0 until 10, 0 until 10)) + printf("pcaControl:\n%s\n", pcaControl(0 until 10, 0 until 10)) + + (pca(0 until 10, 0 until 10).norm - pcaControl(0 until 10, 0 until 10).norm).abs should be < 1E-5 + + } + + test("dals") { + + val rnd = RandomUtils.getRandom + + // Number of points + val m = 500 + val n = 500 + + // Length of actual spectrum + val spectrumLen = 40 + + // Create singluar values with decay + val spectrum = dvec((0 until spectrumLen).map(x => 300.0 * exp(-x) max 1e-3)) + printf("spectrum:%s\n", spectrum) + + // Create A as an ideal input + val inCoreA = (qr(Matrices.symmetricUniformView(m, spectrumLen, 1234))._1 %*%: diagv(spectrum)) %*% + qr(Matrices.symmetricUniformView(n, spectrumLen, 2345))._1.t + val drmA = drmParallelize(inCoreA, numPartitions = 2) + + // Decompose using ALS + val (drmU, drmV, rmse) = dals(drmA = drmA, k = 20).toTuple + val inCoreU = drmU.collect + val inCoreV = drmV.collect + + val predict = inCoreU %*% inCoreV.t + + printf("Control block:\n%s\n", inCoreA(0 until 3, 0 until 3)) + printf("ALS factorized approximation block:\n%s\n", predict(0 
until 3, 0 until 3)) + + val err = (inCoreA - predict).norm + printf("norm of residuals %f\n", err) + printf("train iteration rmses: %s\n", rmse) + + err should be < 15e-2 + + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/test/scala/org/apache/mahout/math/drm/DrmLikeOpsSuiteBase.scala ---------------------------------------------------------------------- diff --git a/samsara/src/test/scala/org/apache/mahout/math/drm/DrmLikeOpsSuiteBase.scala b/samsara/src/test/scala/org/apache/mahout/math/drm/DrmLikeOpsSuiteBase.scala new file mode 100644 index 0000000..849db68 --- /dev/null +++ b/samsara/src/test/scala/org/apache/mahout/math/drm/DrmLikeOpsSuiteBase.scala @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.math.drm + +import org.apache.mahout.test.DistributedMahoutSuite +import org.scalatest.{FunSuite, Matchers} +import org.apache.mahout.math._ +import scalabindings._ +import RLikeOps._ +import RLikeDrmOps._ + +/** Common tests for DrmLike operators to be executed by all distributed engines. 
*/ +trait DrmLikeOpsSuiteBase extends DistributedMahoutSuite with Matchers { + this: FunSuite => + + test("mapBlock") { + + val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (4, 5, 6)) + val A = drmParallelize(m = inCoreA, numPartitions = 2) + val B = A.mapBlock(/* Inherit width */) { + case (keys, block) => keys -> (block += 1.0) + } + + val inCoreB = B.collect + val inCoreBControl = inCoreA + 1.0 + + println(inCoreB) + + // Assert they are the same + (inCoreB - inCoreBControl).norm should be < 1E-10 + + } + + test("col range") { + val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (4, 5, 6)) + val A = drmParallelize(m = inCoreA, numPartitions = 2) + val B = A(::, 1 to 2) + val inCoreB = B.collect + val inCoreBControl = inCoreA(::, 1 to 2) + + println(inCoreB) + + // Assert they are the same + (inCoreB - inCoreBControl).norm should be < 1E-10 + + } + + test("row range") { + + val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (4, 5, 6)) + val A = drmParallelize(m = inCoreA, numPartitions = 2) + val B = A(1 to 2, ::) + val inCoreB = B.collect + val inCoreBControl = inCoreA(1 to 2, ::) + + println(inCoreB) + + // Assert they are the same + (inCoreB - inCoreBControl).norm should be < 1E-10 + + } + + test("col, row range") { + + val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (4, 5, 6)) + val A = drmParallelize(m = inCoreA, numPartitions = 2) + val B = A(1 to 2, 1 to 2) + val inCoreB = B.collect + val inCoreBControl = inCoreA(1 to 2, 1 to 2) + + println(inCoreB) + + // Assert they are the same + (inCoreB - inCoreBControl).norm should be < 1E-10 + + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/test/scala/org/apache/mahout/math/drm/DrmLikeSuiteBase.scala ---------------------------------------------------------------------- diff --git a/samsara/src/test/scala/org/apache/mahout/math/drm/DrmLikeSuiteBase.scala b/samsara/src/test/scala/org/apache/mahout/math/drm/DrmLikeSuiteBase.scala new file mode 100644 index 0000000..6c9313c 
--- /dev/null +++ b/samsara/src/test/scala/org/apache/mahout/math/drm/DrmLikeSuiteBase.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.math.drm + +import org.apache.mahout.test.DistributedMahoutSuite +import org.scalatest.{FunSuite, Matchers} +import org.apache.mahout.math._ +import scalabindings._ +import RLikeOps._ +import RLikeDrmOps._ +import scala.reflect.ClassTag + +/** Common DRM tests to be run by all distributed engines. 
*/ +trait DrmLikeSuiteBase extends DistributedMahoutSuite with Matchers { + this: FunSuite => + + test("DRM DFS i/o (local)") { + + val uploadPath = TmpDir + "UploadedDRM" + + val inCoreA = dense((1, 2, 3), (3, 4, 5)) + val drmA = drmParallelize(inCoreA) + + drmA.dfsWrite(path = uploadPath) + + println(inCoreA) + + // Load back from hdfs + val drmB = drmDfsRead(path = uploadPath) + + // Make sure keys are correctly identified as ints + drmB.checkpoint(CacheHint.NONE).keyClassTag shouldBe ClassTag.Int + + // Collect back into in-core + val inCoreB = drmB.collect + + // Print out to see what it is we collected: + println(inCoreB) + + (inCoreA - inCoreB).norm should be < 1e-7 + } + + test("DRM parallelizeEmpty") { + + val drmEmpty = drmParallelizeEmpty(100, 50) + + // collect back into in-core + val inCoreEmpty = drmEmpty.collect + + inCoreEmpty.sum.abs should be < 1e-7 + drmEmpty.nrow shouldBe 100 + drmEmpty.ncol shouldBe 50 + inCoreEmpty.nrow shouldBe 100 + inCoreEmpty.ncol shouldBe 50 + + + + + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala ---------------------------------------------------------------------- diff --git a/samsara/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala b/samsara/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala new file mode 100644 index 0000000..2e6204d --- /dev/null +++ b/samsara/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala @@ -0,0 +1,550 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.math.drm + +import org.apache.mahout.test.DistributedMahoutSuite +import org.scalatest.{FunSuite, Matchers} +import org.apache.mahout.math._ +import scalabindings._ +import RLikeOps._ +import RLikeDrmOps._ +import decompositions._ +import org.apache.mahout.math.drm.logical.{OpAtB, OpAtA, OpAtx} + +/** Common engine tests for distributed R-like DRM operations */ +trait RLikeDrmOpsSuiteBase extends DistributedMahoutSuite with Matchers { + this: FunSuite => + + val epsilon = 1E-5 + + test("A.t") { + + val inCoreA = dense((1, 2, 3), (3, 4, 5)) + + val A = drmParallelize(inCoreA) + + val inCoreAt = A.t.collect + + // Assert first norm of difference is less than error margin. 
+ (inCoreAt - inCoreA.t).norm should be < epsilon + + } + + test("C = A %*% B") { + + val inCoreA = dense((1, 2), (3, 4)) + val inCoreB = dense((3, 5), (4, 6)) + + val A = drmParallelize(inCoreA, numPartitions = 2) + val B = drmParallelize(inCoreB, numPartitions = 2) + + // Actual + val inCoreCControl = inCoreA %*% inCoreB + + // Distributed operation + val C = A %*% B + val inCoreC = C.collect + println(inCoreC) + + (inCoreC - inCoreCControl).norm should be < 1E-10 + + // We also should be able to collect via implicit checkpoint + val inCoreC2 = C.collect + println(inCoreC2) + + (inCoreC2 - inCoreCControl).norm should be < 1E-10 + + } + + test("C = A %*% B mapBlock {}") { + + val inCoreA = dense((1, 2), (3, 4)) + val inCoreB = dense((3, 5), (4, 6)) + + val A = drmParallelize(inCoreA, numPartitions = 2).checkpoint() + val B = drmParallelize(inCoreB, numPartitions = 2).checkpoint() + + // Actual + val inCoreCControl = inCoreA %*% inCoreB + + A.colSums() + B.colSums() + + + val x = drmBroadcast(dvec(0, 0)) + val x2 = drmBroadcast(dvec(0, 0)) + // Distributed operation + val C = (B.t %*% A.t).t.mapBlock() { + case (keys, block) => + for (row <- 0 until block.nrow) block(row, ::) += x.value + x2 + keys -> block + } + + val inCoreC = C checkpoint CacheHint.NONE collect; + println(inCoreC) + + (inCoreC - inCoreCControl).norm should be < 1E-10 + + // We also should be able to collect via implicit checkpoint + val inCoreC2 = C.collect + println(inCoreC2) + + (inCoreC2 - inCoreCControl).norm should be < 1E-10 + + val inCoreQ = dqrThin(C)._1.collect + + printf("Q=\n%s\n", inCoreQ) + + // Assert unit-orthogonality + ((inCoreQ(::, 0) dot inCoreQ(::, 0)) - 1.0).abs should be < 1e-10 + (inCoreQ(::, 0) dot inCoreQ(::, 1)).abs should be < 1e-10 + + } + + test("C = A %*% B incompatible B keys") { + + val inCoreA = dense((1, 2), (3, 4)) + val inCoreB = dense((3, 5), (4, 6)) + + val A = drmParallelize(inCoreA, numPartitions = 2) + val B = drmParallelize(inCoreB, numPartitions = 2) + 
// Re-key B into DrmLike[String] instead of [Int] + .mapBlock()({ + case (keys, block) => keys.map(_.toString) -> block + }) + + val C = A %*% B + + intercept[IllegalArgumentException] { + // This plan must not compile + C.checkpoint() + } + } + + test("Spark-specific C = At %*% B , join") { + + val inCoreA = dense((1, 2), (3, 4), (-3, -5)) + val inCoreB = dense((3, 5), (4, 6), (0, 1)) + + val A = drmParallelize(inCoreA, numPartitions = 2) + val B = drmParallelize(inCoreB, numPartitions = 2) + + val C = A.t %*% B + + mahoutCtx.optimizerRewrite(C) should equal(OpAtB[Int](A, B)) + + val inCoreC = C.collect + val inCoreControlC = inCoreA.t %*% inCoreB + + (inCoreC - inCoreControlC).norm should be < 1E-10 + + } + + + test("C = At %*% B , join, String-keyed") { + + val inCoreA = dense((1, 2), (3, 4), (-3, -5)) + val inCoreB = dense((3, 5), (4, 6), (0, 1)) + + val A = drmParallelize(inCoreA, numPartitions = 2) + .mapBlock()({ + case (keys, block) => keys.map(_.toString) -> block + }) + + val B = drmParallelize(inCoreB, numPartitions = 2) + .mapBlock()({ + case (keys, block) => keys.map(_.toString) -> block + }) + + val C = A.t %*% B + + mahoutCtx.optimizerRewrite(C) should equal(OpAtB[String](A, B)) + + val inCoreC = C.collect + val inCoreControlC = inCoreA.t %*% inCoreB + + (inCoreC - inCoreControlC).norm should be < 1E-10 + + } + + test("C = At %*% B , zippable, String-keyed") { + + val inCoreA = dense((1, 2), (3, 4), (-3, -5)) + + val A = drmParallelize(inCoreA, numPartitions = 2) + .mapBlock()({ + case (keys, block) => keys.map(_.toString) -> block + }) + + val B = A + 1.0 + + val C = A.t %*% B + + mahoutCtx.optimizerRewrite(C) should equal(OpAtB[String](A, B)) + + val inCoreC = C.collect + val inCoreControlC = inCoreA.t %*% (inCoreA + 1.0) + + (inCoreC - inCoreControlC).norm should be < 1E-10 + + } + + test("C = A %*% inCoreB") { + + val inCoreA = dense((1, 2, 3), (3, 4, 5), (4, 5, 6), (5, 6, 7)) + val inCoreB = dense((3, 5, 7, 10), (4, 6, 9, 10), (5, 6, 7, 7)) + + 
val A = drmParallelize(inCoreA, numPartitions = 2) + val C = A %*% inCoreB + + val inCoreC = C.collect + val inCoreCControl = inCoreA %*% inCoreB + + println(inCoreC) + (inCoreC - inCoreCControl).norm should be < 1E-10 + + } + + test("C = inCoreA %*%: B") { + + val inCoreA = dense((1, 2, 3), (3, 4, 5), (4, 5, 6), (5, 6, 7)) + val inCoreB = dense((3, 5, 7, 10), (4, 6, 9, 10), (5, 6, 7, 7)) + + val B = drmParallelize(inCoreB, numPartitions = 2) + val C = inCoreA %*%: B + + val inCoreC = C.collect + val inCoreCControl = inCoreA %*% inCoreB + + println(inCoreC) + (inCoreC - inCoreCControl).norm should be < 1E-10 + + } + + test("C = A.t %*% A") { + val inCoreA = dense((1, 2, 3), (3, 4, 5), (4, 5, 6), (5, 6, 7)) + val A = drmParallelize(m = inCoreA, numPartitions = 2) + + val AtA = A.t %*% A + + // Assert optimizer detects square + mahoutCtx.optimizerRewrite(action = AtA) should equal(OpAtA(A)) + + val inCoreAtA = AtA.collect + val inCoreAtAControl = inCoreA.t %*% inCoreA + + (inCoreAtA - inCoreAtAControl).norm should be < 1E-10 + } + + test("C = A.t %*% A fat non-graph") { + // Hack the max in-mem size for this test + System.setProperty("mahout.math.AtA.maxInMemNCol", "540") + + val inCoreA = Matrices.uniformView(400, 550, 1234) + val A = drmParallelize(m = inCoreA, numPartitions = 2) + + val AtA = A.t %*% A + + // Assert optimizer detects square + mahoutCtx.optimizerRewrite(action = AtA) should equal(OpAtA(A)) + + val inCoreAtA = AtA.collect + val inCoreAtAControl = inCoreA.t %*% inCoreA + + (inCoreAtA - inCoreAtAControl).norm should be < 1E-10 + } + + test("C = A.t %*% A non-int key") { + val inCoreA = dense((1, 2, 3), (3, 4, 5), (4, 5, 6), (5, 6, 7)) + val AintKeyd = drmParallelize(m = inCoreA, numPartitions = 2) + val A = AintKeyd.mapBlock() { + case (keys, block) => keys.map(_.toString) -> block + } + + val AtA = A.t %*% A + + // Assert optimizer detects square + mahoutCtx.optimizerRewrite(action = AtA) should equal(OpAtA(A)) + + val inCoreAtA = AtA.collect + val 
inCoreAtAControl = inCoreA.t %*% inCoreA + + (inCoreAtA - inCoreAtAControl).norm should be < 1E-10 + } + + test("C = A + B") { + + val inCoreA = dense((1, 2), (3, 4)) + val inCoreB = dense((3, 5), (4, 6)) + + val A = drmParallelize(inCoreA, numPartitions = 2) + val B = drmParallelize(inCoreB, numPartitions = 2) + + val C = A + B + val inCoreC = C.collect + + // Actual + val inCoreCControl = inCoreA + inCoreB + + (inCoreC - inCoreCControl).norm should be < 1E-10 + } + + test("C = A + B, identically partitioned") { + + val inCoreA = dense((1, 2, 3), (3, 4, 5), (5, 6, 7)) + + val A = drmParallelize(inCoreA, numPartitions = 2) + +// printf("A.nrow=%d.\n", A.rdd.count()) + + // Create B which would be identically partitioned to A. mapBlock() by default will do the trick. + val B = A.mapBlock() { + case (keys, block) => + val bBlock = block.like() := { (r, c, v) => util.Random.nextDouble()} + keys -> bBlock + } + // Prevent repeated computation non-determinism + .checkpoint() + + val inCoreB = B.collect + + printf("A=\n%s\n", inCoreA) + printf("B=\n%s\n", inCoreB) + + val C = A + B + + val inCoreC = C.collect + + printf("C=\n%s\n", inCoreC) + + // Actual + val inCoreCControl = inCoreA + inCoreB + + (inCoreC - inCoreCControl).norm should be < 1E-10 + } + + + test("C = A + B side test 1") { + + val inCoreA = dense((1, 2), (3, 4)) + val inCoreB = dense((3, 5), (4, 6)) + + val A = drmParallelize(inCoreA, numPartitions = 2) + val B = drmParallelize(inCoreB, numPartitions = 2) + + val C = A + B + val inCoreC = C.collect + + val inCoreD = (A + B).collect + + // Actual + val inCoreCControl = inCoreA + inCoreB + + (inCoreC - inCoreCControl).norm should be < 1E-10 + (inCoreD - inCoreCControl).norm should be < 1E-10 + } + + test("C = A + B side test 2") { + + val inCoreA = dense((1, 2), (3, 4)) + val inCoreB = dense((3, 5), (4, 6)) + + val A = drmParallelize(inCoreA, numPartitions = 2).checkpoint() + val B = drmParallelize(inCoreB, numPartitions = 2) + + val C = A + B + val inCoreC 
= C.collect + + val inCoreD = (A + B).collect + + // Actual + val inCoreCControl = inCoreA + inCoreB + + (inCoreC - inCoreCControl).norm should be < 1E-10 + (inCoreD - inCoreCControl).norm should be < 1E-10 + } + + test("C = A + B side test 3") { + + val inCoreA = dense((1, 2), (3, 4)) + val inCoreB = dense((3, 5), (4, 6)) + + val B = drmParallelize(inCoreB, numPartitions = 2) + // val A = (drmParallelize(inCoreA, numPartitions = 2) + B).checkpoint(CacheHint.MEMORY_ONLY_SER) + val A = (drmParallelize(inCoreA, numPartitions = 2) + B).checkpoint(CacheHint.MEMORY_ONLY) + + val C = A + B + val inCoreC = C.collect + + val inCoreD = (A + B).collect + + // Actual + val inCoreCControl = inCoreA + inCoreB * 2.0 + + (inCoreC - inCoreCControl).norm should be < 1E-10 + (inCoreD - inCoreCControl).norm should be < 1E-10 + } + + test("Ax") { + val inCoreA = dense( + (1, 2), + (3, 4), + (20, 30) + ) + val x = dvec(10, 3) + + val drmA = drmParallelize(inCoreA, numPartitions = 2) + + val ax = (drmA %*% x).collect(::, 0) + + ax should equal(inCoreA %*% x) + } + + test("A'x") { + val inCoreA = dense( + (1, 2), + (3, 4), + (20, 30) + ) + val x = dvec(10, 3, 4) + + val drmA = drmParallelize(inCoreA, numPartitions = 2) + + mahoutCtx.optimizerRewrite(drmA.t %*% x) should equal(OpAtx(drmA, x)) + + val atx = (drmA.t %*% x).collect(::, 0) + + atx should equal(inCoreA.t %*% x) + } + + test("colSums, colMeans") { + val inCoreA = dense( + (1, 2), + (3, 4), + (20, 30) + ) + val drmA = drmParallelize(inCoreA, numPartitions = 2) + + drmA.colSums() should equal(inCoreA.colSums()) + drmA.colMeans() should equal(inCoreA.colMeans()) + } + + test("rowSums, rowMeans") { + val inCoreA = dense( + (1, 2), + (3, 4), + (20, 30) + ) + val drmA = drmParallelize(inCoreA, numPartitions = 2) + + drmA.rowSums() should equal(inCoreA.rowSums()) + drmA.rowMeans() should equal(inCoreA.rowMeans()) + } + + test("A.diagv") { + val inCoreA = dense( + (1, 2, 3), + (3, 4, 5), + (20, 30, 7) + ) + val drmA = 
drmParallelize(inCoreA, numPartitions = 2) + + drmA.diagv should equal(inCoreA.diagv) + } + + test("numNonZeroElementsPerColumn") { + val inCoreA = dense( + (0, 2), + (3, 0), + (0, -30) + + ) + val drmA = drmParallelize(inCoreA, numPartitions = 2) + + drmA.numNonZeroElementsPerColumn() should equal(inCoreA.numNonZeroElementsPerColumn()) + } + + test("C = A cbind B, cogroup") { + + val inCoreA = dense((1, 2), (3, 4)) + val inCoreB = dense((3, 5), (4, 6)) + val controlC = dense((1, 2, 3, 5), (3, 4, 4, 6)) + + val A = drmParallelize(inCoreA, numPartitions = 2).checkpoint() + val B = drmParallelize(inCoreB, numPartitions = 2).checkpoint() + + (A.cbind(B) -: controlC).norm should be < 1e-10 + + } + + test("C = A cbind B, zip") { + + val inCoreA = dense((1, 2), (3, 4)) + val controlC = dense((1, 2, 2, 3), (3, 4, 4, 5)) + + val A = drmParallelize(inCoreA, numPartitions = 2).checkpoint() + + (A.cbind(A + 1.0) -: controlC).norm should be < 1e-10 + + } + + test("B = A + 1.0") { + val inCoreA = dense((1, 2), (2, 3), (3, 4)) + val controlB = inCoreA + 1.0 + + val drmB = drmParallelize(m = inCoreA, numPartitions = 2) + 1.0 + + (drmB -: controlB).norm should be < 1e-10 + } + + test("C = A rbind B") { + + val inCoreA = dense((1, 2), (3, 5)) + val inCoreB = dense((7, 11), (13, 17)) + val controlC = dense((1, 2), (3, 5), (7, 11), (13, 17)) + + val A = drmParallelize(inCoreA, numPartitions = 2).checkpoint() + val B = drmParallelize(inCoreB, numPartitions = 2).checkpoint() + + (A.rbind(B) -: controlC).norm should be < 1e-10 + } + + test("C = A rbind B, with empty") { + + val inCoreA = dense((1, 2), (3, 5)) + val emptyB = drmParallelizeEmpty(nrow = 2, ncol = 2, numPartitions = 2) + val controlC = dense((1, 2), (3, 5), (0, 0), (0, 0)) + + val A = drmParallelize(inCoreA, numPartitions = 2).checkpoint() + + (A.rbind(emptyB) -: controlC).norm should be < 1e-10 + } + + /** Test dsl overloads over scala operations over matrices */ + test("scalarOps") { + val drmA = drmParallelize(m = dense( 
+ (1, 2, 3), + (3, 4, 5), + (7, 8, 9) + ), + numPartitions = 2) + + (10 * drmA - (10 *: drmA)).norm shouldBe 0 + + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/test/scala/org/apache/mahout/math/scalabindings/MathSuite.scala ---------------------------------------------------------------------- diff --git a/samsara/src/test/scala/org/apache/mahout/math/scalabindings/MathSuite.scala b/samsara/src/test/scala/org/apache/mahout/math/scalabindings/MathSuite.scala new file mode 100644 index 0000000..b10cde3 --- /dev/null +++ b/samsara/src/test/scala/org/apache/mahout/math/scalabindings/MathSuite.scala @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.mahout.math.scalabindings
+
+import org.scalatest.{Matchers, FunSuite}
+import org.apache.mahout.math._
+import scala.math._
+import RLikeOps._
+import scala._
+import scala.util.Random
+import org.apache.mahout.test.MahoutSuite
+import org.apache.mahout.common.RandomUtils
+
+/** In-core tests for the scalabindings factorizations and solvers (chol, qr, solve, svd) and for seeded uniform matrix views. */
+class MathSuite extends FunSuite with MahoutSuite {
+
+  test("chol") {
+
+    // Solve Ax = b with the Cholesky factorization A = LL'.
+    // This requires
+    //   (LL')x = b
+    //   L'x = (L^-1)b
+    //   x = (L'^-1)(L^-1)b
+
+    val a = dense((1, 2, 3), (2, 3, 4), (3, 4, 5.5))
+
+    // Replace a with a'a so it is symmetric, as the Cholesky factorization requires.
+    a := a.t %*% a
+
+    printf("A= \n%s\n", a)
+
+    val b = dense((9, 8, 7)).t
+
+    printf("b = \n%s\n", b)
+
+    // Note: the check below fails if this is changed to chol(a, true).
+    val ch = chol(a)
+
+    printf("L = \n%s\n", ch.getL)
+
+    printf("(L^-1)b =\n%s\n", ch.solveLeft(b))
+
+    val x = ch.solveRight(eye(3)) %*% ch.solveLeft(b)
+
+    printf("x = \n%s\n", x.toString)
+
+    val axmb = (a %*% x) - b
+
+    printf("AX - B = \n%s\n", axmb.toString)
+
+    axmb.norm should be < 1e-10
+
+  }
+
+  test("chol2") {
+
+    val vtv = new DenseSymmetricMatrix(
+      Array(
+        0.0021401286568947376, 0.001309251254596442, 0.0016003218703045058,
+        0.001545407014131058, 0.0012772546647977234,
+        0.001747768702674435
+      ), true)
+
+    printf("V'V=\n%s\n", vtv cloned)
+
+    val vblock = dense(
+      (0.0012356809018514347, 0.006141139195280868, 8.037742467936037E-4),
+      (0.007910767859830255, 0.007989899899005457, 0.006877961936587515),
+      (0.007011211118759952, 0.007458865101641882, 0.0048344749320346795),
+      (0.006578789899685284, 0.0010812485516549452, 0.0062146270886981655)
+    )
+
+    val d = diag(15.0, 4)
+
+    val b = dense(
+      (0.36378319648203084),
+      (0.3627384439613304),
+      (0.2996934112658234))
+
+    printf("B=\n%s\n", b)
+
+    val cholArg = vtv + (vblock.t %*% d %*% vblock) + diag(4e-6, 3)
+
+    printf("cholArg=\n%s\n", cholArg)
+
+    printf("V'DV=\n%s\n", (vblock.t %*% d %*% vblock))
+
+    printf("V'V+V'DV=\n%s\n", vtv + (vblock.t %*% d %*% vblock))
+
+    val ch = chol(cholArg)
+
+    printf("L=\n%s\n", ch.getL)
+
+    val x = ch.solveRight(eye(cholArg.nrow)) %*% ch.solveLeft(b)
+
+    printf("X=\n%s\n", x)
+
+    assert((cholArg %*% x - b).norm < 1e-10)
+
+  }
+
+  test("qr") {
+    val a = dense((1, 2, 3), (2, 3, 6), (3, 4, 5), (4, 7, 8))
+    val (q, r) = qr(a)
+
+    printf("Q=\n%s\n", q)
+    printf("R=\n%s\n", r)
+
+    for (i <- 0 until q.ncol; j <- i + 1 until q.ncol)
+      assert(abs(q(::, i) dot q(::, j)) < 1e-10)
+    // Orthogonal columns alone would not catch a wrong R: the factors must also reproduce a.
+    (q %*% r - a).norm should be < 1e-10
+  }
+
+  test("solve matrix-vector") {
+    val a = dense((1, 3), (4, 2))
+    val b = dvec(11, 14)
+    val x = solve(a, b)
+
+    val control = dvec(2, 3)
+
+    (control - x).norm(2) should be < 1e-10
+  }
+
+  test("solve matrix-matrix") {
+    val a = dense((1, 3), (4, 2))
+    val b = dense((11), (14))
+    val x = solve(a, b)
+
+    val control = dense((2), (3))
+
+    (control - x).norm should be < 1e-10
+  }
+
+  test("solve to obtain inverse") {
+    val a = dense((1, 3), (4, 2))
+    val x = solve(a)
+
+    val identity = a %*% x
+
+    val control = eye(identity.ncol)
+
+    (control - identity).norm should be < 1e-10
+  }
+
+  test("solve rejects non-square matrix") {
+    intercept[IllegalArgumentException] {
+      val a = dense((1, 2, 3), (4, 5, 6))
+      val b = dvec(1, 2)
+      solve(a, b)
+    }
+  }
+
+  test("solve rejects singular matrix") {
+    intercept[IllegalArgumentException] {
+      val a = dense((1, 2), (2 , 4))
+      val b = dvec(1, 2)
+      solve(a, b)
+    }
+  }
+
+  test("svd") {
+    val a = dense((1, 2, 3), (3, 4, 5))
+
+    val (u, v, s) = svd(a)
+
+    printf("U:\n%s\n", u.toString)
+    printf("V:\n%s\n", v.toString)
+    printf("Sigma:\n%s\n", s.toString)
+
+    val aBar = u %*% diagv(s) %*% v.t
+
+    val amab = a - aBar
+
+    printf("A-USV'=\n%s\n", amab.toString)
+
+    assert(amab.norm < 1e-10)
+  }
+
+  test("random uniform") {
+    val omega1 = Matrices.symmetricUniformView(2, 3, 1234)
+    val omega2 = Matrices.symmetricUniformView(2, 3, 1234)
+
+    val a = sparse(
+      0 -> 1 :: 1 -> 2 :: Nil,
+      0 -> 3 :: 1 -> 4 :: Nil,
+      0 -> 2 :: 1 -> 0.0 :: Nil
+    )
+
+    // omega1 and omega2 share a seed, so a row block times omega1 is expected to equal the same rows of a %*% omega2.
+    val block = a(0 to 0, ::).cloned
+    val block2 = a(1 to 1, ::).cloned
+
+    (block %*% omega1 - (a %*% omega2)(0 to 0, ::)).norm should be < 1e-7
+    (block2 %*% omega1 - (a %*% omega2)(1 to 1, ::)).norm should be < 1e-7
+
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/test/scala/org/apache/mahout/math/scalabindings/MatlabLikeMatrixOpsSuite.scala
----------------------------------------------------------------------
diff --git a/samsara/src/test/scala/org/apache/mahout/math/scalabindings/MatlabLikeMatrixOpsSuite.scala b/samsara/src/test/scala/org/apache/mahout/math/scalabindings/MatlabLikeMatrixOpsSuite.scala
new file mode 100644
index 0000000..547f710
--- /dev/null
+++ b/samsara/src/test/scala/org/apache/mahout/math/scalabindings/MatlabLikeMatrixOpsSuite.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.scalabindings
+
+import org.scalatest.FunSuite
+import MatlabLikeOps._
+import scala.Predef._
+import org.apache.mahout.test.MahoutSuite
+
+/** Exercises the MATLAB-style operators from [[MatlabLikeOps]]: `*` as matrix product, `*@` and `*@=` as element-wise product. */
+class MatlabLikeMatrixOpsSuite extends FunSuite with MahoutSuite {
+  test("multiplication") {
+    // rhs acts as a single column: 1*1 + 2*4 + 3*5 = 24 and 3*1 + 4*4 + 5*5 = 44.
+    val lhs = dense((1, 2, 3), (3, 4, 5))
+    val rhs = dense(1, 4, 5)
+    val product = lhs * rhs
+
+    assert(product(0, 0) == 24)
+    assert(product(1, 0) == 44)
+    println(product.toString)
+  }
+
+  test("Hadamard") {
+    val lhs = dense(
+      (1, 2, 3),
+      (3, 4, 5)
+    )
+    val rhs = dense(
+      (1, 1, 2),
+      (2, 1, 1)
+    )
+    // Element-wise product of two equally shaped matrices.
+    val hadamard = lhs *@ rhs
+
+    printf("C=\n%s\n", hadamard)
+
+    assert(hadamard(0, 0) == 1)
+    assert(hadamard(1, 2) == 5)
+    println(hadamard.toString)
+    // Scalar form multiplies every element by the scalar.
+    val scaled = lhs *@ 5.0
+    assert(scaled(0, 0) == 5)
+    assert(scaled(1, 1) == 20)
+    // Assignment form, applied to lhs, which is then re-checked.
+    lhs *@= rhs
+    assert(lhs(0, 0) == 1)
+    assert(lhs(1, 2) == 5)
+    println(lhs.toString)
+
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala
----------------------------------------------------------------------
diff --git a/samsara/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala b/samsara/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala
new file mode 100644
index 0000000..d7b22d9
--- /dev/null
+++ b/samsara/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.scalabindings
+
+import org.scalatest.{Matchers, FunSuite}
+import RLikeOps._
+import scala._
+import org.apache.mahout.test.MahoutSuite
+import org.apache.mahout.math.{RandomAccessSparseVector, SequentialAccessSparseVector, Matrices}
+import org.apache.mahout.common.RandomUtils
+
+/** Tests for the R-like in-core matrix and vector operators brought in by [[RLikeOps]]. */
+class MatrixOpsSuite extends FunSuite with MahoutSuite {
+
+  test("equivalence") {
+    val a = dense((1, 2, 3), (3, 4, 5))
+    val b = dense((1, 2, 3), (3, 4, 5))
+    val c = dense((1, 4, 3), (3, 4, 5))
+    assert(a === b)
+    assert(a !== c)
+  }
+
+  test("elementwise plus, minus") {
+    val a = dense((1, 2, 3), (3, 4, 5))
+    val b = dense((1, 1, 2), (2, 1, 1))
+
+    val c = a + b
+    assert(c(0, 0) == 2)
+    assert(c(1, 2) == 6)
+    println(c.toString)
+    // The test name promises both operators, so exercise subtraction too: 5 - 1 = 4.
+    assert((a - b)(1, 2) == 4)
+  }
+
+  test("matrix, vector slicing") {
+
+    val a = dense((1, 2, 3), (3, 4, 5))
+
+    assert(a(::, 0).sum == 4)
+    assert(a(1, ::).sum == 12)
+
+    assert(a(0 to 1, 1 to 2).sum == 14)
+
+    // assign to slice-vector
+    a(0, 0 to 1) :=(3, 5)
+    // or
+    a(0, 0 to 1) = (3, 5)
+
+    assert(a(0, ::).sum == 11)
+
+    println(a.toString)
+
+    // assign to a slice-matrix
+    a(0 to 1, 0 to 1) := dense((1, 1), (2, 2.5))
+
+    // or
+    a(0 to 1, 0 to 1) = dense((1, 1), (2, 2.5))
+
+    println(a)
+    println(a.sum)
+
+    val b = dense((1, 2, 3), (3, 4, 5))
+    b(0, ::) -= dvec(1, 2, 3)
+    println(b)
+    b(0, ::) should equal(dvec(0, 0, 0))
+
+  }
+
+  test("assignments") {
+
+    val a = dense((1, 2, 3), (3, 4, 5))
+
+    val b = a cloned
+
+    b(0, 0) = 2.0
+
+    printf("B=\n%s\n", b)
+
+    // Exactly one element differs by 1.0, so the norm must equal 1 (not merely be at most 1).
+    ((b - a).norm - 1.0).abs should be < 1e-10
+
+    val e = eye(5)
+
+    printf("I(5)=\n%s\n", e)
+
+    a(0 to 1, 1 to 2) = dense((3, 2), (2, 3))
+    a(0 to 1, 1 to 2) := dense((3, 2), (2, 3))
+    // Both update forms wrote the same block, so the slice now sums to 3 + 2 + 2 + 3.
+    assert(a(0 to 1, 1 to 2).sum == 10)
+
+  }
+
+  test("sparse") {
+
+    val a = sparse((1, 3) :: Nil,
+      (0, 2) ::(1, 2.5) :: Nil
+    )
+    println(a.toString)
+  }
+
+  test("colSums, rowSums, colMeans, rowMeans, numNonZeroElementsPerColumn") {
+    val a = dense(
+      (2, 3, 4),
+      (3, 4, 5)
+    )
+
+    a.colSums() should equal(dvec(5, 7, 9))
+    a.rowSums() should equal(dvec(9, 12))
+    a.colMeans() should equal(dvec(2.5, 3.5, 4.5))
+    a.rowMeans() should equal(dvec(3, 4))
+    a.numNonZeroElementsPerColumn() should equal(dvec(2,2,2))
+    a.numNonZeroElementsPerRow() should equal(dvec(3,3))
+
+  }
+
+  test("numNonZeroElementsPerColumn and Row") {
+    val a = dense(
+      (2, 3, 4),
+      (3, 4, 5),
+      (-5, 0, -1),
+      (0, 0, 1)
+    )
+
+    a.numNonZeroElementsPerColumn() should equal(dvec(3,2,4))
+    a.numNonZeroElementsPerRow() should equal(dvec(3,3,2,1))
+  }
+
+  test("Vector Assignment performance") {
+    // Benchmark only: prints average timings and asserts nothing.
+    val n = 1000
+    val k = (n * 0.1).toInt
+    val nIters = 10000
+
+    val rnd = RandomUtils.getRandom
+
+    val src = new SequentialAccessSparseVector(n)
+    for (_ <- 0 until k) src(rnd.nextInt(n)) = rnd.nextDouble()
+
+    val times = (0 until 50).map { _ =>
+      val ms = System.currentTimeMillis()
+      var j = 0
+      while (j < nIters) {
+        new SequentialAccessSparseVector(n) := src
+        j += 1
+      }
+      System.currentTimeMillis() - ms
+    }.tail // drop the first round (presumably JIT warm-up)
+
+    val avgTime = times.sum.toDouble / times.size
+
+    printf("Average assignment seqSparse2seqSparse time: %.3f ms\n", avgTime)
+
+    val times2 = (0 until 50).map { _ =>
+      val ms = System.currentTimeMillis()
+      var j = 0
+      while (j < nIters) {
+        new SequentialAccessSparseVector(n) := (new RandomAccessSparseVector(n) := src)
+        j += 1
+      }
+      System.currentTimeMillis() - ms
+    }.tail // drop the first round (presumably JIT warm-up)
+
+    val avgTime2 = times2.sum.toDouble / times2.size
+
+    printf("Average assignment seqSparse2seqSparse via Random Access Sparse time: %.3f ms\n", avgTime2)
+
+  }
+
+
+}
\ No newline at end of file
