http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
----------------------------------------------------------------------
diff --git a/samsara/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala b/samsara/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
new file mode 100644
index 0000000..32515f1
--- /dev/null
+++ b/samsara/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.drivers
+
+import org.apache.mahout.math.drm.DistributedContext
+
+/** Extended by a platform-specific version of this class to create a Mahout CLI driver. */
+abstract class MahoutDriver {
+
+  implicit protected var mc: DistributedContext = _
+  implicit protected var parser: MahoutOptionParser = _
+
+  var _useExistingContext: Boolean = false // used in the test suite to reuse one context per suite
+
+  /** Must be overridden to set up the DistributedContext mc */
+  protected def start() : Unit
+
+  /** Override (optionally) for special cleanup */
+  protected def stop(): Unit = {
+    if (!_useExistingContext) mc.close
+  }
+
+  /** This is where you do the work; call start first, then stop before exiting */
+  protected def process(): Unit
+
+  /** Parse the command line and call process */
+  def main(args: Array[String]): Unit
+
+}
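
A minimal engine-specific driver would wire these hooks together roughly as follows. This is only a sketch: the driver name is made up, and the DistributedContext construction is engine-dependent and elided.

    object MyDriver extends MahoutDriver {

      override def main(args: Array[String]): Unit = {
        parser = new MahoutOptionParser(programName = "my-driver") {
          head("my-driver", "Mahout")
          parseIOOptions()
          parseGenericOptions()
        }
        parser.parse(args, parser.opts) map { parsedOpts =>
          parser.opts = parsedOpts // merged defaults + CLI overrides
          process()
        }
      }

      // Engine-specific: create and assign the implicit DistributedContext, i.e. mc = ...
      override protected def start(): Unit = ???

      override protected def process(): Unit = {
        start()
        // read inputs, compute, write outputs
        stop()
      }
    }
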
http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
----------------------------------------------------------------------
diff --git a/samsara/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala b/samsara/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
new file mode 100644
index 0000000..3b5affd
--- /dev/null
+++ b/samsara/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.drivers
+
+import scopt.OptionParser
+
+import scala.collection.immutable
+
+/**
+ * Defines oft-repeated options and their parsing. Provides the option groups and parsing helper methods to
+ * keep both standardized.
+ * @param programName Name displayed in the help message; the name by which the driver is invoked.
+ * @note Options are engine-neutral by convention. See the engine-specific extending class for how
+ *       to add Spark or other engine options.
+ */
+class MahoutOptionParser(programName: String) extends OptionParser[Map[String, Any]](programName: String) {
+
+  // build options from some standard CLI param groups
+  // Note: always put the driver-specific options last so they can override any previous options!
+  var opts = Map.empty[String, Any]
+
+  override def showUsageOnError = true
+
+  def parseIOOptions(numInputs: Int = 1) = {
+    opts = opts ++ MahoutOptionParser.FileIOOptions
+    note("Input, output options")
+    opt[String]('i', "input") required() action { (x, options) =>
+      options + ("input" -> x)
+    } text ("Input path, may be a filename, directory name, or comma delimited list of HDFS supported URIs" +
+      " (required)")
+
+    if (numInputs == 2) {
+      opt[String]("input2") abbr ("i2") action { (x, options) =>
+        options + ("input2" -> x)
+      } text ("Secondary input path for cross-similarity calculation, same restrictions as \"--input\" " +
+        "(optional). Default: empty.")
+    }
+
+    opt[String]('o', "output") required() action { (x, options) =>
+      if (x.endsWith("/")) {
+        options + ("output" -> x)
+      } else {
+        options + ("output" -> (x + "/"))
+      }
+    } text ("Path for output directory, any HDFS supported URI (required)")
+
+  }
+
+  def parseGenericOptions() = {
+    opts = opts ++ MahoutOptionParser.GenericOptions
+    opt[Int]("randomSeed") abbr ("rs") action { (x, options) =>
+      options + ("randomSeed" -> x)
+    } validate { x =>
+      if (x > 0) success else failure("Option --randomSeed must be > 0")
+    }
+
+    // output both input IndexedDatasets
+    opt[Unit]("writeAllDatasets") hidden() action { (_, options) =>
+      options + ("writeAllDatasets" -> true)
+    } // Hidden option, though a user might want this.
+  }
+
+  def parseElementInputSchemaOptions() = {
+    // Input text file schema--not driver specific but input data specific; elements input,
+    // not rows of IndexedDatasets
+    opts = opts ++ MahoutOptionParser.TextDelimitedElementsOptions
+    note("\nInput text file schema options:")
+    opt[String]("inDelim") abbr ("id") text ("Input delimiter character (optional). Default: \"[ ,\\t]\"") action {
+      (x, options) =>
+        options + ("inDelim" -> x)
+    }
+
+    opt[String]("filter1") abbr ("f1") action { (x, options) =>
+      options + ("filter1" -> x)
+    } text ("String (or regex) whose presence indicates a datum for the primary item set (optional). " +
+      "Default: no filter, all data is used")
+
+    opt[String]("filter2") abbr ("f2") action { (x, options) =>
+      options + ("filter2" -> x)
+    } text ("String (or regex) whose presence indicates a datum for the secondary item set (optional). " +
+      "If not present no secondary dataset is collected")
+
+    opt[Int]("rowIDColumn") abbr ("rc") action { (x, options) =>
+      options + ("rowIDColumn" -> x)
+    } text ("Column number (0 based Int) containing the row ID string (optional). Default: 0") validate {
+      x =>
+        if (x >= 0) success else failure("Option --rowIDColumn must be >= 0")
+    }
+
+    opt[Int]("itemIDColumn") abbr ("ic") action { (x, options) =>
+      options + ("itemIDColumn" -> x)
+    } text ("Column number (0 based Int) containing the item ID string (optional). Default: 1") validate {
+      x =>
+        if (x >= 0) success else failure("Option --itemIDColumn must be >= 0")
+    }
+
+    opt[Int]("filterColumn") abbr ("fc") action { (x, options) =>
+      options + ("filterColumn" -> x)
+    } text ("Column number (0 based Int) containing the filter string (optional). Default: -1 for no " +
+      "filter") validate { x =>
+      if (x >= -1) success else failure("Option --filterColumn must be >= -1")
+    }
+
+    note("\nUsing all defaults the input is expected to be of the form: \"userID<tab>itemId\" or" +
+      " \"userID<tab>itemID<tab>any-text...\" and all rows will be used")
+
+    // check for column consistency
+    checkConfig { options: Map[String, Any] =>
+      if (options("filterColumn").asInstanceOf[Int] == options("itemIDColumn").asInstanceOf[Int]
+        || options("filterColumn").asInstanceOf[Int] == options("rowIDColumn").asInstanceOf[Int]
+        || options("rowIDColumn").asInstanceOf[Int] == options("itemIDColumn").asInstanceOf[Int])
+        failure("The row, item, and filter positions must be unique.") else success
+    }
+
+    // check for filter consistency
+    checkConfig { options: Map[String, Any] =>
+      if (options("filter1").asInstanceOf[String] != null.asInstanceOf[String]
+        && options("filter2").asInstanceOf[String] != null.asInstanceOf[String]
+        && options("filter1").asInstanceOf[String] == options("filter2").asInstanceOf[String])
+        failure("If using filters they must be unique.") else success
+    }
+
+  }
+
+  def parseFileDiscoveryOptions() = {
+    // File finding strategy--not driver specific
+    opts = opts ++ MahoutOptionParser.FileDiscoveryOptions
+    note("\nFile discovery options:")
+    opt[Unit]('r', "recursive") action { (_, options) =>
+      options + ("recursive" -> true)
+    } text ("Search the -i path recursively for files that match --filenamePattern (optional). Default: false")
+
+    opt[String]("filenamePattern") abbr ("fp") action { (x, options) =>
+      options + ("filenamePattern" -> x)
+    } text ("Regex to match in determining input files (optional). Default: filename in the --input option " +
+      "or \"^part-.*\" if --input is a directory")
+
+  }
+
+  def parseIndexedDatasetFormatOptions() = {
+    opts = opts ++ MahoutOptionParser.TextDelimitedIndexedDatasetOptions
+    note("\nOutput text file schema options:")
+    opt[String]("rowKeyDelim") abbr ("rd") action { (x, options) =>
+      options + ("rowKeyDelim" -> x)
+    } text ("Separates the rowID key from the vector values list (optional). Default: \"\\t\"")
+
+    opt[String]("columnIdStrengthDelim") abbr ("cd") action { (x, options) =>
+      options + ("columnIdStrengthDelim" -> x)
+    } text ("Separates column IDs from their values in the vector values list (optional). Default: \":\"")
+
+    opt[String]("elementDelim") abbr ("td") action { (x, options) =>
+      options + ("elementDelim" -> x)
+    } text ("Separates vector element values in the values list (optional). Default: \" \"")
+
+    opt[Unit]("omitStrength") abbr ("os") action { (_, options) =>
+      options + ("omitStrength" -> true)
+    } text ("Do not write the strength to the output files (optional). Default: false.")
+    note("This option is used to output indexable data for creating a search engine recommender.")
+
+    note("\nDefault delimiters will produce output of the form: " +
+      "\"itemID1<tab>itemID2:value2<space>itemID10:value10...\"")
+  }
+
+}
+
+/**
+ * Companion object defines default option groups for reference in any driver that needs them.
+ * @note Not all options are platform-neutral, so other platforms can add default options here if desired.
+ */
+object MahoutOptionParser {
+
+  // set up the various default option groups
+  final val GenericOptions = immutable.HashMap[String, Any](
+    "randomSeed" -> System.currentTimeMillis().toInt,
+    "writeAllDatasets" -> false)
+
+  final val SparkOptions = immutable.HashMap[String, Any](
+    "master" -> "local",
+    "sparkExecutorMem" -> "",
+    "appName" -> "Generic Spark App, Change this.")
+
+  final val FileIOOptions = immutable.HashMap[String, Any](
+    "input" -> null.asInstanceOf[String],
+    "input2" -> null.asInstanceOf[String],
+    "output" -> null.asInstanceOf[String])
+
+  final val FileDiscoveryOptions = immutable.HashMap[String, Any](
+    "recursive" -> false,
+    "filenamePattern" -> "^part-.*")
+
+  final val TextDelimitedElementsOptions = immutable.HashMap[String, Any](
+    "rowIDColumn" -> 0,
+    "itemIDColumn" -> 1,
+    "filterColumn" -> -1,
+    "filter1" -> null.asInstanceOf[String],
+    "filter2" -> null.asInstanceOf[String],
+    "inDelim" -> "[,\t ]")
+
+  final val TextDelimitedIndexedDatasetOptions = immutable.HashMap[String, Any](
+    "rowKeyDelim" -> "\t",
+    "columnIdStrengthDelim" -> ":",
+    "elementDelim" -> " ",
+    "omitStrength" -> false)
+}
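
For orientation, a driver assembles its parser by mixing these groups; a minimal sketch (the program name and the chosen groups are illustrative):

    val parser = new MahoutOptionParser(programName = "itemsimilarity") {
      head("itemsimilarity", "Mahout")
      parseIOOptions(numInputs = 2)
      parseElementInputSchemaOptions()
      parseFileDiscoveryOptions()
      parseIndexedDatasetFormatOptions()
    }
    // parser.opts now holds the merged defaults; parse() overlays CLI values
    parser.parse(args, parser.opts) foreach { options =>
      val input = options("input").asInstanceOf[String]
      val pattern = options("filenamePattern").asInstanceOf[String]
    }
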
http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
----------------------------------------------------------------------
diff --git a/samsara/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala b/samsara/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
new file mode 100644
index 0000000..6557ab0
--- /dev/null
+++ b/samsara/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.cf
+
+import org.apache.mahout.math._
+import org.apache.mahout.math.indexeddataset.IndexedDataset
+import scalabindings._
+import RLikeOps._
+import drm._
+import RLikeDrmOps._
+import scala.collection.JavaConversions._
+import org.apache.mahout.math.stats.LogLikelihood
+import collection._
+import org.apache.mahout.math.function.{VectorFunction, Functions}
+
+import scala.util.Random
+
+
+/**
+ * Based on "Ted Dunning & Ellen Friedman: Practical Machine Learning, Innovations in Recommendation",
+ * available at http://www.mapr.com/practical-machine-learning
+ *
+ * see also "Sebastian Schelter, Christoph Boden, Volker Markl:
+ * Scalable Similarity-Based Neighborhood Methods with MapReduce
+ * ACM Conference on Recommender Systems 2012"
+ */
+object SimilarityAnalysis extends Serializable {
+
+  /** Compares (Int, Double) pairs by the second value */
+  private val orderByScore = Ordering.fromLessThan[(Int, Double)] { case ((_, score1), (_, score2)) => score1 > score2 }
+
+  /**
+   * Calculates item (column-wise) similarity using the log-likelihood ratio on A'A, A'B, A'C, ...
+   * and returns a list of similarity and cross-similarity matrices
+   * @param drmARaw Primary interaction matrix
+   * @param randomSeed a constant value makes downsampling repeatable
+   * @param maxInterestingItemsPerThing number of similar items to return per item, default: 50
+   * @param maxNumInteractions max number of interactions after downsampling, default: 500
+   * @param drmBs array of secondary interaction matrices for cross-cooccurrence (optional)
+   * @return a list of [[org.apache.mahout.math.drm.DrmLike]] containing downsampled DRMs for cooccurrence and
+   *         cross-cooccurrence
+   */
+  def cooccurrences(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingItemsPerThing: Int = 50,
+                    maxNumInteractions: Int = 500, drmBs: Array[DrmLike[Int]] = Array()): List[DrmLike[Int]] = {
+
+    implicit val distributedContext = drmARaw.context
+
+    // Apply selective downsampling, pin resulting matrix
+    val drmA = sampleDownAndBinarize(drmARaw, randomSeed, maxNumInteractions)
+
+    // num users, which equals the maximum number of interactions per item
+    val numUsers = drmA.nrow.toInt
+
+    // Compute & broadcast the number of interactions per thing in A
+    val bcastInteractionsPerItemA = drmBroadcast(drmA.numNonZeroElementsPerColumn)
+
+    // Compute co-occurrence matrix A'A
+    val drmAtA = drmA.t %*% drmA
+
+    // Compute loglikelihood scores and sparsify the resulting matrix to get the similarity matrix
+    val drmSimilarityAtA = computeSimilarities(drmAtA, numUsers, maxInterestingItemsPerThing,
+      bcastInteractionsPerItemA, bcastInteractionsPerItemA, crossCooccurrence = false)
+
+    var similarityMatrices = List(drmSimilarityAtA)
+
+    // Now look at cross-co-occurrences
+    for (drmBRaw <- drmBs) {
+      // Down-sample and pin other interaction matrix
+      val drmB = sampleDownAndBinarize(drmBRaw, randomSeed, maxNumInteractions).checkpoint()
+
+      // Compute & broadcast the number of interactions per thing in B
+      val bcastInteractionsPerThingB = drmBroadcast(drmB.numNonZeroElementsPerColumn)
+
+      // Compute cross-co-occurrence matrix A'B
+      val drmAtB = drmA.t %*% drmB
+
+      val drmSimilarityAtB = computeSimilarities(drmAtB, numUsers, maxInterestingItemsPerThing,
+        bcastInteractionsPerItemA, bcastInteractionsPerThingB)
+
+      similarityMatrices = similarityMatrices :+ drmSimilarityAtB
+
+      drmB.uncache()
+    }
+
+    // Unpin downsampled interaction matrix
+    drmA.uncache()
+
+    // Return list of similarity matrices
+    similarityMatrices
+  }
+
+  /**
+   * Calculates item (column-wise) similarity using the log-likelihood ratio on A'A, A'B, A'C, ... and returns
+   * a list of similarity and cross-similarity matrices. A somewhat easier-to-use method that handles the ID
+   * dictionaries correctly
+   * @param indexedDatasets first in the array is the primary/A matrix; all others are treated as secondary
+   * @param randomSeed use the default to make this repeatable, otherwise pass in the system time or some randomizing seed
+   * @param maxInterestingItemsPerThing max similarities per item
+   * @param maxNumInteractions max number of input items per item
+   * @return a list of [[org.apache.mahout.math.indexeddataset.IndexedDataset]] containing downsampled
+   *         IndexedDatasets for cooccurrence and cross-cooccurrence
+   */
+  def cooccurrencesIDSs(indexedDatasets: Array[IndexedDataset],
+                        randomSeed: Int = 0xdeadbeef,
+                        maxInterestingItemsPerThing: Int = 50,
+                        maxNumInteractions: Int = 500):
+    List[IndexedDataset] = {
+    val drms = indexedDatasets.map(_.matrix.asInstanceOf[DrmLike[Int]])
+    val primaryDrm = drms(0)
+    val secondaryDrms = drms.drop(1)
+    val coocMatrices = cooccurrences(primaryDrm, randomSeed, maxInterestingItemsPerThing,
+      maxNumInteractions, secondaryDrms)
+    val retIDSs = coocMatrices.iterator.zipWithIndex.map {
+      case (drm, i) =>
+        indexedDatasets(0).create(drm, indexedDatasets(0).columnIDs, indexedDatasets(i).columnIDs)
+    }
+    retIDSs.toList
+  }
+
+  /**
+   * Calculates row-wise similarity using the log-likelihood ratio on AA' and returns a DRM of rows and similar rows
+   * @param drmARaw Primary interaction matrix
+   * @param randomSeed a constant value makes downsampling repeatable
+   * @param maxInterestingSimilaritiesPerRow number of similar items to return per item, default: 50
+   * @param maxNumInteractions max number of interactions after downsampling, default: 500
+   */
+  def rowSimilarity(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingSimilaritiesPerRow: Int = 50,
+                    maxNumInteractions: Int = 500): DrmLike[Int] = {
+
+    implicit val distributedContext = drmARaw.context
+
+    // Apply selective downsampling, pin resulting matrix
+    val drmA = sampleDownAndBinarize(drmARaw, randomSeed, maxNumInteractions)
+
+    // num columns, which equals the maximum number of interactions per item
+    val numCols = drmA.ncol
+
+    // Compute & broadcast the number of interactions per row in A
+    val bcastInteractionsPerItemA = drmBroadcast(drmA.numNonZeroElementsPerRow)
+
+    // Compute row similarity cooccurrence matrix AA'
+    val drmAAt = drmA %*% drmA.t
+
+    // Compute loglikelihood scores and sparsify the resulting matrix to get the similarity matrix
+    val drmSimilaritiesAAt = computeSimilarities(drmAAt, numCols, maxInterestingSimilaritiesPerRow,
+      bcastInteractionsPerItemA, bcastInteractionsPerItemA, crossCooccurrence = false)
+
+    drmSimilaritiesAAt
+  }
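
As a usage sketch (assuming drmA and drmB are already-loaded user-by-item interaction matrices of type DrmLike[Int]):

    // A'A similarity for the primary action plus A'B cross-similarity against a second action
    val simMatrices = SimilarityAnalysis.cooccurrences(drmA, drmBs = Array(drmB))
    val drmItemSimilarity  = simMatrices(0) // LLR-sparsified A'A
    val drmCrossSimilarity = simMatrices(1) // LLR-sparsified A'B
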
+  /**
+   * Calculates row-wise similarity using the log-likelihood ratio on AA' and returns a DRM of rows and similar rows.
+   * Uses IndexedDatasets, which handle external ID dictionaries properly
+   * @param indexedDataset compare each row to every other
+   * @param randomSeed use the default to make this repeatable, otherwise pass in the system time or some randomizing seed
+   * @param maxInterestingSimilaritiesPerRow max elements returned in each row
+   * @param maxObservationsPerRow max number of input elements to use
+   */
+  def rowSimilarityIDS(indexedDataset: IndexedDataset, randomSeed: Int = 0xdeadbeef,
+                       maxInterestingSimilaritiesPerRow: Int = 50,
+                       maxObservationsPerRow: Int = 500):
+    IndexedDataset = {
+    val coocMatrix = rowSimilarity(indexedDataset.matrix, randomSeed, maxInterestingSimilaritiesPerRow,
+      maxObservationsPerRow)
+    indexedDataset.create(coocMatrix, indexedDataset.rowIDs, indexedDataset.rowIDs)
+  }
+
+  /** Compute the loglikelihood ratio; see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details */
+  def logLikelihoodRatio(numInteractionsWithA: Long, numInteractionsWithB: Long,
+                         numInteractionsWithAandB: Long, numInteractions: Long) = {
+
+    val k11 = numInteractionsWithAandB
+    val k12 = numInteractionsWithA - numInteractionsWithAandB
+    val k21 = numInteractionsWithB - numInteractionsWithAandB
+    val k22 = numInteractions - numInteractionsWithA - numInteractionsWithB + numInteractionsWithAandB
+
+    LogLikelihood.logLikelihoodRatio(k11, k12, k21, k22)
+
+  }
+
+  def computeSimilarities(drm: DrmLike[Int], numUsers: Int, maxInterestingItemsPerThing: Int,
+                          bcastNumInteractionsB: BCast[Vector], bcastNumInteractionsA: BCast[Vector],
+                          crossCooccurrence: Boolean = true) = {
+    drm.mapBlock() {
+      case (keys, block) =>
+
+        val llrBlock = block.like()
+        val numInteractionsB: Vector = bcastNumInteractionsB
+        val numInteractionsA: Vector = bcastNumInteractionsA
+
+        for (index <- 0 until keys.size) {
+
+          val thingB = keys(index)
+
+          // PriorityQueue to select the top-k items
+          val topItemsPerThing = new mutable.PriorityQueue[(Int, Double)]()(orderByScore)
+
+          block(index, ::).nonZeroes().foreach { elem =>
+            val thingA = elem.index
+            val cooccurrences = elem.get
+
+            // exclude co-occurrences of the item with itself
+            if (crossCooccurrence || thingB != thingA) {
+              // Compute loglikelihood ratio
+              val llr = logLikelihoodRatio(numInteractionsB(thingB).toLong, numInteractionsA(thingA).toLong,
+                cooccurrences.toLong, numUsers)
+
+              val candidate = thingA -> llr
+
+              // legacy hadoop code maps values to the range (0..1) via
+              // val normalizedLLR = 1.0 - (1.0 / (1.0 + llr))
+              // val candidate = thingA -> normalizedLLR
+
+              // Enqueue item with score, if it belongs to the top-k
+              if (topItemsPerThing.size < maxInterestingItemsPerThing) {
+                topItemsPerThing.enqueue(candidate)
+              } else if (orderByScore.lt(candidate, topItemsPerThing.head)) {
+                topItemsPerThing.dequeue()
+                topItemsPerThing.enqueue(candidate)
+              }
+            }
+          }
+
+          // Add top-k interesting items to the output matrix
+          topItemsPerThing.dequeueAll.foreach {
+            case (otherThing, llrScore) =>
+              llrBlock(index, otherThing) = llrScore
+          }
+        }
+
+        keys -> llrBlock
+    }
+  }
+
+  /**
+   * Selectively downsample rows and items with an anomalous amount of interactions, inspired by
+   * https://github.com/tdunning/in-memory-cooccurrence/blob/master/src/main/java/com/tdunning/cooc/Analyze.java
+   *
+   * Additionally binarizes the input matrix, as we're only interested in knowing whether interactions happened or not
+   * @param drmM matrix to downsample
+   * @param seed random number generator seed; keep it constant if repeatability is necessary
+   * @param maxNumInteractions max number of elements in a row of the returned matrix
+   * @return the downsampled DRM
+   */
+  def sampleDownAndBinarize(drmM: DrmLike[Int], seed: Int, maxNumInteractions: Int) = {
+
+    implicit val distributedContext = drmM.context
+
+    // Pin raw interaction matrix
+    val drmI = drmM.checkpoint()
+
+    // Broadcast vector containing the number of interactions with each thing
+    val bcastNumInteractions = drmBroadcast(drmI.numNonZeroElementsPerColumn)
+
+    val downSampledDrmI = drmI.mapBlock() {
+      case (keys, block) =>
+        val numInteractions: Vector = bcastNumInteractions
+
+        // Use a hash of the unique first key to seed the RNG; makes this computation repeatable in case of
+        // failures
+        val random = new Random(MurmurHash.hash(keys(0), seed))
+
+        val downsampledBlock = block.like()
+
+        // Downsample the interaction vector of each row
+        for (rowIndex <- 0 until keys.size) {
+
+          val interactionsInRow = block(rowIndex, ::)
+
+          val numInteractionsPerRow = interactionsInRow.getNumNonZeroElements()
+
+          val perRowSampleRate = math.min(maxNumInteractions, numInteractionsPerRow) / numInteractionsPerRow
+
+          interactionsInRow.nonZeroes().foreach { elem =>
+            val numInteractionsWithThing = numInteractions(elem.index)
+            val perThingSampleRate = math.min(maxNumInteractions, numInteractionsWithThing) / numInteractionsWithThing
+
+            if (random.nextDouble() <= math.min(perRowSampleRate, perThingSampleRate)) {
+              // We ignore the original interaction value and create a binary 0-1 matrix
+              // as we only consider whether interactions happened or did not happen
+              downsampledBlock(rowIndex, elem.index) = 1
+            }
+          }
+        }
+
+        keys -> downsampledBlock
+    }
+
+    // Unpin raw interaction matrix
+    drmI.uncache()
+
+    downSampledDrmI
+  }
+}
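
To make the LLR weighting concrete, a small sketch with made-up counts (1000 users; item A seen 100 times, item B 200 times, together 50 times):

    val llr = SimilarityAnalysis.logLikelihoodRatio(
      numInteractionsWithA = 100L,
      numInteractionsWithB = 200L,
      numInteractionsWithAandB = 50L,
      numInteractions = 1000L)
    // feeds k11 = 50, k12 = 50, k21 = 150, k22 = 750 to LogLikelihood.logLikelihoodRatio
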
http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/main/scala/org/apache/mahout/math/decompositions/ALS.scala
----------------------------------------------------------------------
diff --git a/samsara/src/main/scala/org/apache/mahout/math/decompositions/ALS.scala b/samsara/src/main/scala/org/apache/mahout/math/decompositions/ALS.scala
new file mode 100644
index 0000000..4e2f45c
--- /dev/null
+++ b/samsara/src/main/scala/org/apache/mahout/math/decompositions/ALS.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.decompositions
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math._
+import drm._
+import scalabindings._
+import RLikeDrmOps._
+import RLikeOps._
+import scala.util.Random
+import org.apache.log4j.Logger
+import math._
+import org.apache.mahout.common.RandomUtils
+
+/** Simple ALS factorization algorithm. To solve, use the train() method. */
+private[math] object ALS {
+
+  private val log = Logger.getLogger(ALS.getClass)
+
+  /**
+   * ALS training result. <P>
+   *
+   * <code>drmU %*% drmV.t</code> is supposed to approximate the input.
+   *
+   * @param drmU U matrix
+   * @param drmV V matrix
+   * @param iterationsRMSE RMSE values after each iteration performed
+   */
+  class Result[K: ClassTag](val drmU: DrmLike[K], val drmV: DrmLike[Int], val iterationsRMSE: Iterable[Double]) {
+    def toTuple = (drmU, drmV, iterationsRMSE)
+  }
+
+  /** Result class for in-core results */
+  class InCoreResult(val inCoreU: Matrix, inCoreV: Matrix, val iterationsRMSE: Iterable[Double]) {
+    def toTuple = (inCoreU, inCoreV, iterationsRMSE)
+  }
+
+  /**
+   * Run Distributed ALS.
+   * <P>
+   *
+   * Example:
+   *
+   * <pre>
+   * val (u, v, errors) = als(input, k).toTuple
+   * </pre>
+   *
+   * ALS runs until (rmse[i-1] - rmse[i]) / rmse[i-1] < convergenceThreshold, or i == maxIterations,
+   * whichever comes first.
+   * <P>
+   *
+   * @param drmA The input matrix
+   * @param k required rank of decomposition (number of cols in U and V results)
+   * @param convergenceThreshold stop sooner if (rmse[i-1] - rmse[i]) / rmse[i-1] is less than this
+   *                             value. If <= 0 then RMSE is not computed and the convergence test is skipped.
+   * @param lambda regularization rate
+   * @param maxIterations maximum iterations to run regardless of convergence
+   * @tparam K row key type of the input
+   * @return {@link org.apache.mahout.math.drm.decompositions.ALS.Result}
+   */
+  def dals[K: ClassTag](
+      drmA: DrmLike[K],
+      k: Int = 50,
+      lambda: Double = 0.0,
+      maxIterations: Int = 10,
+      convergenceThreshold: Double = 0.10
+      ): Result[K] = {
+
+    assert(convergenceThreshold < 1.0, "convergenceThreshold")
+    assert(maxIterations >= 1, "maxIterations")
+
+    val drmAt = drmA.t
+
+    // Initialize U and V so that they are identically distributed to A or A'
+    var drmU = drmA.mapBlock(ncol = k) {
+      case (keys, block) =>
+        val rnd = RandomUtils.getRandom()
+        val uBlock = Matrices.symmetricUniformView(block.nrow, k, rnd.nextInt()) * 0.01
+        keys -> uBlock
+    }
+
+    var drmV: DrmLike[Int] = null
+    var rmseIterations: List[Double] = Nil
+
+    // ALS iterator
+    var stop = false
+    var i = 0
+    while (!stop && i < maxIterations) {
+
+      // Alternate. This is really what ALS is.
+      if (drmV != null) drmV.uncache()
+      drmV = (drmAt %*% drmU %*% solve(drmU.t %*% drmU -: diag(lambda, k))).checkpoint()
+
+      drmU.uncache()
+      drmU = (drmA %*% drmV %*% solve(drmV.t %*% drmV -: diag(lambda, k))).checkpoint()
+
+      // Check if we are requested to do a convergence test; and do it if yes.
+      if (convergenceThreshold > 0) {
+
+        val rmse = (drmA - drmU %*% drmV.t).norm / sqrt(drmA.ncol * drmA.nrow)
+
+        if (i > 0) {
+          val rmsePrev = rmseIterations.last
+          val convergence = (rmsePrev - rmse) / rmsePrev
+
+          if (convergence < 0) {
+            log.warn("RMSE increase of %f. Should not happen.".format(convergence))
+            // I guess error growth can happen in the ideal data case?
+            stop = true
+          } else if (convergence < convergenceThreshold) {
+            stop = true
+          }
+        }
+        rmseIterations :+= rmse
+      }
+
+      i += 1
+    }
+
+    new Result(drmU, drmV, rmseIterations)
+  }
+
+
+}
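
A usage sketch (drmA is any DrmLike[K]; dals is re-exported by the decompositions package object added later in this commit):

    val (drmU, drmV, rmseTrace) = dals(drmA, k = 20, lambda = 0.01, maxIterations = 10).toTuple
    // drmU %*% drmV.t approximates drmA; rmseTrace holds one RMSE value per
    // iteration when convergenceThreshold > 0
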
http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala
----------------------------------------------------------------------
diff --git a/samsara/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala b/samsara/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala
new file mode 100644
index 0000000..7caa3dd
--- /dev/null
+++ b/samsara/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.decompositions
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.Matrix
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm._
+import RLikeDrmOps._
+import org.apache.log4j.Logger
+
+object DQR {
+
+  private val log = Logger.getLogger(DQR.getClass)
+
+  /**
+   * Distributed _thin_ QR. A'A must fit in memory, i.e. if A is m x n, then n should be pretty
+   * controlled (<5000 or so). <P>
+   *
+   * It is recommended to checkpoint A since this makes two passes over it. <P>
+   *
+   * It also guarantees that Q is partitioned exactly the same way (and in the same key-order) as A, so
+   * their RDDs should be able to zip successfully.
+   */
+  def dqrThin[K: ClassTag](drmA: DrmLike[K], checkRankDeficiency: Boolean = true): (DrmLike[K], Matrix) = {
+
+    if (drmA.ncol > 5000)
+      log.warn("A is too fat. A'A must fit in memory and be easily broadcast.")
+
+    implicit val ctx = drmA.context
+
+    val AtA = (drmA.t %*% drmA).checkpoint()
+    val inCoreAtA = AtA.collect
+
+    if (log.isDebugEnabled) log.debug("A'A=\n%s\n".format(inCoreAtA))
+
+    val ch = chol(inCoreAtA)
+    val inCoreR = (ch.getL cloned) t
+
+    if (log.isDebugEnabled) log.debug("R=\n%s\n".format(inCoreR))
+
+    if (checkRankDeficiency && !ch.isPositiveDefinite)
+      throw new IllegalArgumentException("R is rank-deficient.")
+
+    val bcastAtA = drmBroadcast(inCoreAtA)
+
+    // Unfortunately, I don't think the Cholesky decomposition is serializable to the backend. So we re-
+    // decompose A'A in the backend again.
+
+    // Compute Q = A * inv(L') -- we can do it blockwise.
+    val Q = drmA.mapBlock() {
+      case (keys, block) => keys -> chol(bcastAtA).solveRight(block)
+    }
+
+    Q -> inCoreR
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala
----------------------------------------------------------------------
diff --git a/samsara/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala b/samsara/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala
new file mode 100644
index 0000000..de7402d
--- /dev/null
+++ b/samsara/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.decompositions
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.{Matrices, Vector}
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm._
+import RLikeDrmOps._
+import org.apache.mahout.common.RandomUtils
+
+object DSPCA {
+
+  /**
+   * Distributed Stochastic PCA decomposition algorithm. A logical reflow of the "SSVD-PCA options.pdf"
+   * document of MAHOUT-817.
+   *
+   * @param drmA input matrix A
+   * @param k request SSVD rank
+   * @param p oversampling parameter
+   * @param q number of power iterations (hint: use either 0 or 1)
+   * @return (U, V, s). Note that U and V are non-checkpointed matrices (i.e. one needs to actually use them,
+   *         e.g. save them to HDFS, in order to trigger their computation).
+   */
+  def dspca[K: ClassTag](drmA: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
+    (DrmLike[K], DrmLike[Int], Vector) = {
+
+    val drmAcp = drmA.checkpoint()
+    implicit val ctx = drmAcp.context
+
+    val m = drmAcp.nrow
+    val n = drmAcp.ncol
+    assert(k <= (m min n), "k cannot be greater than the smaller of m, n.")
+    val pfxed = safeToNonNegInt((m min n) - k min p)
+
+    // Actual decomposition rank
+    val r = k + pfxed
+
+    // Dataset mean
+    val xi = drmAcp.colMeans
+
+    // We represent Omega by its seed.
+    val omegaSeed = RandomUtils.getRandom().nextInt()
+    val omega = Matrices.symmetricUniformView(n, r, omegaSeed)
+
+    // This is done up front in a single-threaded fashion for now. Even though it doesn't require any
+    // memory beyond what is required to keep xi around, it still might be parallelized to backends
+    // for significantly big n and r.
TODO + val s_o = omega.t %*% xi + + val bcastS_o = drmBroadcast(s_o) + val bcastXi = drmBroadcast(xi) + + var drmY = drmAcp.mapBlock(ncol = r) { + case (keys, blockA) => + val s_o:Vector = bcastS_o + val blockY = blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed) + for (row <- 0 until blockY.nrow) blockY(row, ::) -= s_o + keys -> blockY + } + // Checkpoint Y + .checkpoint() + + var drmQ = dqrThin(drmY, checkRankDeficiency = false)._1.checkpoint() + + var s_q = drmQ.colSums() + var bcastVarS_q = drmBroadcast(s_q) + + // This actually should be optimized as identically partitioned map-side A'B since A and Q should + // still be identically partitioned. + var drmBt = (drmAcp.t %*% drmQ).checkpoint() + + var s_b = (drmBt.t %*% xi).collect(::, 0) + var bcastVarS_b = drmBroadcast(s_b) + + for (i <- 0 until q) { + + // These closures don't seem to live well with outside-scope vars. This doesn't record closure + // attributes correctly. So we create additional set of vals for broadcast vars to properly + // create readonly closure attributes in this very scope. + val bcastS_q = bcastVarS_q + val bcastS_b = bcastVarS_b + val bcastXib = bcastXi + + // Fix Bt as B' -= xi cross s_q + drmBt = drmBt.mapBlock() { + case (keys, block) => + val s_q: Vector = bcastS_q + val xi: Vector = bcastXib + keys.zipWithIndex.foreach { + case (key, idx) => block(idx, ::) -= s_q * xi(key) + } + keys -> block + } + + drmY.uncache() + drmQ.uncache() + + drmY = (drmAcp %*% drmBt) + // Fix Y by subtracting s_b from each row of the AB' + .mapBlock() { + case (keys, block) => + val s_b: Vector = bcastS_b + for (row <- 0 until block.nrow) block(row, ::) -= s_b + keys -> block + } + // Checkpoint Y + .checkpoint() + + drmQ = dqrThin(drmY, checkRankDeficiency = false)._1.checkpoint() + + s_q = drmQ.colSums() + bcastVarS_q = drmBroadcast(s_q) + + // This on the other hand should be inner-join-and-map A'B optimization since A and Q_i are not + // identically partitioned anymore. + drmBt = (drmAcp.t %*% drmQ).checkpoint() + + s_b = (drmBt.t %*% xi).collect(::, 0) + bcastVarS_b = drmBroadcast(s_b) + } + + val c = s_q cross s_b + val inCoreBBt = (drmBt.t %*% drmBt).checkpoint(CacheHint.NONE).collect - + c - c.t + (s_q cross s_q) * (xi dot xi) + val (inCoreUHat, d) = eigen(inCoreBBt) + val s = d.sqrt + + // Since neither drmU nor drmV are actually computed until actually used, we don't need the flags + // instructing compute (or not compute) either of the U,V outputs anymore. Neat, isn't it? 
+ val drmU = drmQ %*% inCoreUHat + val drmV = drmBt %*% (inCoreUHat %*%: diagv(1 /: s)) + + (drmU(::, 0 until k), drmV(::, 0 until k), s(0 until k)) + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala ---------------------------------------------------------------------- diff --git a/samsara/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala b/samsara/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala new file mode 100644 index 0000000..1abfb87 --- /dev/null +++ b/samsara/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala @@ -0,0 +1,82 @@ +package org.apache.mahout.math.decompositions + +import scala.reflect.ClassTag +import org.apache.mahout.math.{Matrix, Matrices, Vector} +import org.apache.mahout.math.scalabindings._ +import RLikeOps._ +import org.apache.mahout.math.drm._ +import RLikeDrmOps._ +import org.apache.mahout.common.RandomUtils + +object DSSVD { + + /** + * Distributed Stochastic Singular Value decomposition algorithm. + * + * @param drmA input matrix A + * @param k request SSVD rank + * @param p oversampling parameter + * @param q number of power iterations + * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them + * e.g. save them to hdfs in order to trigger their computation. + */ + def dssvd[K: ClassTag](drmA: DrmLike[K], k: Int, p: Int = 15, q: Int = 0): + (DrmLike[K], DrmLike[Int], Vector) = { + + val drmAcp = drmA.checkpoint() + + val m = drmAcp.nrow + val n = drmAcp.ncol + assert(k <= (m min n), "k cannot be greater than smaller of m, n.") + val pfxed = safeToNonNegInt((m min n) - k min p) + + // Actual decomposition rank + val r = k + pfxed + + // We represent Omega by its seed. + val omegaSeed = RandomUtils.getRandom().nextInt() + + // Compute Y = A*Omega. Instead of redistributing view, we redistribute the Omega seed only and + // instantiate the Omega random matrix view in the backend instead. That way serialized closure + // is much more compact. + var drmY = drmAcp.mapBlock(ncol = r) { + case (keys, blockA) => + val blockY = blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed) + keys -> blockY + } + + var drmQ = dqrThin(drmY.checkpoint())._1 + // Checkpoint Q if last iteration + if (q == 0) drmQ = drmQ.checkpoint() + + // This actually should be optimized as identically partitioned map-side A'B since A and Q should + // still be identically partitioned. + var drmBt = drmAcp.t %*% drmQ + // Checkpoint B' if last iteration + if (q == 0) drmBt = drmBt.checkpoint() + + for (i <- 0 until q) { + drmY = drmAcp %*% drmBt + drmQ = dqrThin(drmY.checkpoint())._1 + // Checkpoint Q if last iteration + if (i == q - 1) drmQ = drmQ.checkpoint() + + // This on the other hand should be inner-join-and-map A'B optimization since A and Q_i are not + // identically partitioned anymore. + drmBt = drmAcp.t %*% drmQ + // Checkpoint B' if last iteration + if (i == q - 1) drmBt = drmBt.checkpoint() + } + + val (inCoreUHat, d) = eigen(drmBt.t %*% drmBt) + val s = d.sqrt + + // Since neither drmU nor drmV are actually computed until actually used, we don't need the flags + // instructing compute (or not compute) either of the U,V outputs anymore. Neat, isn't it? 
+    val drmU = drmQ %*% inCoreUHat
+    val drmV = drmBt %*% (inCoreUHat %*%: diagv(1 /: s))
+
+    (drmU(::, 0 until k), drmV(::, 0 until k), s(0 until k))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/main/scala/org/apache/mahout/math/decompositions/SSVD.scala
----------------------------------------------------------------------
diff --git a/samsara/src/main/scala/org/apache/mahout/math/decompositions/SSVD.scala b/samsara/src/main/scala/org/apache/mahout/math/decompositions/SSVD.scala
new file mode 100644
index 0000000..80385a3
--- /dev/null
+++ b/samsara/src/main/scala/org/apache/mahout/math/decompositions/SSVD.scala
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.decompositions
+
+import scala.math._
+import org.apache.mahout.math.{Matrices, Matrix}
+import org.apache.mahout.common.RandomUtils
+import org.apache.log4j.Logger
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+
+private[math] object SSVD {
+
+  private val log = Logger.getLogger(SSVD.getClass)
+
+  /**
+   * In-core SSVD algorithm.
+   *
+   * @param a input matrix A
+   * @param k request SSVD rank
+   * @param p oversampling parameter
+   * @param q number of power iterations
+   * @return (U,V,s)
+   */
+  def ssvd(a: Matrix, k: Int, p: Int = 15, q: Int = 0) = {
+    val m = a.nrow
+    val n = a.ncol
+    if (k > min(m, n))
+      throw new IllegalArgumentException(
+        "k cannot be greater than smaller of m,n")
+    val pfxed = min(p, min(m, n) - k)
+
+    // Actual decomposition rank
+    val r = k + pfxed
+
+    val rnd = RandomUtils.getRandom
+    val omega = Matrices.symmetricUniformView(n, r, rnd.nextInt)
+
+    var y = a %*% omega
+    var yty = y.t %*% y
+    val at = a.t
+    var ch = chol(yty)
+    assert(ch.isPositiveDefinite, "Rank-deficiency detected during s-SVD")
+    var bt = ch.solveRight(at %*% y)
+
+    // Power iterations
+    for (i <- 0 until q) {
+      y = a %*% bt
+      yty = y.t %*% y
+      ch = chol(yty)
+      bt = ch.solveRight(at %*% y)
+    }
+
+    val bbt = bt.t %*% bt
+    val (uhat, d) = eigen(bbt)
+
+    val s = d.sqrt
+    val u = ch.solveRight(y) %*% uhat
+    val v = bt %*% (uhat %*%: diagv(1 /: s))
+
+    (u(::, 0 until k), v(::, 0 until k), s(0 until k))
+  }
+
+  /**
+   * PCA based on SSVD that runs without forming an always-dense A-(colMeans(A)) input for SVD. This
+   * follows the solution outlined in MAHOUT-817. For the in-core version it is, for the most part, supposed
+   * to save some memory for sparse inputs by removing direct mean subtraction.<P>
+   *
+   * Hint: Usually one wants to use AV, which is approximately USigma, i.e. <code>u %*%: diagv(s)</code>.
+   * If retaining distances and original scaled variances is not that important, the normalized PCA space
+   * is just U.
+   *
+   * Important: data points are considered to be rows.
+ * + * @param a input matrix A + * @param k request SSVD rank + * @param p oversampling parameter + * @param q number of power iterations + * @return (U,V,s) + */ + def spca(a:Matrix, k: Int, p: Int = 15, q: Int = 0) = { + val m = a.nrow + val n = a.ncol + if (k > min(m, n)) + throw new IllegalArgumentException( + "k cannot be greater than smaller of m,n") + val pfxed = min(p, min(m, n) - k) + + // Actual decomposition rank + val r = k + pfxed + + val rnd = RandomUtils.getRandom + val omega = Matrices.symmetricUniformView(n, r, rnd.nextInt) + + // Dataset mean + val xi = a.colMeans() + + if (log.isDebugEnabled) log.debug("xi=%s".format(xi)) + + var y = a %*% omega + + // Fixing y + val s_o = omega.t %*% xi + y := ((r,c,v) => v - s_o(c)) + + var yty = y.t %*% y + var ch = chol(yty) +// assert(ch.isPositiveDefinite, "Rank-deficiency detected during s-SVD") + + // This is implicit Q of QR(Y) + var qm = ch.solveRight(y) + var bt = a.t %*% qm + var s_q = qm.colSums() + var s_b = bt.t %*% xi + + // Power iterations + for (i <- 0 until q) { + + // Fix bt + bt -= xi cross s_q + + y = a %*% bt + + // Fix Y again. + y := ((r,c,v) => v - s_b(c)) + + yty = y.t %*% y + ch = chol(yty) + qm = ch.solveRight(y) + bt = a.t %*% qm + s_q = qm.colSums() + s_b = bt.t %*% xi + } + + val c = s_q cross s_b + + // BB' computation becomes + val bbt = bt.t %*% bt -c - c.t + (s_q cross s_q) * (xi dot xi) + + val (uhat, d) = eigen(bbt) + + val s = d.sqrt + val u = qm %*% uhat + val v = bt %*% (uhat %*%: diagv(1 /: s)) + + (u(::, 0 until k), v(::, 0 until k), s(0 until k)) + + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/main/scala/org/apache/mahout/math/decompositions/package.scala ---------------------------------------------------------------------- diff --git a/samsara/src/main/scala/org/apache/mahout/math/decompositions/package.scala b/samsara/src/main/scala/org/apache/mahout/math/decompositions/package.scala new file mode 100644 index 0000000..a7b829f --- /dev/null +++ b/samsara/src/main/scala/org/apache/mahout/math/decompositions/package.scala @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.math + +import scala.reflect.ClassTag +import org.apache.mahout.math.drm.DrmLike + +/** + * This package holds all decomposition and factorization-like methods, all that we were able to make + * distributed engine-independent so far, anyway. + */ +package object decompositions { + + // ================ In-core decompositions =================== + + /** + * In-core SSVD algorithm. 
+   *
+   * @param a input matrix A
+   * @param k request SSVD rank
+   * @param p oversampling parameter
+   * @param q number of power iterations
+   * @return (U,V,s)
+   */
+  def ssvd(a: Matrix, k: Int, p: Int = 15, q: Int = 0) = SSVD.ssvd(a, k, p, q)
+
+  /**
+   * PCA based on SSVD that runs without forming an always-dense A-(colMeans(A)) input for SVD. This
+   * follows the solution outlined in MAHOUT-817. For the in-core version it is, for the most part, supposed
+   * to save some memory for sparse inputs by removing direct mean subtraction.<P>
+   *
+   * Hint: Usually one wants to use AV, which is approximately USigma, i.e. <code>u %*%: diagv(s)</code>.
+   * If retaining distances and original scaled variances is not that important, the normalized PCA space
+   * is just U.
+   *
+   * Important: data points are considered to be rows.
+   *
+   * @param a input matrix A
+   * @param k request SSVD rank
+   * @param p oversampling parameter
+   * @param q number of power iterations
+   * @return (U,V,s)
+   */
+  def spca(a: Matrix, k: Int, p: Int = 15, q: Int = 0) =
+    SSVD.spca(a = a, k = k, p = p, q = q)
+
+  // ============== Distributed decompositions ===================
+
+  /**
+   * Distributed _thin_ QR. A'A must fit in memory, i.e. if A is m x n, then n should be pretty
+   * controlled (<5000 or so). <P>
+   *
+   * It is recommended to checkpoint A since this makes two passes over it. <P>
+   *
+   * It also guarantees that Q is partitioned exactly the same way (and in the same key-order) as A, so
+   * their RDDs should be able to zip successfully.
+   */
+  def dqrThin[K: ClassTag](drmA: DrmLike[K], checkRankDeficiency: Boolean = true): (DrmLike[K], Matrix) =
+    DQR.dqrThin(drmA, checkRankDeficiency)
+
+  /**
+   * Distributed Stochastic Singular Value decomposition algorithm.
+   *
+   * @param drmA input matrix A
+   * @param k request SSVD rank
+   * @param p oversampling parameter
+   * @param q number of power iterations
+   * @return (U, V, s). Note that U and V are non-checkpointed matrices (i.e. one needs to actually use them,
+   *         e.g. save them to HDFS, in order to trigger their computation).
+   */
+  def dssvd[K: ClassTag](drmA: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
+    (DrmLike[K], DrmLike[Int], Vector) = DSSVD.dssvd(drmA, k, p, q)
+
+  /**
+   * Distributed Stochastic PCA decomposition algorithm. A logical reflow of the "SSVD-PCA options.pdf"
+   * document of MAHOUT-817.
+   *
+   * @param drmA input matrix A
+   * @param k request SSVD rank
+   * @param p oversampling parameter
+   * @param q number of power iterations (hint: use either 0 or 1)
+   * @return (U, V, s). Note that U and V are non-checkpointed matrices (i.e. one needs to actually use them,
+   *         e.g. save them to HDFS, in order to trigger their computation).
+   */
+  def dspca[K: ClassTag](drmA: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
+    (DrmLike[K], DrmLike[Int], Vector) = DSPCA.dspca(drmA, k, p, q)
+
+  /** Result for distributed ALS-type two-component factorization algorithms */
+  type FactorizationResult[K] = ALS.Result[K]
+
+  /** Result for distributed ALS-type two-component factorization algorithms, in-core matrices */
+  type FactorizationResultInCore = ALS.InCoreResult
+
+  /**
+   * Run ALS.
+   * <P>
+   *
+   * Example:
+   *
+   * <pre>
+   * val (u, v, errors) = als(input, k).toTuple
+   * </pre>
+   *
+   * ALS runs until (rmse[i-1] - rmse[i]) / rmse[i-1] < convergenceThreshold, or i == maxIterations,
+   * whichever comes first.
+   * <P>
+   *
+   * @param drmA The input matrix
+   * @param k required rank of decomposition (number of cols in U and V results)
+   * @param convergenceThreshold stop sooner if (rmse[i-1] - rmse[i]) / rmse[i-1] is less than this
+   *                             value. If <= 0 then RMSE is not computed and the convergence test is skipped.
+   * @param lambda regularization rate
+   * @param maxIterations maximum iterations to run regardless of convergence
+   * @tparam K row key type of the input
+   * @return {@link org.apache.mahout.math.drm.decompositions.ALS.Result}
+   */
+  def dals[K: ClassTag](
+      drmA: DrmLike[K],
+      k: Int = 50,
+      lambda: Double = 0.0,
+      maxIterations: Int = 10,
+      convergenceThreshold: Double = 0.10
+      ): FactorizationResult[K] =
+    ALS.dals(drmA, k, lambda, maxIterations, convergenceThreshold)
+
+}
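
Taken together, these delegating defs give user code a single import for all factorizations. A usage sketch (assuming drmA is an already-parallelized DrmLike[Int]):

    import org.apache.mahout.math.decompositions._

    val (drmQ, inCoreR) = dqrThin(drmA)              // thin QR: A ~ Q %*% R
    val (drmU, drmV, s) = dssvd(drmA, k = 40, q = 1) // stochastic SVD at rank 40
    val (u, v, sPca) = dspca(drmA, k = 10)           // PCA space without densifying A
    // U and V results are lazy; checkpoint or save them to trigger computation
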
http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/main/scala/org/apache/mahout/math/drm/BCast.scala
----------------------------------------------------------------------
diff --git a/samsara/src/main/scala/org/apache/mahout/math/drm/BCast.scala b/samsara/src/main/scala/org/apache/mahout/math/drm/BCast.scala
new file mode 100644
index 0000000..850614457
--- /dev/null
+++ b/samsara/src/main/scala/org/apache/mahout/math/drm/BCast.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+/** Broadcast variable abstraction */
+trait BCast[T] {
+  def value: T
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/main/scala/org/apache/mahout/math/drm/CacheHint.scala
----------------------------------------------------------------------
diff --git a/samsara/src/main/scala/org/apache/mahout/math/drm/CacheHint.scala b/samsara/src/main/scala/org/apache/mahout/math/drm/CacheHint.scala
new file mode 100644
index 0000000..ac763f9
--- /dev/null
+++ b/samsara/src/main/scala/org/apache/mahout/math/drm/CacheHint.scala
@@ -0,0 +1,19 @@
+package org.apache.mahout.math.drm
+
+object CacheHint extends Enumeration {
+
+  type CacheHint = Value
+
+  val NONE,
+  DISK_ONLY,
+  DISK_ONLY_2,
+  MEMORY_ONLY,
+  MEMORY_ONLY_2,
+  MEMORY_ONLY_SER,
+  MEMORY_ONLY_SER_2,
+  MEMORY_AND_DISK,
+  MEMORY_AND_DISK_2,
+  MEMORY_AND_DISK_SER,
+  MEMORY_AND_DISK_SER_2 = Value
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
----------------------------------------------------------------------
diff --git a/samsara/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala b/samsara/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
new file mode 100644
index 0000000..7f97481
--- /dev/null
+++ b/samsara/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+import org.apache.mahout.math.Matrix
+import scala.reflect.ClassTag
+
+/**
+ * Checkpointed DRM API. This is a matrix that has an optimized RDD lineage behind it and can therefore
+ * be collected or saved.
+ * @tparam K matrix key type (e.g. the keys of sequence files once persisted)
+ */
+trait CheckpointedDrm[K] extends DrmLike[K] {
+
+  def collect: Matrix
+
+  def dfsWrite(path: String)
+
+  /** If this checkpoint is already declared cached, uncache. */
+  def uncache(): this.type
+
+  /**
+   * Explicit extraction of the key ClassTag, since traits don't support context-bound access; but the actual
+   * implementation knows it
+   */
+  def keyClassTag: ClassTag[K]
+
+
+  /** Changes the number of rows without touching the underlying data */
+  def newRowCardinality(n: Int): CheckpointedDrm[K]
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
----------------------------------------------------------------------
diff --git a/samsara/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala b/samsara/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
new file mode 100644
index 0000000..8c3911f
--- /dev/null
+++ b/samsara/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math._
+
+
+/**
+ * Additional experimental operations over the CheckpointedDRM implementation. I will possibly move them up to
+ * the DRMBase once they stabilize.
+ *
+ */
+class CheckpointedOps[K: ClassTag](val drm: CheckpointedDrm[K]) {
+
+
+  /** Column sums. At this point this runs on the checkpoint and collects an in-core vector. */
+  def colSums(): Vector = drm.context.colSums(drm)
+
+  /** Column counts. Counts the non-zero values. At this point this runs on the checkpoint and collects an in-core vector. */
+  def numNonZeroElementsPerColumn(): Vector = drm.context.numNonZeroElementsPerColumn(drm)
+
+  /** Column means */
+  def colMeans(): Vector = drm.context.colMeans(drm)
+
+  def norm(): Double = drm.context.norm(drm)
+}
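
These decorator methods back calls used throughout the patch (e.g. drmA.numNonZeroElementsPerColumn in SimilarityAnalysis). A sketch, assuming the implicit CheckpointedDrm-to-CheckpointedOps wrapping that the drm package object is expected to provide:

    val drmAcp = drmA.checkpoint()
    val means = drmAcp.colMeans()                  // in-core Vector of column means
    val nnz = drmAcp.numNonZeroElementsPerColumn() // per-column counts, as used for LLR above
    val frobeniusNorm = drmAcp.norm()
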
http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/main/scala/org/apache/mahout/math/drm/DistributedContext.scala
----------------------------------------------------------------------
diff --git a/samsara/src/main/scala/org/apache/mahout/math/drm/DistributedContext.scala b/samsara/src/main/scala/org/apache/mahout/math/drm/DistributedContext.scala
new file mode 100644
index 0000000..39bab90
--- /dev/null
+++ b/samsara/src/main/scala/org/apache/mahout/math/drm/DistributedContext.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+import java.io.Closeable
+
+/** Distributed context (a.k.a. distributed session handle). */
+trait DistributedContext extends Closeable {
+
+  val engine: DistributedEngine
+
+}
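Since the context is just a Closeable session handle, a driver typically holds it implicitly and closes it last — a sketch; how the handle is obtained is engine-specific and not part of this patch:

    implicit val dc: DistributedContext = ???   // engine-specific factory goes here
    try {
      // build, checkpoint, and collect DRM pipelines against dc
    } finally {
      dc.close()
    }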
http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
----------------------------------------------------------------------
diff --git a/samsara/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala b/samsara/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
new file mode 100644
index 0000000..dd5b101
--- /dev/null
+++ b/samsara/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+import com.google.common.collect.{HashBiMap, BiMap}
+import org.apache.mahout.math.indexeddataset.{DefaultIndexedDatasetElementReadSchema, DefaultIndexedDatasetReadSchema, Schema, IndexedDataset}
+
+import scala.reflect.ClassTag
+import logical._
+import org.apache.mahout.math._
+import scalabindings._
+import RLikeOps._
+import RLikeDrmOps._
+import DistributedEngine._
+import org.apache.mahout.math.scalabindings._
+import org.apache.log4j.Logger
+
+/** Abstraction of the optimizer/distributed engine. */
+trait DistributedEngine {
+
+  /**
+   * First optimization pass. Returns a rewritten logical plan that can then be translated into a
+   * physical plan. This rewrite may introduce logical constructs (including engine-specific ones)
+   * that the user DSL cannot produce per se.
+   * <P>
+   *
+   * A particular physical engine implementation may choose to either use the default rewrites or
+   * build its own rewriting rules.
+   * <P>
+   */
+  def optimizerRewrite[K: ClassTag](action: DrmLike[K]): DrmLike[K] = pass3(pass2(pass1(action)))
+
+  /** Second optimizer pass. Translates the previously rewritten logical pipeline into a physical engine plan. */
+  def toPhysical[K: ClassTag](plan: DrmLike[K], ch: CacheHint.CacheHint): CheckpointedDrm[K]
+
+  /** Engine-specific colSums implementation based on a checkpoint. */
+  def colSums[K: ClassTag](drm: CheckpointedDrm[K]): Vector
+
+  /** Engine-specific numNonZeroElementsPerColumn implementation based on a checkpoint. */
+  def numNonZeroElementsPerColumn[K: ClassTag](drm: CheckpointedDrm[K]): Vector
+
+  /** Engine-specific colMeans implementation based on a checkpoint. */
+  def colMeans[K: ClassTag](drm: CheckpointedDrm[K]): Vector
+
+  def norm[K: ClassTag](drm: CheckpointedDrm[K]): Double
+
+  /** Broadcast support */
+  def drmBroadcast(v: Vector)(implicit dc: DistributedContext): BCast[Vector]
+
+  /** Broadcast support */
+  def drmBroadcast(m: Matrix)(implicit dc: DistributedContext): BCast[Matrix]
+
+  /**
+   * Load a DRM from HDFS (in Mahout DRM format).
+   * <P/>
+   * @param path The DFS path to load from
+   * @param parMin Minimum parallelism after load (equivalent to #par(min=...)).
+   */
+  def drmDfsRead(path: String, parMin: Int = 0)(implicit sc: DistributedContext): CheckpointedDrm[_]
+
+  /** Parallelize an in-core matrix as a distributed matrix, using row ordinal indices as data set keys. */
+  def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1)
+                                  (implicit sc: DistributedContext): CheckpointedDrm[Int]
+
+  /** Parallelize an in-core matrix as a distributed matrix, using row labels as data set keys. */
+  def drmParallelizeWithRowLabels(m: Matrix, numPartitions: Int = 1)
+                                 (implicit sc: DistributedContext): CheckpointedDrm[String]
+
+  /** Creates an empty DRM with the specified number of partitions and cardinality. */
+  def drmParallelizeEmpty(nrow: Int, ncol: Int, numPartitions: Int = 10)
+                         (implicit sc: DistributedContext): CheckpointedDrm[Int]
+
+  /** Creates an empty DRM with non-trivial height (Long row keys). */
+  def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 10)
+                             (implicit sc: DistributedContext): CheckpointedDrm[Long]
+
+  /**
+   * Load an IndexedDataset from text-delimited format.
+   * @param src comma-delimited URIs to read from
+   * @param schema defines the format of the file(s)
+   * @param existingRowIDs existing row ID mapping to reuse, if any
+   */
+  def indexedDatasetDFSRead(src: String,
+      schema: Schema = DefaultIndexedDatasetReadSchema,
+      existingRowIDs: BiMap[String, Int] = HashBiMap.create())
+      (implicit sc: DistributedContext): IndexedDataset
+
+  /**
+   * Load an IndexedDataset from text-delimited format, one element per line.
+   * @param src comma-delimited URIs to read from
+   * @param schema defines the format of the file(s)
+   * @param existingRowIDs existing row ID mapping to reuse, if any
+   */
+  def indexedDatasetDFSReadElements(src: String,
+      schema: Schema = DefaultIndexedDatasetElementReadSchema,
+      existingRowIDs: BiMap[String, Int] = HashBiMap.create())
+      (implicit sc: DistributedContext): IndexedDataset
+
+}
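A sketch of reaching these engine methods through the context (m: Matrix and v: Vector are illustrative in-core values; real driver code usually goes through package-level helpers instead of calling the engine directly):

    val drmA = dc.engine.drmParallelizeWithRowIndices(m, numPartitions = 2)(dc)
    val bcV  = dc.engine.drmBroadcast(v)(dc)   // read bcV.value inside mapBlock closures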
+
+object DistributedEngine {
+
+  private val log = Logger.getLogger(DistributedEngine.getClass)
+
+  /** These are mostly rewrites of multiplication operations. */
+  private def pass1[K: ClassTag](action: DrmLike[K]): DrmLike[K] = {
+
+    action match {
+      case OpAB(OpAt(a), b) if (a == b) => OpAtA(pass1(a))
+      case OpABAnyKey(OpAtAnyKey(a), b) if (a == b) => OpAtA(pass1(a))
+
+      // For now, rewrite left-multiply via transpositions, i.e.
+      // inCoreA %*% B = (B' %*% inCoreA')'
+      case op@OpTimesLeftMatrix(a, b) =>
+        OpAt(OpTimesRightMatrix(A = OpAt(pass1(b)), right = a.t))
+
+      // Add vertical row index concatenation for rbind() on DrmLike[Int] fragments
+      case op@OpRbind(a, b) if (implicitly[ClassTag[K]] == ClassTag.Int) =>
+
+        // Make sure the closure sees only local vals, not attributes. We need these ugly casts
+        // because the compiler cannot infer that K is the same as Int from the case guard above.
+        val ma = safeToNonNegInt(a.nrow)
+        val bAdjusted = new OpMapBlock[Int, Int](
+          A = pass1(b.asInstanceOf[DrmLike[Int]]),
+          bmf = {
+            case (keys, block) => keys.map(_ + ma) -> block
+          },
+          identicallyPartitioned = false
+        )
+        val aAdjusted = a.asInstanceOf[DrmLike[Int]]
+        OpRbind(pass1(aAdjusted), bAdjusted).asInstanceOf[DrmLike[K]]
+
+      // Stop at checkpoints
+      case cd: CheckpointedDrm[_] => action
+
+      // For everything else, just pass the operator arguments through to the optimizer.
+      case uop: AbstractUnaryOp[_, K] =>
+        uop.A = pass1(uop.A)(uop.classTagA)
+        uop
+      case bop: AbstractBinaryOp[_, _, K] =>
+        bop.A = pass1(bop.A)(bop.classTagA)
+        bop.B = pass1(bop.B)(bop.classTagB)
+        bop
+    }
+  }
+
+  /** Removes constructs like A.t.t that the previous step may have created. */
+  private def pass2[K: ClassTag](action: DrmLike[K]): DrmLike[K] = {
+    action match {
+      // A.t.t => A
+      case OpAt(top@OpAt(a)) => pass2(a)(top.classTagA)
+
+      // Stop at checkpoints
+      case cd: CheckpointedDrm[_] => action
+
+      // For everything else, just pass the operator arguments through to the optimizer.
+      case uop: AbstractUnaryOp[_, K] =>
+        uop.A = pass2(uop.A)(uop.classTagA)
+        uop
+      case bop: AbstractBinaryOp[_, _, K] =>
+        bop.A = pass2(bop.A)(bop.classTagA)
+        bop.B = pass2(bop.B)(bop.classTagB)
+        bop
+    }
+  }
+
+  /** Further rewrites that are conditioned on A.t.t removal. */
+  private def pass3[K: ClassTag](action: DrmLike[K]): DrmLike[K] = {
+    action match {
+
+      // Matrix products.
+      case OpAB(a, OpAt(b)) => OpABt(pass3(a), pass3(b))
+
+      // AtB cases that make sense.
+      case OpAB(OpAt(a), b) if (a.partitioningTag == b.partitioningTag) => OpAtB(pass3(a), pass3(b))
+      case OpABAnyKey(OpAtAnyKey(a), b) => OpAtB(pass3(a), pass3(b))
+
+      // Need a cost model to choose between the following.
+
+      case OpAB(OpAt(a), b) => OpAtB(pass3(a), pass3(b))
+      // case OpAB(OpAt(a), b) => OpAt(OpABt(OpAt(pass1(b)), pass1(a)))
+      case OpAB(a, b) => OpABt(pass3(a), OpAt(pass3(b)))
+
+      // Rewrite A'x
+      case op@OpAx(op1@OpAt(a), x) => OpAtx(pass3(a)(op1.classTagA), x)
+
+      // Stop at checkpoints
+      case cd: CheckpointedDrm[_] => action
+
+      // For everything else, just pass the operator arguments through to the optimizer.
+      case uop: AbstractUnaryOp[_, K] =>
+        uop.A = pass3(uop.A)(uop.classTagA)
+        uop
+      case bop: AbstractBinaryOp[_, _, K] =>
+        bop.A = pass3(bop.A)(bop.classTagA)
+        bop.B = pass3(bop.B)(bop.classTagB)
+        bop
+    }
+  }
+
+}
\ No newline at end of file
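Reading the cases above off the composed rewrite pass3(pass2(pass1(plan))), the net effect on a few DSL shapes is, illustratively:

    //  A.t %*% A    --pass1-->  OpAtA(A)      (self-product of a transpose)
    //  A.t.t        --pass2-->  A             (double transpose elided)
    //  A %*% B.t    --pass3-->  OpABt(A, B)
    //  A.t %*% x    --pass3-->  OpAtx(A, x)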
http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/main/scala/org/apache/mahout/math/drm/DrmDoubleScalarOps.scala
----------------------------------------------------------------------
diff --git a/samsara/src/main/scala/org/apache/mahout/math/drm/DrmDoubleScalarOps.scala b/samsara/src/main/scala/org/apache/mahout/math/drm/DrmDoubleScalarOps.scala
new file mode 100644
index 0000000..e5cf563
--- /dev/null
+++ b/samsara/src/main/scala/org/apache/mahout/math/drm/DrmDoubleScalarOps.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+import RLikeDrmOps._
+import scala.reflect.ClassTag
+
+class DrmDoubleScalarOps(val x: Double) extends AnyVal {
+
+  def +[K: ClassTag](that: DrmLike[K]) = that + x
+
+  def *[K: ClassTag](that: DrmLike[K]) = that * x
+
+  def -[K: ClassTag](that: DrmLike[K]) = x -: that
+
+  def /[K: ClassTag](that: DrmLike[K]) = x /: that
+
+}
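Combined with the double2ScalarOps implicit defined later in RLikeDrmOps, this enables scalar-on-the-left expressions — a sketch:

    import RLikeDrmOps._

    val drmB = 1.0 - drmA    // desugars to drmA.-:(1.0), i.e. elementwise 1.0 - a(i, j)
    val drmC = 2.0 * drmA    // multiplication commutes, so this is drmA * 2.0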
http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
----------------------------------------------------------------------
diff --git a/samsara/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala b/samsara/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
new file mode 100644
index 0000000..b9c50b0
--- /dev/null
+++ b/samsara/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+import scala.reflect.ClassTag
+
+/**
+ * Basic DRM trait.
+ *
+ * Class names in this package do not use the stem "spark"; the engine backing is implied by the
+ * context.
+ */
+trait DrmLike[K] {
+
+  protected[mahout] def partitioningTag: Long
+
+  protected[mahout] def canHaveMissingRows: Boolean
+
+  /**
+   * Distributed context; can be implicitly converted to operations on
+   * [[org.apache.mahout.math.drm.DistributedEngine]].
+   */
+  val context: DistributedContext
+
+  /** R-like syntax for the number of rows. */
+  def nrow: Long
+
+  /** R-like syntax for the number of columns. */
+  def ncol: Int
+
+  /**
+   * Action operator -- does not necessarily mean a Spark action, but does mean running the BLAS
+   * optimizer and writing down the graph lineage since the last checkpointed DRM.
+   */
+  def checkpoint(cacheHint: CacheHint.CacheHint = CacheHint.MEMORY_ONLY): CheckpointedDrm[K]
+
+}
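Everything on this trait stays a logical plan until checkpoint() is called — a sketch, assuming drmA, drmB: DrmLike[Int] and the RLikeDrmOps implicits in scope:

    val drmC   = drmA.t %*% drmB                  // logical plan only; nothing runs yet
    val drmCcp = drmC.checkpoint(CacheHint.NONE)  // runs the optimizer, pins the lineage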
http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
----------------------------------------------------------------------
diff --git a/samsara/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala b/samsara/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
new file mode 100644
index 0000000..bc937d6
--- /dev/null
+++ b/samsara/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math.drm.logical.{OpPar, OpMapBlock, OpRowRange}
+
+/** Common DRM ops. */
+class DrmLikeOps[K: ClassTag](protected[drm] val drm: DrmLike[K]) {
+
+  /**
+   * Parallelism adjustments. <P/>
+   *
+   * Change only one of the parameters from its default value to choose a parallelism adjustment
+   * strategy.
+   * <P/>
+   *
+   * E.g. use
+   * <pre>
+   *   drmA.par(auto = true)
+   * </pre>
+   * to use automatic parallelism adjustment.
+   * <P/>
+   *
+   * Parallelism here is a fairly abstract concept, and the actual interpretation of the value is
+   * left to the particular backend strategy. However, it is usually equivalent to the number of
+   * map tasks or data splits.
+   * <P/>
+   *
+   * @param min If changed from the default, ensures the product has at least that much parallelism.
+   * @param exact If changed from the default, ensures the pipeline product has exactly that much
+   *              parallelism.
+   * @param auto If changed from the default, an engine-specific automatic parallelism adjustment
+   *             strategy is applied.
+   */
+  def par(min: Int = -1, exact: Int = -1, auto: Boolean = false) = {
+    assert(min >= 0 || exact >= 0 || auto, "Invalid argument")
+    OpPar(drm, minSplits = min, exactSplits = exact)
+  }
+
+  /**
+   * Map the matrix block-wise, vertically. Blocks of the new matrix may be modified original
+   * blocks, or completely new matrices with a new key set. In the latter case, the output matrix
+   * width must be specified with the <code>ncol</code> parameter.<P>
+   *
+   * New blocks must have the same height as in the original geometry.<P>
+   *
+   * @param ncol The new matrix's width (only needed if the width changes).
+   * @param bmf The block map function.
+   * @tparam R The new row key type.
+   * @return The mapped DRM.
+   */
+  def mapBlock[R: ClassTag](ncol: Int = -1, identicallyPartitioned: Boolean = true)
+      (bmf: BlockMapFunc[K, R]): DrmLike[R] =
+    new OpMapBlock[K, R](
+      A = drm,
+      bmf = bmf,
+      _ncol = ncol,
+      identicallyPartitioned = identicallyPartitioned
+    )
+
+  /**
+   * Slicing the DRM. Should eventually work just like the in-core DRM (e.g.
+   * A(0 until 5, 5 until 15)).<P>
+   *
+   * The all-range is denoted by '::', e.g.: A(::, 0 until 5).<P>
+   *
+   * Row ranges other than the all-range are currently supported only for Int-keyed matrices,
+   * i.e. of DrmLike[Int] type.
+   *
+   * @param rowRange Row range. This must be '::' (all-range) unless the matrix rows are keyed by Int.
+   * @param colRange Column range. Must be a sub-range of <code>0 until ncol</code>; '::' denotes the all-range.
+   */
+  def apply(rowRange: Range, colRange: Range): DrmLike[K] = {
+
+    import RLikeDrmOps._
+    import RLikeOps._
+
+    val rowSrc: DrmLike[K] = if (rowRange != ::) {
+
+      if (implicitly[ClassTag[Int]] == implicitly[ClassTag[K]]) {
+
+        assert(rowRange.head >= 0 && rowRange.last < drm.nrow, "row range out of range")
+        val intKeyed = drm.asInstanceOf[DrmLike[Int]]
+
+        new OpRowRange(A = intKeyed, rowRange = rowRange).asInstanceOf[DrmLike[K]]
+
+      } else throw new IllegalArgumentException("non-all row range is only supported for Int-keyed DRMs.")
+
+    } else drm
+
+    if (colRange != ::) {
+
+      assert(colRange.head >= 0 && colRange.last < drm.ncol, "col range out of range")
+
+      // Use the mapBlock operator to do in-core subranging.
+      rowSrc.mapBlock(ncol = colRange.length)({
+        case (keys, block) => keys -> block(::, colRange)
+      })
+
+    } else rowSrc
+  }
+}
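mapBlock and slicing in use — a sketch, assuming drmA: DrmLike[Int] and the in-core RLikeOps in scope:

    // Elementwise shift via mapBlock; geometry and key set are unchanged.
    val drmShifted = drmA.mapBlock() { case (keys, block) =>
      keys -> (block += 1.0)
    }

    // Column slice using the all-row-range; non-all row ranges require Int keys.
    val drmCols = drmA(::, 0 until 5)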
http://git-wip-us.apache.org/repos/asf/mahout/blob/f7b69fab/samsara/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
----------------------------------------------------------------------
diff --git a/samsara/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala b/samsara/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
new file mode 100644
index 0000000..380f4eb
--- /dev/null
+++ b/samsara/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.{Vector, Matrix}
+import org.apache.mahout.math.drm.logical._
+
+class RLikeDrmOps[K: ClassTag](drm: DrmLike[K]) extends DrmLikeOps[K](drm) {
+
+  import RLikeDrmOps._
+
+  def +(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = "+")
+
+  def -(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = "-")
+
+  def *(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = "*")
+
+  def /(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = "/")
+
+  def +(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "+")
+
+  def +:(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "+")
+
+  def -(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "-")
+
+  def -:(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "-:")
+
+  def *(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "*")
+
+  def *:(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "*")
+
+  def /(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "/")
+
+  def /:(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "/:")
+
+  def :%*%(that: DrmLike[Int]): DrmLike[K] = OpAB[K](A = this.drm, B = that)
+
+  def %*%[B: ClassTag](that: DrmLike[B]): DrmLike[K] = OpABAnyKey[B, K](A = this.drm, B = that)
+
+  def %*%(that: DrmLike[Int]): DrmLike[K] = this :%*% that
+
+  def :%*%(that: Matrix): DrmLike[K] = OpTimesRightMatrix[K](A = this.drm, right = that)
+
+  def %*%(that: Matrix): DrmLike[K] = this :%*% that
+
+  def :%*%(that: Vector): DrmLike[K] = OpAx(A = this.drm, x = that)
+
+  def %*%(that: Vector): DrmLike[K] = :%*%(that)
+
+  def t: DrmLike[Int] = OpAtAnyKey(A = drm)
+
+  def cbind(that: DrmLike[K]) = OpCbind(A = this.drm, B = that)
+
+  def rbind(that: DrmLike[K]) = OpRbind(A = this.drm, B = that)
+}
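These operators compose into R-like pipelines — a sketch, assuming drmA, drmB: DrmLike[Int]:

    val drmC = drmA %*% drmB          // matrix product
    val drmD = (drmA * drmA) + 1.0    // elementwise square, then scalar shift
    val drmE = drmA.t %*% drmA        // the optimizer collapses this into a single A'A operator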
+
+class RLikeDrmIntOps(drm: DrmLike[Int]) extends RLikeDrmOps[Int](drm) {
+
+  import org.apache.mahout.math._
+  import scalabindings._
+  import RLikeOps._
+  import RLikeDrmOps._
+  import scala.collection.JavaConversions._
+
+  override def t: DrmLike[Int] = OpAt(A = drm)
+
+  def %*%:[K: ClassTag](that: DrmLike[K]): DrmLike[K] = OpAB[K](A = that, B = this.drm)
+
+  def %*%:(that: Matrix): DrmLike[Int] = OpTimesLeftMatrix(left = that, A = this.drm)
+
+  /** Row sums. This is of course applicable only to Int-keyed distributed matrices. */
+  def rowSums(): Vector = {
+    drm.mapBlock(ncol = 1) { case (keys, block) =>
+      // Collect block-wise row sums and output them as a one-column matrix.
+      keys -> dense(block.rowSums).t
+    }.collect(::, 0)
+  }
+
+  /** Counts the non-zero elements in each row, returning a vector of the counts. */
+  def numNonZeroElementsPerRow(): Vector = {
+    drm.mapBlock(ncol = 1) { case (keys, block) =>
+      // Collect block-wise row non-zero counts and output them as a one-column matrix.
+      keys -> dense(block.numNonZeroElementsPerRow).t
+    }.collect(::, 0)
+  }
+
+  /** Row means */
+  def rowMeans(): Vector = {
+    drm.mapBlock(ncol = 1) { case (keys, block) =>
+      // Collect block-wise row means and output them as a one-column matrix.
+      keys -> dense(block.rowMeans).t
+    }.collect(::, 0)
+  }
+
+  /** Returns the diagonal as a vector. */
+  def diagv: Vector = {
+    require(drm.ncol == drm.nrow, "Must be square to extract diagonal")
+    drm.mapBlock(ncol = 1) { case (keys, block) =>
+      keys -> dense(for (r <- block.view) yield r(keys(r.index))).t
+    }.collect(::, 0)
+  }
+
+}
+
+object RLikeDrmOps {
+
+  implicit def double2ScalarOps(x: Double) = new DrmDoubleScalarOps(x)
+
+  implicit def drmInt2RLikeOps(drm: DrmLike[Int]): RLikeDrmIntOps = new RLikeDrmIntOps(drm)
+
+  implicit def drm2RLikeOps[K: ClassTag](drm: DrmLike[K]): RLikeDrmOps[K] = new RLikeDrmOps[K](drm)
+
+  implicit def rlikeOps2Drm[K: ClassTag](ops: RLikeDrmOps[K]): DrmLike[K] = ops.drm
+
+  implicit def ops2Drm[K: ClassTag](ops: DrmLikeOps[K]): DrmLike[K] = ops.drm
+
+  // Removed in move to 1.2.1 PR #74 https://github.com/apache/mahout/pull/74/files
+  // Not sure why.
+  // implicit def cp2cpops[K: ClassTag](cp: CheckpointedDrm[K]): CheckpointedOps[K] = new CheckpointedOps(cp)
+
+  /**
+   * This is probably dangerous since it triggers implicit checkpointing with the default storage
+   * level setting.
+   */
+  implicit def drm2cpops[K: ClassTag](drm: DrmLike[K]): CheckpointedOps[K] = new CheckpointedOps(drm.checkpoint())
+}
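The Int-keyed row statistics mirror the column statistics from CheckpointedOps — a sketch, assuming drmA: DrmLike[Int] is square:

    import RLikeDrmOps._

    val rSums  = drmA.rowSums()    // block-wise row sums, collected in-core
    val rMeans = drmA.rowMeans()
    val diag   = drmA.diagv        // requires nrow == ncol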
