http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala deleted file mode 100644 index 3b5affd..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala +++ /dev/null @@ -1,220 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.mahout.drivers - -import scopt.OptionParser - -import scala.collection.immutable - -/** - * Defines oft-repeated options and their parsing. Provides the option groups and parsing helper methods to - * keep both standarized. - * @param programName Name displayed in help message, the name by which the driver is invoked. - * @note options are engine neutral by convention. See the engine specific extending class for - * to add Spark or other engine options. - */ -class MahoutOptionParser(programName: String) extends OptionParser[Map[String, Any]](programName: String) { - - // build options from some stardard CLI param groups - // Note: always put the driver specific options at the last so they can override any previous options! - var opts = Map.empty[String, Any] - - override def showUsageOnError = true - - def parseIOOptions(numInputs: Int = 1) = { - opts = opts ++ MahoutOptionParser.FileIOOptions - note("Input, output options") - opt[String]('i', "input") required() action { (x, options) => - options + ("input" -> x) - } text ("Input path, may be a filename, directory name, or comma delimited list of HDFS supported URIs" + - " (required)") - - if (numInputs == 2) { - opt[String]("input2") abbr ("i2") action { (x, options) => - options + ("input2" -> x) - } text ("Secondary input path for cross-similarity calculation, same restrictions as \"--input\" " + - "(optional). Default: empty.") - } - - opt[String]('o', "output") required() action { (x, options) => - if (x.endsWith("/")) { - options + ("output" -> x) - } else { - options + ("output" -> (x + "/")) - } - } text ("Path for output directory, any HDFS supported URI (required)") - - } - - def parseGenericOptions() = { - opts = opts ++ MahoutOptionParser.GenericOptions - opt[Int]("randomSeed") abbr ("rs") action { (x, options) => - options + ("randomSeed" -> x) - } validate { x => - if (x > 0) success else failure("Option --randomSeed must be > 0") - } - - //output both input IndexedDatasets - opt[Unit]("writeAllDatasets") hidden() action { (_, options) => - options + ("writeAllDatasets" -> true) - }//Hidden option, though a user might want this. - } - - def parseElementInputSchemaOptions() = { - //Input text file schema--not driver specific but input data specific, elements input, - // not rows of IndexedDatasets - opts = opts ++ MahoutOptionParser.TextDelimitedElementsOptions - note("\nInput text file schema options:") - opt[String]("inDelim") abbr ("id") text ("Input delimiter character (optional). Default: \"[ ,\\t]\"") action { - (x, options) => - options + ("inDelim" -> x) - } - - opt[String]("filter1") abbr ("f1") action { (x, options) => - options + ("filter1" -> x) - } text ("String (or regex) whose presence indicates a datum for the primary item set (optional). " + - "Default: no filter, all data is used") - - opt[String]("filter2") abbr ("f2") action { (x, options) => - options + ("filter2" -> x) - } text ("String (or regex) whose presence indicates a datum for the secondary item set (optional). " + - "If not present no secondary dataset is collected") - - opt[Int]("rowIDColumn") abbr ("rc") action { (x, options) => - options + ("rowIDColumn" -> x) - } text ("Column number (0 based Int) containing the row ID string (optional). Default: 0") validate { - x => - if (x >= 0) success else failure("Option --rowIDColNum must be >= 0") - } - - opt[Int]("itemIDColumn") abbr ("ic") action { (x, options) => - options + ("itemIDColumn" -> x) - } text ("Column number (0 based Int) containing the item ID string (optional). Default: 1") validate { - x => - if (x >= 0) success else failure("Option --itemIDColNum must be >= 0") - } - - opt[Int]("filterColumn") abbr ("fc") action { (x, options) => - options + ("filterColumn" -> x) - } text ("Column number (0 based Int) containing the filter string (optional). Default: -1 for no " + - "filter") validate { x => - if (x >= -1) success else failure("Option --filterColNum must be >= -1") - } - - note("\nUsing all defaults the input is expected of the form: \"userID<tab>itemId\" or" + - " \"userID<tab>itemID<tab>any-text...\" and all rows will be used") - - //check for column consistency - checkConfig { options: Map[String, Any] => - if (options("filterColumn").asInstanceOf[Int] == options("itemIDColumn").asInstanceOf[Int] - || options("filterColumn").asInstanceOf[Int] == options("rowIDColumn").asInstanceOf[Int] - || options("rowIDColumn").asInstanceOf[Int] == options("itemIDColumn").asInstanceOf[Int]) - failure("The row, item, and filter positions must be unique.") else success - } - - //check for filter consistency - checkConfig { options: Map[String, Any] => - if (options("filter1").asInstanceOf[String] != null.asInstanceOf[String] - && options("filter2").asInstanceOf[String] != null.asInstanceOf[String] - && options("filter1").asInstanceOf[String] == options("filter2").asInstanceOf[String]) - failure ("If using filters they must be unique.") else success - } - - } - - def parseFileDiscoveryOptions() = { - //File finding strategy--not driver specific - opts = opts ++ MahoutOptionParser.FileDiscoveryOptions - note("\nFile discovery options:") - opt[Unit]('r', "recursive") action { (_, options) => - options + ("recursive" -> true) - } text ("Searched the -i path recursively for files that match --filenamePattern (optional), Default: false") - - opt[String]("filenamePattern") abbr ("fp") action { (x, options) => - options + ("filenamePattern" -> x) - } text ("Regex to match in determining input files (optional). Default: filename in the --input option " + - "or \"^part-.*\" if --input is a directory") - - } - - def parseIndexedDatasetFormatOptions() = { - opts = opts ++ MahoutOptionParser.TextDelimitedIndexedDatasetOptions - note("\nOutput text file schema options:") - opt[String]("rowKeyDelim") abbr ("rd") action { (x, options) => - options + ("rowKeyDelim" -> x) - } text ("Separates the rowID key from the vector values list (optional). Default: \"\\t\"") - - opt[String]("columnIdStrengthDelim") abbr ("cd") action { (x, options) => - options + ("columnIdStrengthDelim" -> x) - } text ("Separates column IDs from their values in the vector values list (optional). Default: \":\"") - - opt[String]("elementDelim") abbr ("td") action { (x, options) => - options + ("elementDelim" -> x) - } text ("Separates vector element values in the values list (optional). Default: \" \"") - - opt[Unit]("omitStrength") abbr ("os") action { (_, options) => - options + ("omitStrength" -> true) - } text ("Do not write the strength to the output files (optional), Default: false.") - note("This option is used to output indexable data for creating a search engine recommender.") - - note("\nDefault delimiters will produce output of the form: " + - "\"itemID1<tab>itemID2:value2<space>itemID10:value10...\"") - } - -} - -/** - * Companion object defines default option groups for reference in any driver that needs them. - * @note not all options are platform neutral so other platforms can add default options here if desired - */ -object MahoutOptionParser { - - // set up the various default option groups - final val GenericOptions = immutable.HashMap[String, Any]( - "randomSeed" -> System.currentTimeMillis().toInt, - "writeAllDatasets" -> false) - - final val SparkOptions = immutable.HashMap[String, Any]( - "master" -> "local", - "sparkExecutorMem" -> "", - "appName" -> "Generic Spark App, Change this.") - - final val FileIOOptions = immutable.HashMap[String, Any]( - "input" -> null.asInstanceOf[String], - "input2" -> null.asInstanceOf[String], - "output" -> null.asInstanceOf[String]) - - final val FileDiscoveryOptions = immutable.HashMap[String, Any]( - "recursive" -> false, - "filenamePattern" -> "^part-.*") - - final val TextDelimitedElementsOptions = immutable.HashMap[String, Any]( - "rowIDColumn" -> 0, - "itemIDColumn" -> 1, - "filterColumn" -> -1, - "filter1" -> null.asInstanceOf[String], - "filter2" -> null.asInstanceOf[String], - "inDelim" -> "[,\t ]") - - final val TextDelimitedIndexedDatasetOptions = immutable.HashMap[String, Any]( - "rowKeyDelim" -> "\t", - "columnIdStrengthDelim" -> ":", - "elementDelim" -> " ", - "omitStrength" -> false) -} - -
http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala b/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala deleted file mode 100644 index 6557ab0..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala +++ /dev/null @@ -1,308 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.math.cf - -import org.apache.mahout.math._ -import org.apache.mahout.math.indexeddataset.IndexedDataset -import scalabindings._ -import RLikeOps._ -import drm._ -import RLikeDrmOps._ -import scala.collection.JavaConversions._ -import org.apache.mahout.math.stats.LogLikelihood -import collection._ -import org.apache.mahout.math.function.{VectorFunction, Functions} - -import scala.util.Random - - -/** - * Based on "Ted Dunnning & Ellen Friedman: Practical Machine Learning, Innovations in Recommendation", - * available at http://www.mapr.com/practical-machine-learning - * - * see also "Sebastian Schelter, Christoph Boden, Volker Markl: - * Scalable Similarity-Based Neighborhood Methods with MapReduce - * ACM Conference on Recommender Systems 2012" - */ -object SimilarityAnalysis extends Serializable { - - /** Compares (Int,Double) pairs by the second value */ - private val orderByScore = Ordering.fromLessThan[(Int, Double)] { case ((_, score1), (_, score2)) => score1 > score2} - - /** - * Calculates item (column-wise) similarity using the log-likelihood ratio on A'A, A'B, A'C, ... - * and returns a list of similarity and cross-similarity matrices - * @param drmARaw Primary interaction matrix - * @param randomSeed when kept to a constant will make repeatable downsampling - * @param maxInterestingItemsPerThing number of similar items to return per item, default: 50 - * @param maxNumInteractions max number of interactions after downsampling, default: 500 - * @return a list of [[org.apache.mahout.math.drm.DrmLike]] containing downsampled DRMs for cooccurrence and - * cross-cooccurrence - */ - def cooccurrences(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingItemsPerThing: Int = 50, - maxNumInteractions: Int = 500, drmBs: Array[DrmLike[Int]] = Array()): List[DrmLike[Int]] = { - - implicit val distributedContext = drmARaw.context - - // Apply selective downsampling, pin resulting matrix - val drmA = sampleDownAndBinarize(drmARaw, randomSeed, maxNumInteractions) - - // num users, which equals the maximum number of interactions per item - val numUsers = drmA.nrow.toInt - - // Compute & broadcast the number of interactions per thing in A - val bcastInteractionsPerItemA = drmBroadcast(drmA.numNonZeroElementsPerColumn) - - // Compute co-occurrence matrix A'A - val drmAtA = drmA.t %*% drmA - - // Compute loglikelihood scores and sparsify the resulting matrix to get the similarity matrix - val drmSimilarityAtA = computeSimilarities(drmAtA, numUsers, maxInterestingItemsPerThing, - bcastInteractionsPerItemA, bcastInteractionsPerItemA, crossCooccurrence = false) - - var similarityMatrices = List(drmSimilarityAtA) - - // Now look at cross-co-occurrences - for (drmBRaw <- drmBs) { - // Down-sample and pin other interaction matrix - val drmB = sampleDownAndBinarize(drmBRaw, randomSeed, maxNumInteractions).checkpoint() - - // Compute & broadcast the number of interactions per thing in B - val bcastInteractionsPerThingB = drmBroadcast(drmB.numNonZeroElementsPerColumn) - - // Compute cross-co-occurrence matrix A'B - val drmAtB = drmA.t %*% drmB - - val drmSimilarityAtB = computeSimilarities(drmAtB, numUsers, maxInterestingItemsPerThing, - bcastInteractionsPerItemA, bcastInteractionsPerThingB) - - similarityMatrices = similarityMatrices :+ drmSimilarityAtB - - drmB.uncache() - } - - // Unpin downsampled interaction matrix - drmA.uncache() - - // Return list of similarity matrices - similarityMatrices - } - - /** - * Calculates item (column-wise) similarity using the log-likelihood ratio on A'A, A'B, A'C, ... and returns - * a list of similarity and cross-similarity matrices. Somewhat easier to use method, which handles the ID - * dictionaries correctly - * @param indexedDatasets first in array is primary/A matrix all others are treated as secondary - * @param randomSeed use default to make repeatable, otherwise pass in system time or some randomizing seed - * @param maxInterestingItemsPerThing max similarities per items - * @param maxNumInteractions max number of input items per item - * @return a list of [[org.apache.mahout.math.indexeddataset.IndexedDataset]] containing downsampled - * IndexedDatasets for cooccurrence and cross-cooccurrence - */ - def cooccurrencesIDSs(indexedDatasets: Array[IndexedDataset], - randomSeed: Int = 0xdeadbeef, - maxInterestingItemsPerThing: Int = 50, - maxNumInteractions: Int = 500): - List[IndexedDataset] = { - val drms = indexedDatasets.map(_.matrix.asInstanceOf[DrmLike[Int]]) - val primaryDrm = drms(0) - val secondaryDrms = drms.drop(1) - val coocMatrices = cooccurrences(primaryDrm, randomSeed, maxInterestingItemsPerThing, - maxNumInteractions, secondaryDrms) - val retIDSs = coocMatrices.iterator.zipWithIndex.map { - case( drm, i ) => - indexedDatasets(0).create(drm, indexedDatasets(0).columnIDs, indexedDatasets(i).columnIDs) - } - retIDSs.toList - } - - /** - * Calculates row-wise similarity using the log-likelihood ratio on AA' and returns a DRM of rows and similar rows - * @param drmARaw Primary interaction matrix - * @param randomSeed when kept to a constant will make repeatable downsampling - * @param maxInterestingSimilaritiesPerRow number of similar items to return per item, default: 50 - * @param maxNumInteractions max number of interactions after downsampling, default: 500 - */ - def rowSimilarity(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingSimilaritiesPerRow: Int = 50, - maxNumInteractions: Int = 500): DrmLike[Int] = { - - implicit val distributedContext = drmARaw.context - - // Apply selective downsampling, pin resulting matrix - val drmA = sampleDownAndBinarize(drmARaw, randomSeed, maxNumInteractions) - - // num columns, which equals the maximum number of interactions per item - val numCols = drmA.ncol - - // Compute & broadcast the number of interactions per row in A - val bcastInteractionsPerItemA = drmBroadcast(drmA.numNonZeroElementsPerRow) - - // Compute row similarity cooccurrence matrix AA' - val drmAAt = drmA %*% drmA.t - - // Compute loglikelihood scores and sparsify the resulting matrix to get the similarity matrix - val drmSimilaritiesAAt = computeSimilarities(drmAAt, numCols, maxInterestingSimilaritiesPerRow, - bcastInteractionsPerItemA, bcastInteractionsPerItemA, crossCooccurrence = false) - - drmSimilaritiesAAt - } - - /** - * Calculates row-wise similarity using the log-likelihood ratio on AA' and returns a drm of rows and similar rows. - * Uses IndexedDatasets, which handle external ID dictionaries properly - * @param indexedDataset compare each row to every other - * @param randomSeed use default to make repeatable, otherwise pass in system time or some randomizing seed - * @param maxInterestingSimilaritiesPerRow max elements returned in each row - * @param maxObservationsPerRow max number of input elements to use - */ - def rowSimilarityIDS(indexedDataset: IndexedDataset, randomSeed: Int = 0xdeadbeef, - maxInterestingSimilaritiesPerRow: Int = 50, - maxObservationsPerRow: Int = 500): - IndexedDataset = { - val coocMatrix = rowSimilarity(indexedDataset.matrix, randomSeed, maxInterestingSimilaritiesPerRow, - maxObservationsPerRow) - indexedDataset.create(coocMatrix, indexedDataset.rowIDs, indexedDataset.rowIDs) - } - - /** Compute loglikelihood ratio see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details */ - def logLikelihoodRatio(numInteractionsWithA: Long, numInteractionsWithB: Long, - numInteractionsWithAandB: Long, numInteractions: Long) = { - - val k11 = numInteractionsWithAandB - val k12 = numInteractionsWithA - numInteractionsWithAandB - val k21 = numInteractionsWithB - numInteractionsWithAandB - val k22 = numInteractions - numInteractionsWithA - numInteractionsWithB + numInteractionsWithAandB - - LogLikelihood.logLikelihoodRatio(k11, k12, k21, k22) - - } - - def computeSimilarities(drm: DrmLike[Int], numUsers: Int, maxInterestingItemsPerThing: Int, - bcastNumInteractionsB: BCast[Vector], bcastNumInteractionsA: BCast[Vector], - crossCooccurrence: Boolean = true) = { - drm.mapBlock() { - case (keys, block) => - - val llrBlock = block.like() - val numInteractionsB: Vector = bcastNumInteractionsB - val numInteractionsA: Vector = bcastNumInteractionsA - - for (index <- 0 until keys.size) { - - val thingB = keys(index) - - // PriorityQueue to select the top-k items - val topItemsPerThing = new mutable.PriorityQueue[(Int, Double)]()(orderByScore) - - block(index, ::).nonZeroes().foreach { elem => - val thingA = elem.index - val cooccurrences = elem.get - - // exclude co-occurrences of the item with itself - if (crossCooccurrence || thingB != thingA) { - // Compute loglikelihood ratio - val llr = logLikelihoodRatio(numInteractionsB(thingB).toLong, numInteractionsA(thingA).toLong, - cooccurrences.toLong, numUsers) - - val candidate = thingA -> llr - - // legacy hadoop code maps values to range (0..1) via - // val normailizedLLR = 1.0 - (1.0 / (1.0 + llr)) - // val candidate = thingA -> normailizedLLR - - // Enqueue item with score, if belonging to the top-k - if (topItemsPerThing.size < maxInterestingItemsPerThing) { - topItemsPerThing.enqueue(candidate) - } else if (orderByScore.lt(candidate, topItemsPerThing.head)) { - topItemsPerThing.dequeue() - topItemsPerThing.enqueue(candidate) - } - } - } - - // Add top-k interesting items to the output matrix - topItemsPerThing.dequeueAll.foreach { - case (otherThing, llrScore) => - llrBlock(index, otherThing) = llrScore - } - } - - keys -> llrBlock - } - } - - /** - * Selectively downsample rows and items with an anomalous amount of interactions, inspired by - * https://github.com/tdunning/in-memory-cooccurrence/blob/master/src/main/java/com/tdunning/cooc/Analyze.java - * - * additionally binarizes input matrix, as we're only interesting in knowing whether interactions happened or not - * @param drmM matrix to downsample - * @param seed random number generator seed, keep to a constant if repeatability is neccessary - * @param maxNumInteractions number of elements in a row of the returned matrix - * @return the downsampled DRM - */ - def sampleDownAndBinarize(drmM: DrmLike[Int], seed: Int, maxNumInteractions: Int) = { - - implicit val distributedContext = drmM.context - - // Pin raw interaction matrix - val drmI = drmM.checkpoint() - - // Broadcast vector containing the number of interactions with each thing - val bcastNumInteractions = drmBroadcast(drmI.numNonZeroElementsPerColumn) - - val downSampledDrmI = drmI.mapBlock() { - case (keys, block) => - val numInteractions: Vector = bcastNumInteractions - - // Use a hash of the unique first key to seed the RNG, makes this computation repeatable in case of - //failures - val random = new Random(MurmurHash.hash(keys(0), seed)) - - val downsampledBlock = block.like() - - // Downsample the interaction vector of each row - for (rowIndex <- 0 until keys.size) { - - val interactionsInRow = block(rowIndex, ::) - - val numInteractionsPerRow = interactionsInRow.getNumNonZeroElements() - - val perRowSampleRate = math.min(maxNumInteractions, numInteractionsPerRow) / numInteractionsPerRow - - interactionsInRow.nonZeroes().foreach { elem => - val numInteractionsWithThing = numInteractions(elem.index) - val perThingSampleRate = math.min(maxNumInteractions, numInteractionsWithThing) / numInteractionsWithThing - - if (random.nextDouble() <= math.min(perRowSampleRate, perThingSampleRate)) { - // We ignore the original interaction value and create a binary 0-1 matrix - // as we only consider whether interactions happened or did not happen - downsampledBlock(rowIndex, elem.index) = 1 - } - } - } - - keys -> downsampledBlock - } - - // Unpin raw interaction matrix - drmI.uncache() - - downSampledDrmI - } -} http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/math-scala/src/main/scala/org/apache/mahout/math/decompositions/ALS.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/ALS.scala b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/ALS.scala deleted file mode 100644 index 4e2f45c..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/ALS.scala +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.math.decompositions - -import scala.reflect.ClassTag -import org.apache.mahout.math._ -import drm._ -import scalabindings._ -import RLikeDrmOps._ -import RLikeOps._ -import scala.util.Random -import org.apache.log4j.Logger -import math._ -import org.apache.mahout.common.RandomUtils - -/** Simple ALS factorization algotithm. To solve, use train() method. */ -private[math] object ALS { - - private val log = Logger.getLogger(ALS.getClass) - - /** - * ALS training result. <P> - * - * <code>drmU %*% drmV.t</code> is supposed to approximate the input. - * - * @param drmU U matrix - * @param drmV V matrix - * @param iterationsRMSE RMSE values afeter each of iteration performed - */ - class Result[K: ClassTag](val drmU: DrmLike[K], val drmV: DrmLike[Int], val iterationsRMSE: Iterable[Double]) { - def toTuple = (drmU, drmV, iterationsRMSE) - } - - /** Result class for in-core results */ - class InCoreResult(val inCoreU: Matrix, inCoreV: Matrix, val iterationsRMSE: Iterable[Double]) { - def toTuple = (inCoreU, inCoreV, iterationsRMSE) - } - - /** - * Run Distributed ALS. - * <P> - * - * Example: - * - * <pre> - * val (u,v,errors) = als(input, k).toTuple - * </pre> - * - * ALS runs until (rmse[i-1]-rmse[i])/rmse[i-1] < convergenceThreshold, or i==maxIterations, - * whichever earlier. - * <P> - * - * @param drmA The input matrix - * @param k required rank of decomposition (number of cols in U and V results) - * @param convergenceThreshold stop sooner if (rmse[i-1] - rmse[i])/rmse[i - 1] is less than this - * value. If <=0 then we won't compute RMSE and use convergence test. - * @param lambda regularization rate - * @param maxIterations maximum iterations to run regardless of convergence - * @tparam K row key type of the input (100 is probably more than enough) - * @return { @link org.apache.mahout.math.drm.decompositions.ALS.Result} - */ - def dals[K: ClassTag]( - drmA: DrmLike[K], - k: Int = 50, - lambda: Double = 0.0, - maxIterations: Int = 10, - convergenceThreshold: Double = 0.10 - ): Result[K] = { - - assert(convergenceThreshold < 1.0, "convergenceThreshold") - assert(maxIterations >= 1, "maxIterations") - - val drmAt = drmA.t - - // Initialize U and V so that they are identically distributed to A or A' - var drmU = drmA.mapBlock(ncol = k) { - case (keys, block) => - val rnd = RandomUtils.getRandom() - val uBlock = Matrices.symmetricUniformView(block.nrow, k, rnd.nextInt()) * 0.01 - keys -> uBlock - } - - var drmV: DrmLike[Int] = null - var rmseIterations: List[Double] = Nil - - // ALS iterator - var stop = false - var i = 0 - while (!stop && i < maxIterations) { - - // Alternate. This is really what ALS is. - if (drmV != null) drmV.uncache() - drmV = (drmAt %*% drmU %*% solve(drmU.t %*% drmU -: diag(lambda, k))).checkpoint() - - drmU.uncache() - drmU = (drmA %*% drmV %*% solve(drmV.t %*% drmV -: diag(lambda, k))).checkpoint() - - // Check if we are requested to do a convergence test; and do it if yes. - if (convergenceThreshold > 0) { - - val rmse = (drmA - drmU %*% drmV.t).norm / sqrt(drmA.ncol * drmA.nrow) - - if (i > 0) { - val rmsePrev = rmseIterations.last - val convergence = (rmsePrev - rmse) / rmsePrev - - if (convergence < 0) { - log.warn("Rmse increase of %f. Should not happen.".format(convergence)) - // I guess error growth can happen in ideal data case? - stop = true - } else if (convergence < convergenceThreshold) { - stop = true - } - } - rmseIterations :+= rmse - } - - i += 1 - } - - new Result(drmU, drmV, rmseIterations) - } - - -} http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala deleted file mode 100644 index 7caa3dd..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.math.decompositions - -import scala.reflect.ClassTag -import org.apache.mahout.math.Matrix -import org.apache.mahout.math.scalabindings._ -import RLikeOps._ -import org.apache.mahout.math.drm._ -import RLikeDrmOps._ -import org.apache.log4j.Logger - -object DQR { - - private val log = Logger.getLogger(DQR.getClass) - - /** - * Distributed _thin_ QR. A'A must fit in a memory, i.e. if A is m x n, then n should be pretty - * controlled (<5000 or so). <P> - * - * It is recommended to checkpoint A since it does two passes over it. <P> - * - * It also guarantees that Q is partitioned exactly the same way (and in same key-order) as A, so - * their RDD should be able to zip successfully. - */ - def dqrThin[K: ClassTag](drmA: DrmLike[K], checkRankDeficiency: Boolean = true): (DrmLike[K], Matrix) = { - - if (drmA.ncol > 5000) - log.warn("A is too fat. A'A must fit in memory and easily broadcasted.") - - implicit val ctx = drmA.context - - val AtA = (drmA.t %*% drmA).checkpoint() - val inCoreAtA = AtA.collect - - if (log.isDebugEnabled) log.debug("A'A=\n%s\n".format(inCoreAtA)) - - val ch = chol(inCoreAtA) - val inCoreR = (ch.getL cloned) t - - if (log.isDebugEnabled) log.debug("R=\n%s\n".format(inCoreR)) - - if (checkRankDeficiency && !ch.isPositiveDefinite) - throw new IllegalArgumentException("R is rank-deficient.") - - val bcastAtA = drmBroadcast(inCoreAtA) - - // Unfortunately, I don't think Cholesky decomposition is serializable to backend. So we re- - // decompose A'A in the backend again. - - // Compute Q = A*inv(L') -- we can do it blockwise. - val Q = drmA.mapBlock() { - case (keys, block) => keys -> chol(bcastAtA).solveRight(block) - } - - Q -> inCoreR - } - -} http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala deleted file mode 100644 index de7402d..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.math.decompositions - -import scala.reflect.ClassTag -import org.apache.mahout.math.{Matrices, Vector} -import org.apache.mahout.math.scalabindings._ -import RLikeOps._ -import org.apache.mahout.math.drm._ -import RLikeDrmOps._ -import org.apache.mahout.common.RandomUtils - -object DSPCA { - - /** - * Distributed Stochastic PCA decomposition algorithm. A logical reflow of the "SSVD-PCA options.pdf" - * document of the MAHOUT-817. - * - * @param drmA input matrix A - * @param k request SSVD rank - * @param p oversampling parameter - * @param q number of power iterations (hint: use either 0 or 1) - * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them - * e.g. save them to hdfs in order to trigger their computation. - */ - def dspca[K: ClassTag](drmA: DrmLike[K], k: Int, p: Int = 15, q: Int = 0): - (DrmLike[K], DrmLike[Int], Vector) = { - - val drmAcp = drmA.checkpoint() - implicit val ctx = drmAcp.context - - val m = drmAcp.nrow - val n = drmAcp.ncol - assert(k <= (m min n), "k cannot be greater than smaller of m, n.") - val pfxed = safeToNonNegInt((m min n) - k min p) - - // Actual decomposition rank - val r = k + pfxed - - // Dataset mean - val xi = drmAcp.colMeans - - // We represent Omega by its seed. - val omegaSeed = RandomUtils.getRandom().nextInt() - val omega = Matrices.symmetricUniformView(n, r, omegaSeed) - - // This done in front in a single-threaded fashion for now. Even though it doesn't require any - // memory beyond that is required to keep xi around, it still might be parallelized to backs - // for significantly big n and r. TODO - val s_o = omega.t %*% xi - - val bcastS_o = drmBroadcast(s_o) - val bcastXi = drmBroadcast(xi) - - var drmY = drmAcp.mapBlock(ncol = r) { - case (keys, blockA) => - val s_o:Vector = bcastS_o - val blockY = blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed) - for (row <- 0 until blockY.nrow) blockY(row, ::) -= s_o - keys -> blockY - } - // Checkpoint Y - .checkpoint() - - var drmQ = dqrThin(drmY, checkRankDeficiency = false)._1.checkpoint() - - var s_q = drmQ.colSums() - var bcastVarS_q = drmBroadcast(s_q) - - // This actually should be optimized as identically partitioned map-side A'B since A and Q should - // still be identically partitioned. - var drmBt = (drmAcp.t %*% drmQ).checkpoint() - - var s_b = (drmBt.t %*% xi).collect(::, 0) - var bcastVarS_b = drmBroadcast(s_b) - - for (i <- 0 until q) { - - // These closures don't seem to live well with outside-scope vars. This doesn't record closure - // attributes correctly. So we create additional set of vals for broadcast vars to properly - // create readonly closure attributes in this very scope. - val bcastS_q = bcastVarS_q - val bcastS_b = bcastVarS_b - val bcastXib = bcastXi - - // Fix Bt as B' -= xi cross s_q - drmBt = drmBt.mapBlock() { - case (keys, block) => - val s_q: Vector = bcastS_q - val xi: Vector = bcastXib - keys.zipWithIndex.foreach { - case (key, idx) => block(idx, ::) -= s_q * xi(key) - } - keys -> block - } - - drmY.uncache() - drmQ.uncache() - - drmY = (drmAcp %*% drmBt) - // Fix Y by subtracting s_b from each row of the AB' - .mapBlock() { - case (keys, block) => - val s_b: Vector = bcastS_b - for (row <- 0 until block.nrow) block(row, ::) -= s_b - keys -> block - } - // Checkpoint Y - .checkpoint() - - drmQ = dqrThin(drmY, checkRankDeficiency = false)._1.checkpoint() - - s_q = drmQ.colSums() - bcastVarS_q = drmBroadcast(s_q) - - // This on the other hand should be inner-join-and-map A'B optimization since A and Q_i are not - // identically partitioned anymore. - drmBt = (drmAcp.t %*% drmQ).checkpoint() - - s_b = (drmBt.t %*% xi).collect(::, 0) - bcastVarS_b = drmBroadcast(s_b) - } - - val c = s_q cross s_b - val inCoreBBt = (drmBt.t %*% drmBt).checkpoint(CacheHint.NONE).collect - - c - c.t + (s_q cross s_q) * (xi dot xi) - val (inCoreUHat, d) = eigen(inCoreBBt) - val s = d.sqrt - - // Since neither drmU nor drmV are actually computed until actually used, we don't need the flags - // instructing compute (or not compute) either of the U,V outputs anymore. Neat, isn't it? - val drmU = drmQ %*% inCoreUHat - val drmV = drmBt %*% (inCoreUHat %*%: diagv(1 /: s)) - - (drmU(::, 0 until k), drmV(::, 0 until k), s(0 until k)) - } - -} http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala deleted file mode 100644 index 1abfb87..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala +++ /dev/null @@ -1,82 +0,0 @@ -package org.apache.mahout.math.decompositions - -import scala.reflect.ClassTag -import org.apache.mahout.math.{Matrix, Matrices, Vector} -import org.apache.mahout.math.scalabindings._ -import RLikeOps._ -import org.apache.mahout.math.drm._ -import RLikeDrmOps._ -import org.apache.mahout.common.RandomUtils - -object DSSVD { - - /** - * Distributed Stochastic Singular Value decomposition algorithm. - * - * @param drmA input matrix A - * @param k request SSVD rank - * @param p oversampling parameter - * @param q number of power iterations - * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them - * e.g. save them to hdfs in order to trigger their computation. - */ - def dssvd[K: ClassTag](drmA: DrmLike[K], k: Int, p: Int = 15, q: Int = 0): - (DrmLike[K], DrmLike[Int], Vector) = { - - val drmAcp = drmA.checkpoint() - - val m = drmAcp.nrow - val n = drmAcp.ncol - assert(k <= (m min n), "k cannot be greater than smaller of m, n.") - val pfxed = safeToNonNegInt((m min n) - k min p) - - // Actual decomposition rank - val r = k + pfxed - - // We represent Omega by its seed. - val omegaSeed = RandomUtils.getRandom().nextInt() - - // Compute Y = A*Omega. Instead of redistributing view, we redistribute the Omega seed only and - // instantiate the Omega random matrix view in the backend instead. That way serialized closure - // is much more compact. - var drmY = drmAcp.mapBlock(ncol = r) { - case (keys, blockA) => - val blockY = blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed) - keys -> blockY - } - - var drmQ = dqrThin(drmY.checkpoint())._1 - // Checkpoint Q if last iteration - if (q == 0) drmQ = drmQ.checkpoint() - - // This actually should be optimized as identically partitioned map-side A'B since A and Q should - // still be identically partitioned. - var drmBt = drmAcp.t %*% drmQ - // Checkpoint B' if last iteration - if (q == 0) drmBt = drmBt.checkpoint() - - for (i <- 0 until q) { - drmY = drmAcp %*% drmBt - drmQ = dqrThin(drmY.checkpoint())._1 - // Checkpoint Q if last iteration - if (i == q - 1) drmQ = drmQ.checkpoint() - - // This on the other hand should be inner-join-and-map A'B optimization since A and Q_i are not - // identically partitioned anymore. - drmBt = drmAcp.t %*% drmQ - // Checkpoint B' if last iteration - if (i == q - 1) drmBt = drmBt.checkpoint() - } - - val (inCoreUHat, d) = eigen(drmBt.t %*% drmBt) - val s = d.sqrt - - // Since neither drmU nor drmV are actually computed until actually used, we don't need the flags - // instructing compute (or not compute) either of the U,V outputs anymore. Neat, isn't it? - val drmU = drmQ %*% inCoreUHat - val drmV = drmBt %*% (inCoreUHat %*%: diagv(1 /: s)) - - (drmU(::, 0 until k), drmV(::, 0 until k), s(0 until k)) - } - -} http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/math-scala/src/main/scala/org/apache/mahout/math/decompositions/SSVD.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/SSVD.scala b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/SSVD.scala deleted file mode 100644 index 80385a3..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/SSVD.scala +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.math.decompositions - -import scala.math._ -import org.apache.mahout.math.{Matrices, Matrix} -import org.apache.mahout.common.RandomUtils -import org.apache.log4j.Logger -import org.apache.mahout.math.scalabindings._ -import RLikeOps._ - -private[math] object SSVD { - - private val log = Logger.getLogger(SSVD.getClass) - - /** - * In-core SSVD algorithm. - * - * @param a input matrix A - * @param k request SSVD rank - * @param p oversampling parameter - * @param q number of power iterations - * @return (U,V,s) - */ - def ssvd(a: Matrix, k: Int, p: Int = 15, q: Int = 0) = { - val m = a.nrow - val n = a.ncol - if (k > min(m, n)) - throw new IllegalArgumentException( - "k cannot be greater than smaller of m,n") - val pfxed = min(p, min(m, n) - k) - - // Actual decomposition rank - val r = k + pfxed - - val rnd = RandomUtils.getRandom - val omega = Matrices.symmetricUniformView(n, r, rnd.nextInt) - - var y = a %*% omega - var yty = y.t %*% y - val at = a.t - var ch = chol(yty) - assert(ch.isPositiveDefinite, "Rank-deficiency detected during s-SVD") - var bt = ch.solveRight(at %*% y) - - // Power iterations - for (i <- 0 until q) { - y = a %*% bt - yty = y.t %*% y - ch = chol(yty) - bt = ch.solveRight(at %*% y) - } - - val bbt = bt.t %*% bt - val (uhat, d) = eigen(bbt) - - val s = d.sqrt - val u = ch.solveRight(y) %*% uhat - val v = bt %*% (uhat %*%: diagv(1 /: s)) - - (u(::, 0 until k), v(::, 0 until k), s(0 until k)) - } - - /** - * PCA based on SSVD that runs without forming an always-dense A-(colMeans(A)) input for SVD. This - * follows the solution outlined in MAHOUT-817. For in-core version it, for most part, is supposed - * to save some memory for sparse inputs by removing direct mean subtraction.<P> - * - * Hint: Usually one wants to use AV which is approsimately USigma, i.e.<code>u %*%: diagv(s)</code>. - * If retaining distances and orignal scaled variances not that important, the normalized PCA space - * is just U. - * - * Important: data points are considered to be rows. - * - * @param a input matrix A - * @param k request SSVD rank - * @param p oversampling parameter - * @param q number of power iterations - * @return (U,V,s) - */ - def spca(a:Matrix, k: Int, p: Int = 15, q: Int = 0) = { - val m = a.nrow - val n = a.ncol - if (k > min(m, n)) - throw new IllegalArgumentException( - "k cannot be greater than smaller of m,n") - val pfxed = min(p, min(m, n) - k) - - // Actual decomposition rank - val r = k + pfxed - - val rnd = RandomUtils.getRandom - val omega = Matrices.symmetricUniformView(n, r, rnd.nextInt) - - // Dataset mean - val xi = a.colMeans() - - if (log.isDebugEnabled) log.debug("xi=%s".format(xi)) - - var y = a %*% omega - - // Fixing y - val s_o = omega.t %*% xi - y := ((r,c,v) => v - s_o(c)) - - var yty = y.t %*% y - var ch = chol(yty) -// assert(ch.isPositiveDefinite, "Rank-deficiency detected during s-SVD") - - // This is implicit Q of QR(Y) - var qm = ch.solveRight(y) - var bt = a.t %*% qm - var s_q = qm.colSums() - var s_b = bt.t %*% xi - - // Power iterations - for (i <- 0 until q) { - - // Fix bt - bt -= xi cross s_q - - y = a %*% bt - - // Fix Y again. - y := ((r,c,v) => v - s_b(c)) - - yty = y.t %*% y - ch = chol(yty) - qm = ch.solveRight(y) - bt = a.t %*% qm - s_q = qm.colSums() - s_b = bt.t %*% xi - } - - val c = s_q cross s_b - - // BB' computation becomes - val bbt = bt.t %*% bt -c - c.t + (s_q cross s_q) * (xi dot xi) - - val (uhat, d) = eigen(bbt) - - val s = d.sqrt - val u = qm %*% uhat - val v = bt %*% (uhat %*%: diagv(1 /: s)) - - (u(::, 0 until k), v(::, 0 until k), s(0 until k)) - - } - -} http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/math-scala/src/main/scala/org/apache/mahout/math/decompositions/package.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/package.scala b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/package.scala deleted file mode 100644 index a7b829f..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/package.scala +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.math - -import scala.reflect.ClassTag -import org.apache.mahout.math.drm.DrmLike - -/** - * This package holds all decomposition and factorization-like methods, all that we were able to make - * distributed engine-independent so far, anyway. - */ -package object decompositions { - - // ================ In-core decompositions =================== - - /** - * In-core SSVD algorithm. - * - * @param a input matrix A - * @param k request SSVD rank - * @param p oversampling parameter - * @param q number of power iterations - * @return (U,V,s) - */ - def ssvd(a: Matrix, k: Int, p: Int = 15, q: Int = 0) = SSVD.ssvd(a, k, p, q) - - /** - * PCA based on SSVD that runs without forming an always-dense A-(colMeans(A)) input for SVD. This - * follows the solution outlined in MAHOUT-817. For in-core version it, for most part, is supposed - * to save some memory for sparse inputs by removing direct mean subtraction.<P> - * - * Hint: Usually one wants to use AV which is approsimately USigma, i.e.<code>u %*%: diagv(s)</code>. - * If retaining distances and orignal scaled variances not that important, the normalized PCA space - * is just U. - * - * Important: data points are considered to be rows. - * - * @param a input matrix A - * @param k request SSVD rank - * @param p oversampling parameter - * @param q number of power iterations - * @return (U,V,s) - */ - def spca(a: Matrix, k: Int, p: Int = 15, q: Int = 0) = - SSVD.spca(a = a, k = k, p = p, q = q) - - // ============== Distributed decompositions =================== - - /** - * Distributed _thin_ QR. A'A must fit in a memory, i.e. if A is m x n, then n should be pretty - * controlled (<5000 or so). <P> - * - * It is recommended to checkpoint A since it does two passes over it. <P> - * - * It also guarantees that Q is partitioned exactly the same way (and in same key-order) as A, so - * their RDD should be able to zip successfully. - */ - def dqrThin[K: ClassTag](drmA: DrmLike[K], checkRankDeficiency: Boolean = true): (DrmLike[K], Matrix) = - DQR.dqrThin(drmA, checkRankDeficiency) - - /** - * Distributed Stochastic Singular Value decomposition algorithm. - * - * @param drmA input matrix A - * @param k request SSVD rank - * @param p oversampling parameter - * @param q number of power iterations - * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them - * e.g. save them to hdfs in order to trigger their computation. - */ - def dssvd[K: ClassTag](drmA: DrmLike[K], k: Int, p: Int = 15, q: Int = 0): - (DrmLike[K], DrmLike[Int], Vector) = DSSVD.dssvd(drmA, k, p, q) - - /** - * Distributed Stochastic PCA decomposition algorithm. A logical reflow of the "SSVD-PCA options.pdf" - * document of the MAHOUT-817. - * - * @param drmA input matrix A - * @param k request SSVD rank - * @param p oversampling parameter - * @param q number of power iterations (hint: use either 0 or 1) - * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them - * e.g. save them to hdfs in order to trigger their computation. - */ - def dspca[K: ClassTag](drmA: DrmLike[K], k: Int, p: Int = 15, q: Int = 0): - (DrmLike[K], DrmLike[Int], Vector) = DSPCA.dspca(drmA, k, p, q) - - /** Result for distributed ALS-type two-component factorization algorithms */ - type FactorizationResult[K] = ALS.Result[K] - - /** Result for distributed ALS-type two-component factorization algorithms, in-core matrices */ - type FactorizationResultInCore = ALS.InCoreResult - - /** - * Run ALS. - * <P> - * - * Example: - * - * <pre> - * val (u,v,errors) = als(input, k).toTuple - * </pre> - * - * ALS runs until (rmse[i-1]-rmse[i])/rmse[i-1] < convergenceThreshold, or i==maxIterations, - * whichever earlier. - * <P> - * - * @param drmA The input matrix - * @param k required rank of decomposition (number of cols in U and V results) - * @param convergenceThreshold stop sooner if (rmse[i-1] - rmse[i])/rmse[i - 1] is less than this - * value. If <=0 then we won't compute RMSE and use convergence test. - * @param lambda regularization rate - * @param maxIterations maximum iterations to run regardless of convergence - * @tparam K row key type of the input (100 is probably more than enough) - * @return { @link org.apache.mahout.math.drm.decompositions.ALS.Result} - */ - def dals[K: ClassTag]( - drmA: DrmLike[K], - k: Int = 50, - lambda: Double = 0.0, - maxIterations: Int = 10, - convergenceThreshold: Double = 0.10 - ): FactorizationResult[K] = - ALS.dals(drmA, k, lambda, maxIterations, convergenceThreshold) - -} http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/math-scala/src/main/scala/org/apache/mahout/math/drm/BCast.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/BCast.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/BCast.scala deleted file mode 100644 index 850614457..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/BCast.scala +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.math.drm - -/** Broadcast variable abstraction */ -trait BCast[T] { - def value:T -} http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/math-scala/src/main/scala/org/apache/mahout/math/drm/CacheHint.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/CacheHint.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/CacheHint.scala deleted file mode 100644 index ac763f9..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/CacheHint.scala +++ /dev/null @@ -1,19 +0,0 @@ -package org.apache.mahout.math.drm - -object CacheHint extends Enumeration { - - type CacheHint = Value - - val NONE, - DISK_ONLY, - DISK_ONLY_2, - MEMORY_ONLY, - MEMORY_ONLY_2, - MEMORY_ONLY_SER, - MEMORY_ONLY_SER_2, - MEMORY_AND_DISK, - MEMORY_AND_DISK_2, - MEMORY_AND_DISK_SER, - MEMORY_AND_DISK_SER_2 = Value - -} http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala deleted file mode 100644 index 7f97481..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.math.drm - -import org.apache.mahout.math.Matrix -import scala.reflect.ClassTag - -/** - * Checkpointed DRM API. This is a matrix that has optimized RDD lineage behind it and can be - * therefore collected or saved. - * @tparam K matrix key type (e.g. the keys of sequence files once persisted) - */ -trait CheckpointedDrm[K] extends DrmLike[K] { - - def collect: Matrix - - def dfsWrite(path: String) - - /** If this checkpoint is already declared cached, uncache. */ - def uncache(): this.type - - /** - * Explicit extraction of key class Tag since traits don't support context bound access; but actual - * implementation knows it - */ - def keyClassTag: ClassTag[K] - - - /** changes the number of rows without touching the underlying data */ - def newRowCardinality(n: Int): CheckpointedDrm[K] - -} http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala deleted file mode 100644 index 8c3911f..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.math.drm - -import scala.reflect.ClassTag -import org.apache.mahout.math._ - - -/** - * Additional experimental operations over CheckpointedDRM implementation. I will possibly move them up to - * the DRMBase once they stabilize. - * - */ -class CheckpointedOps[K: ClassTag](val drm: CheckpointedDrm[K]) { - - - /** Column sums. At this point this runs on checkpoint and collects in-core vector. */ - def colSums(): Vector = drm.context.colSums(drm) - - /** Column clounts. Counts the non-zero values. At this point this runs on checkpoint and collects in-core vector. */ - def numNonZeroElementsPerColumn(): Vector = drm.context.numNonZeroElementsPerColumn(drm) - - /** Column Means */ - def colMeans(): Vector = drm.context.colMeans(drm) - - def norm():Double = drm.context.norm(drm) -} - http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedContext.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedContext.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedContext.scala deleted file mode 100644 index 39bab90..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedContext.scala +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.math.drm - -import java.io.Closeable - -/** Distributed context (a.k.a. distributed session handle) */ -trait DistributedContext extends Closeable { - - val engine:DistributedEngine - -} http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala deleted file mode 100644 index dd5b101..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala +++ /dev/null @@ -1,215 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.math.drm - -import com.google.common.collect.{HashBiMap, BiMap} -import org.apache.mahout.math.indexeddataset.{DefaultIndexedDatasetElementReadSchema, DefaultIndexedDatasetReadSchema, Schema, IndexedDataset} - -import scala.reflect.ClassTag -import logical._ -import org.apache.mahout.math._ -import scalabindings._ -import RLikeOps._ -import RLikeDrmOps._ -import DistributedEngine._ -import org.apache.mahout.math.scalabindings._ -import org.apache.log4j.Logger - -/** Abstraction of optimizer/distributed engine */ -trait DistributedEngine { - - /** - * First optimization pass. Return physical plan that we can pass to exec(). This rewrite may - * introduce logical constructs (including engine-specific ones) that user DSL cannot even produce - * per se. - * <P> - * - * A particular physical engine implementation may choose to either use the default rewrites or - * build its own rewriting rules. - * <P> - */ - def optimizerRewrite[K: ClassTag](action: DrmLike[K]): DrmLike[K] = pass3(pass2(pass1(action))) - - /** Second optimizer pass. Translate previously rewritten logical pipeline into physical engine plan. */ - def toPhysical[K: ClassTag](plan: DrmLike[K], ch: CacheHint.CacheHint): CheckpointedDrm[K] - - /** Engine-specific colSums implementation based on a checkpoint. */ - def colSums[K: ClassTag](drm: CheckpointedDrm[K]): Vector - - /** Engine-specific numNonZeroElementsPerColumn implementation based on a checkpoint. */ - def numNonZeroElementsPerColumn[K: ClassTag](drm: CheckpointedDrm[K]): Vector - - /** Engine-specific colMeans implementation based on a checkpoint. */ - def colMeans[K: ClassTag](drm: CheckpointedDrm[K]): Vector - - def norm[K: ClassTag](drm: CheckpointedDrm[K]): Double - - /** Broadcast support */ - def drmBroadcast(v: Vector)(implicit dc: DistributedContext): BCast[Vector] - - /** Broadcast support */ - def drmBroadcast(m: Matrix)(implicit dc: DistributedContext): BCast[Matrix] - - /** - * Load DRM from hdfs (as in Mahout DRM format). - * <P/> - * @param path The DFS path to load from - * @param parMin Minimum parallelism after load (equivalent to #par(min=...)). - */ - def drmDfsRead(path: String, parMin: Int = 0)(implicit sc: DistributedContext): CheckpointedDrm[_] - - /** Parallelize in-core matrix as spark distributed matrix, using row ordinal indices as data set keys. */ - def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1) - (implicit sc: DistributedContext): CheckpointedDrm[Int] - - /** Parallelize in-core matrix as spark distributed matrix, using row labels as a data set keys. */ - def drmParallelizeWithRowLabels(m: Matrix, numPartitions: Int = 1) - (implicit sc: DistributedContext): CheckpointedDrm[String] - - /** This creates an empty DRM with specified number of partitions and cardinality. */ - def drmParallelizeEmpty(nrow: Int, ncol: Int, numPartitions: Int = 10) - (implicit sc: DistributedContext): CheckpointedDrm[Int] - - /** Creates empty DRM with non-trivial height */ - def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 10) - (implicit sc: DistributedContext): CheckpointedDrm[Long] - /** - * Load IndexedDataset from text delimited format. - * @param src comma delimited URIs to read from - * @param schema defines format of file(s) - */ - def indexedDatasetDFSRead(src: String, - schema: Schema = DefaultIndexedDatasetReadSchema, - existingRowIDs: BiMap[String, Int] = HashBiMap.create()) - (implicit sc: DistributedContext): - IndexedDataset - - /** - * Load IndexedDataset from text delimited format, one element per line - * @param src comma delimited URIs to read from - * @param schema defines format of file(s) - */ - def indexedDatasetDFSReadElements(src: String, - schema: Schema = DefaultIndexedDatasetElementReadSchema, - existingRowIDs: BiMap[String, Int] = HashBiMap.create()) - (implicit sc: DistributedContext): - IndexedDataset - -} - -object DistributedEngine { - - private val log = Logger.getLogger(DistributedEngine.getClass) - - /** This is mostly multiplication operations rewrites */ - private def pass1[K: ClassTag](action: DrmLike[K]): DrmLike[K] = { - - action match { - case OpAB(OpAt(a), b) if (a == b) => OpAtA(pass1(a)) - case OpABAnyKey(OpAtAnyKey(a), b) if (a == b) => OpAtA(pass1(a)) - - // For now, rewrite left-multiply via transpositions, i.e. - // inCoreA %*% B = (B' %*% inCoreA')' - case op@OpTimesLeftMatrix(a, b) => - OpAt(OpTimesRightMatrix(A = OpAt(pass1(b)), right = a.t)) - - // Add vertical row index concatenation for rbind() on DrmLike[Int] fragments - case op@OpRbind(a, b) if (implicitly[ClassTag[K]] == ClassTag.Int) => - - // Make sure closure sees only local vals, not attributes. We need to do these ugly casts - // around because compiler could not infer that K is the same as Int, based on if() above. - val ma = safeToNonNegInt(a.nrow) - val bAdjusted = new OpMapBlock[Int, Int]( - A = pass1(b.asInstanceOf[DrmLike[Int]]), - bmf = { - case (keys, block) => keys.map(_ + ma) -> block - }, - identicallyPartitioned = false - ) - val aAdjusted = a.asInstanceOf[DrmLike[Int]] - OpRbind(pass1(aAdjusted), bAdjusted).asInstanceOf[DrmLike[K]] - - // Stop at checkpoints - case cd: CheckpointedDrm[_] => action - - // For everything else we just pass-thru the operator arguments to optimizer - case uop: AbstractUnaryOp[_, K] => - uop.A = pass1(uop.A)(uop.classTagA) - uop - case bop: AbstractBinaryOp[_, _, K] => - bop.A = pass1(bop.A)(bop.classTagA) - bop.B = pass1(bop.B)(bop.classTagB) - bop - } - } - - /** This would remove stuff like A.t.t that previous step may have created */ - private def pass2[K: ClassTag](action: DrmLike[K]): DrmLike[K] = { - action match { - // A.t.t => A - case OpAt(top@OpAt(a)) => pass2(a)(top.classTagA) - - // Stop at checkpoints - case cd: CheckpointedDrm[_] => action - - // For everything else we just pass-thru the operator arguments to optimizer - case uop: AbstractUnaryOp[_, K] => - uop.A = pass2(uop.A)(uop.classTagA) - uop - case bop: AbstractBinaryOp[_, _, K] => - bop.A = pass2(bop.A)(bop.classTagA) - bop.B = pass2(bop.B)(bop.classTagB) - bop - } - } - - /** Some further rewrites that are conditioned on A.t.t removal */ - private def pass3[K: ClassTag](action: DrmLike[K]): DrmLike[K] = { - action match { - - // matrix products. - case OpAB(a, OpAt(b)) => OpABt(pass3(a), pass3(b)) - - // AtB cases that make sense. - case OpAB(OpAt(a), b) if (a.partitioningTag == b.partitioningTag) => OpAtB(pass3(a), pass3(b)) - case OpABAnyKey(OpAtAnyKey(a), b) => OpAtB(pass3(a), pass3(b)) - - // Need some cost to choose between the following. - - case OpAB(OpAt(a), b) => OpAtB(pass3(a), pass3(b)) - // case OpAB(OpAt(a), b) => OpAt(OpABt(OpAt(pass1(b)), pass1(a))) - case OpAB(a, b) => OpABt(pass3(a), OpAt(pass3(b))) - - // Rewrite A'x - case op@OpAx(op1@OpAt(a), x) => OpAtx(pass3(a)(op1.classTagA), x) - - // Stop at checkpoints - case cd: CheckpointedDrm[_] => action - - // For everything else we just pass-thru the operator arguments to optimizer - case uop: AbstractUnaryOp[_, K] => - uop.A = pass3(uop.A)(uop.classTagA) - uop - case bop: AbstractBinaryOp[_, _, K] => - bop.A = pass3(bop.A)(bop.classTagA) - bop.B = pass3(bop.B)(bop.classTagB) - bop - } - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmDoubleScalarOps.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmDoubleScalarOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmDoubleScalarOps.scala deleted file mode 100644 index e5cf563..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmDoubleScalarOps.scala +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.math.drm - -import RLikeDrmOps._ -import scala.reflect.ClassTag - -class DrmDoubleScalarOps(val x:Double) extends AnyVal{ - - def +[K:ClassTag](that:DrmLike[K]) = that + x - - def *[K:ClassTag](that:DrmLike[K]) = that * x - - def -[K:ClassTag](that:DrmLike[K]) = x -: that - - def /[K:ClassTag](that:DrmLike[K]) = x /: that - -} http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala deleted file mode 100644 index b9c50b0..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.math.drm - -import scala.reflect.ClassTag - - -/** - * - * Basic spark DRM trait. - * - * Since we already call the package "sparkbindings", I will not use stem "spark" with classes in - * this package. Spark backing is already implied. - * - */ -trait DrmLike[K] { - - protected[mahout] def partitioningTag: Long - - protected[mahout] def canHaveMissingRows: Boolean - - /** - * Distributed context, can be implicitly converted to operations on [[org.apache.mahout.math.drm. - * DistributedEngine]]. - */ - val context:DistributedContext - - /** R-like syntax for number of rows. */ - def nrow: Long - - /** R-like syntax for number of columns */ - def ncol: Int - - /** - * Action operator -- does not necessary means Spark action; but does mean running BLAS optimizer - * and writing down Spark graph lineage since last checkpointed DRM. - */ - def checkpoint(cacheHint: CacheHint.CacheHint = CacheHint.MEMORY_ONLY): CheckpointedDrm[K] - -} http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala deleted file mode 100644 index bc937d6..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.math.drm - -import scala.reflect.ClassTag -import org.apache.mahout.math.scalabindings._ -import org.apache.mahout.math.drm.logical.{OpPar, OpMapBlock, OpRowRange} - -/** Common Drm ops */ -class DrmLikeOps[K: ClassTag](protected[drm] val drm: DrmLike[K]) { - - /** - * Parallelism adjustments. <P/> - * - * Change only one of parameters from default value to choose new parallelism adjustment strategy. - * <P/> - * - * E.g. use - * <pre> - * drmA.par(auto = true) - * </pre> - * to use automatic parallelism adjustment. - * <P/> - * - * Parallelism here in API is fairly abstract concept, and actual value interpretation is left for - * a particular backend strategy. However, it is usually equivalent to number of map tasks or data - * splits. - * <P/> - * - * @param min If changed from default, ensures the product has at least that much parallelism. - * @param exact if changed from default, ensures the pipeline product has exactly that much - * parallelism. - * @param auto If changed from default, engine-specific automatic parallelism adjustment strategy - * is applied. - */ - def par(min: Int = -1, exact: Int = -1, auto: Boolean = false) = { - assert(min >= 0 || exact >= 0 || auto, "Invalid argument") - OpPar(drm, minSplits = min, exactSplits = exact) - } - - /** - * Map matrix block-wise vertically. Blocks of the new matrix can be modified original block - * matrices; or they could be completely new matrices with new keyset. In the latter case, output - * matrix width must be specified with <code>ncol</code> parameter.<P> - * - * New block heights must be of the same height as the original geometry.<P> - * - * @param ncol new matrix' width (only needed if width changes). - * @param bmf - * @tparam R - * @return - */ - def mapBlock[R: ClassTag](ncol: Int = -1, identicallyParitioned: Boolean = true) - (bmf: BlockMapFunc[K, R]): DrmLike[R] = - new OpMapBlock[K, R]( - A = drm, - bmf = bmf, - _ncol = ncol, - identicallyPartitioned = identicallyParitioned - ) - - - /** - * Slicing the DRM. Should eventually work just like in-core drm (e.g. A(0 until 5, 5 until 15)).<P> - * - * The all-range is denoted by '::', e.g.: A(::, 0 until 5).<P> - * - * Row range is currently unsupported except for the all-range. When it will be fully supported, - * the input must be Int-keyed, i.e. of DrmLike[Int] type for non-all-range specifications. - * - * @param rowRange Row range. This must be '::' (all-range) unless matrix rows are keyed by Int key. - * @param colRange col range. Must be a sub-range of <code>0 until ncol</code>. '::' denotes all-range. - */ - def apply(rowRange: Range, colRange: Range): DrmLike[K] = { - - import RLikeDrmOps._ - import RLikeOps._ - - val rowSrc: DrmLike[K] = if (rowRange != ::) { - - if (implicitly[ClassTag[Int]] == implicitly[ClassTag[K]]) { - - assert(rowRange.head >= 0 && rowRange.last < drm.nrow, "rows range out of range") - val intKeyed = drm.asInstanceOf[DrmLike[Int]] - - new OpRowRange(A = intKeyed, rowRange = rowRange).asInstanceOf[DrmLike[K]] - - } else throw new IllegalArgumentException("non-all row range is only supported for Int-keyed DRMs.") - - } else drm - - if (colRange != ::) { - - assert(colRange.head >= 0 && colRange.last < drm.ncol, "col range out of range") - - // Use mapBlock operator to do in-core subranging. - rowSrc.mapBlock(ncol = colRange.length)({ - case (keys, block) => keys -> block(::, colRange) - }) - - } else rowSrc - } -} http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala deleted file mode 100644 index 380f4eb..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.math.drm - -import scala.reflect.ClassTag -import org.apache.mahout.math.{Vector, Matrix} -import org.apache.mahout.math.drm.logical._ - -class RLikeDrmOps[K: ClassTag](drm: DrmLike[K]) extends DrmLikeOps[K](drm) { - - import RLikeDrmOps._ - - def +(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = "+") - - def -(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = "-") - - def *(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = "*") - - def /(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = "/") - - def +(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "+") - - def +:(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "+") - - def -(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "-") - - def -:(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "-:") - - def *(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "*") - - def *:(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "*") - - def /(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "/") - - def /:(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "/:") - - def :%*%(that: DrmLike[Int]): DrmLike[K] = OpAB[K](A = this.drm, B = that) - - def %*%[B: ClassTag](that: DrmLike[B]): DrmLike[K] = OpABAnyKey[B, K](A = this.drm, B = that) - - def %*%(that: DrmLike[Int]): DrmLike[K] = this :%*% that - - def :%*%(that: Matrix): DrmLike[K] = OpTimesRightMatrix[K](A = this.drm, right = that) - - def %*%(that: Matrix): DrmLike[K] = this :%*% that - - def :%*%(that: Vector): DrmLike[K] = OpAx(A = this.drm, x = that) - - def %*%(that: Vector): DrmLike[K] = :%*%(that) - - def t: DrmLike[Int] = OpAtAnyKey(A = drm) - - def cbind(that: DrmLike[K]) = OpCbind(A = this.drm, B = that) - - def rbind(that: DrmLike[K]) = OpRbind(A = this.drm, B = that) -} - -class RLikeDrmIntOps(drm: DrmLike[Int]) extends RLikeDrmOps[Int](drm) { - - import org.apache.mahout.math._ - import scalabindings._ - import RLikeOps._ - import RLikeDrmOps._ - import scala.collection.JavaConversions._ - - override def t: DrmLike[Int] = OpAt(A = drm) - - def %*%:[K: ClassTag](that: DrmLike[K]): DrmLike[K] = OpAB[K](A = that, B = this.drm) - - def %*%:(that: Matrix): DrmLike[Int] = OpTimesLeftMatrix(left = that, A = this.drm) - - /** Row sums. This is of course applicable to Int-keyed distributed matrices only. */ - def rowSums(): Vector = { - drm.mapBlock(ncol = 1) { case (keys, block) => - // Collect block-wise rowsums and output them as one-column matrix. - keys -> dense(block.rowSums).t - } - .collect(::, 0) - } - - /** Counts the non-zeros elements in each row returning a vector of the counts */ - def numNonZeroElementsPerRow(): Vector = { - drm.mapBlock(ncol = 1) { case (keys, block) => - // Collect block-wise row non-zero counts and output them as a one-column matrix. - keys -> dense(block.numNonZeroElementsPerRow).t - } - .collect(::, 0) - } - - /** Row means */ - def rowMeans(): Vector = { - drm.mapBlock(ncol = 1) { case (keys, block) => - // Collect block-wise row means and output them as one-column matrix. - keys -> dense(block.rowMeans).t - } - .collect(::, 0) - } - - /** Return diagonal vector */ - def diagv: Vector = { - require(drm.ncol == drm.nrow, "Must be square to extract diagonal") - drm.mapBlock(ncol = 1) { case (keys, block) => - keys -> dense(for (r <- block.view) yield r(keys(r.index))).t - } - .collect(::, 0) - } - -} - -object RLikeDrmOps { - - implicit def double2ScalarOps(x:Double) = new DrmDoubleScalarOps(x) - - implicit def drmInt2RLikeOps(drm: DrmLike[Int]): RLikeDrmIntOps = new RLikeDrmIntOps(drm) - - implicit def drm2RLikeOps[K: ClassTag](drm: DrmLike[K]): RLikeDrmOps[K] = new RLikeDrmOps[K](drm) - - implicit def rlikeOps2Drm[K: ClassTag](ops: RLikeDrmOps[K]): DrmLike[K] = ops.drm - - implicit def ops2Drm[K: ClassTag](ops: DrmLikeOps[K]): DrmLike[K] = ops.drm - - // Removed in move to 1.2.1 PR #74 https://github.com/apache/mahout/pull/74/files - // Not sure why. - // implicit def cp2cpops[K: ClassTag](cp: CheckpointedDrm[K]): CheckpointedOps[K] = new CheckpointedOps(cp) - - /** - * This is probably dangerous since it triggers implicit checkpointing with default storage level - * setting. - */ - implicit def drm2cpops[K: ClassTag](drm: DrmLike[K]): CheckpointedOps[K] = new CheckpointedOps(drm.checkpoint()) -} http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala deleted file mode 100644 index 3b6b8bf..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.math.drm.logical - -import scala.reflect.ClassTag -import org.apache.mahout.math.drm.{DistributedContext, DrmLike} - -/** - * Any logical binary operator (such as A + B). - * <P/> - * - * Any logical operator derived from this is also capabile of triggering optimizer checkpoint, hence, - * it also inherits CheckpointAction. - * <P/> - * - * @param evidence$1 LHS key type tag - * @param evidence$2 RHS key type tag - * @param evidence$3 expression key type tag - * @tparam A LHS key type - * @tparam B RHS key type - * @tparam K result key type - */ -abstract class AbstractBinaryOp[A: ClassTag, B: ClassTag, K: ClassTag] - extends CheckpointAction[K] with DrmLike[K] { - - protected[drm] var A: DrmLike[A] - protected[drm] var B: DrmLike[B] - lazy val context: DistributedContext = A.context - - protected[mahout] def canHaveMissingRows: Boolean = false - - // These are explicit evidence export. Sometimes scala falls over to figure that on its own. - def classTagA: ClassTag[A] = implicitly[ClassTag[A]] - - def classTagB: ClassTag[B] = implicitly[ClassTag[B]] - - def classTagK: ClassTag[K] = implicitly[ClassTag[K]] - -}
