NOJIRA refactor IndexedDataset and CLI drivers into core and engine specific parts, closes apache/mahout#59
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/666d314f Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/666d314f Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/666d314f Branch: refs/heads/master Commit: 666d314fb8d9f466a5496a143d73d55036aea619 Parents: 4e6577d Author: pferrel <[email protected]> Authored: Tue Oct 14 14:45:32 2014 -0700 Committer: pferrel <[email protected]> Committed: Tue Oct 14 14:45:32 2014 -0700 ---------------------------------------------------------------------- .../apache/mahout/h2obindings/H2OEngine.scala | 44 +++ .../h2obindings/drm/CheckpointedDrmH2O.scala | 13 + math-scala/pom.xml | 6 + .../apache/mahout/drivers/MahoutDriver.scala | 43 +++ .../mahout/drivers/MahoutOptionParser.scala | 220 +++++++++++++++ .../mahout/math/cf/SimilarityAnalysis.scala | 56 +++- .../mahout/math/drm/CheckpointedDrm.scala | 3 + .../mahout/math/drm/DistributedEngine.scala | 25 ++ .../org/apache/mahout/math/drm/package.scala | 26 +- .../math/indexeddataset/IndexedDataset.scala | 64 +++++ .../math/indexeddataset/ReaderWriter.scala | 68 +++++ .../mahout/math/indexeddataset/Schema.scala | 102 +++++++ .../apache/mahout/common/HDFSPathSearch.scala | 78 ++++++ .../apache/mahout/drivers/FileSysUtils.scala | 76 ------ .../apache/mahout/drivers/IndexedDataset.scala | 80 ------ .../mahout/drivers/ItemSimilarityDriver.scala | 90 +++---- .../apache/mahout/drivers/MahoutDriver.scala | 104 ------- .../mahout/drivers/MahoutOptionParser.scala | 214 --------------- .../mahout/drivers/MahoutSparkDriver.scala | 87 ++++++ .../apache/mahout/drivers/ReaderWriter.scala | 66 ----- .../mahout/drivers/RowSimilarityDriver.scala | 41 +-- .../org/apache/mahout/drivers/Schema.scala | 98 ------- .../drivers/TextDelimitedReaderWriter.scala | 68 ++--- .../mahout/sparkbindings/SparkEngine.scala | 38 ++- .../drm/CheckpointedDrmSpark.scala | 12 + .../indexeddataset/IndexedDatasetSpark.scala | 53 ++++ 
.../apache/mahout/sparkbindings/package.scala | 4 +- .../mahout/cf/CooccurrenceAnalysisSuite.scala | 267 ------------------ .../mahout/cf/SimilarityAnalysisSuite.scala | 269 +++++++++++++++++++ 29 files changed, 1273 insertions(+), 1042 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala ---------------------------------------------------------------------- diff --git a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala index 28214c6..99bc3ba 100644 --- a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala +++ b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala @@ -17,6 +17,9 @@ package org.apache.mahout.h2obindings +import com.google.common.collect.{HashBiMap, BiMap} +import org.apache.mahout.math.indexeddataset.{IndexedDataset, Schema, DefaultIndexedDatasetReadSchema} + import scala.reflect._ import org.apache.mahout.math._ import org.apache.mahout.math.drm._ @@ -112,4 +115,45 @@ object H2OEngine extends DistributedEngine { } implicit def cp2cph2o[K:ClassTag](drm: CheckpointedDrm[K]): CheckpointedDrmH2O[K] = drm.asInstanceOf[CheckpointedDrmH2O[K]] + + /** stub class not implemented in H2O */ + abstract class IndexedDatasetH2O(val matrix: CheckpointedDrm[Int], val rowIDs: BiMap[String,Int], val columnIDs: BiMap[String,Int]) + extends IndexedDataset {} + + /** + * reads an IndexedDatasetH2O from default text delimited files + * @todo unimplemented + * @param src a comma separated list of URIs to read from + * @param schema how the text file is formatted + * @return + */ + def indexedDatasetDFSRead(src: String, + schema: Schema = DefaultIndexedDatasetReadSchema, + existingRowIDs: BiMap[String, Int] = HashBiMap.create()) + (implicit sc: DistributedContext): + IndexedDatasetH2O = { + // should log a warning 
when this is built but no logger here, can an H2O contributor help with this + println("Warning: unimplemented indexedDatasetDFSReadElements." ) + throw new UnsupportedOperationException("IndexedDatasetH2O is not implemented so can't be read.") + null.asInstanceOf[IndexedDatasetH2O] + } + + /** + * reads an IndexedDatasetH2O from default text delimited files + * @todo unimplemented + * @param src a comma separated list of URIs to read from + * @param schema how the text file is formatted + * @return + */ + def indexedDatasetDFSReadElements(src: String, + schema: Schema = DefaultIndexedDatasetReadSchema, + existingRowIDs: BiMap[String, Int] = HashBiMap.create()) + (implicit sc: DistributedContext): + IndexedDatasetH2O = { + // should log a warning when this is built but no logger here, can an H2O contributor help with this + println("Warning: unimplemented indexedDatasetDFSReadElements." ) + throw new UnsupportedOperationException("IndexedDatasetH2O is not implemented so can't be read by elements.") + null.asInstanceOf[IndexedDatasetH2O] + } + } http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala ---------------------------------------------------------------------- diff --git a/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala b/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala index c06455a..6d7628e 100644 --- a/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala +++ b/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala @@ -53,4 +53,17 @@ class CheckpointedDrmH2O[K: ClassTag]( def canHaveMissingRows: Boolean = false protected[mahout] def partitioningTag: Long = h2odrm.frame.anyVec.group.hashCode + + /** stub need to make IndexedDataset core but since drmWrap is not in H2O left for someone else */ + override def newRowCardinality(n: Int): CheckpointedDrm[K] = { + throw new 
UnsupportedOperationException("CheckpointedDrmH2O#newRowCardinality is not implemented.") + /* this is the Spark impl + assert(n > -1) + assert( n >= nrow) + val newCheckpointedDrm = drmWrap[K](rdd, n, ncol) + newCheckpointedDrm + */ + this + } + } http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/math-scala/pom.xml ---------------------------------------------------------------------- diff --git a/math-scala/pom.xml b/math-scala/pom.xml index b2fea6b..66309d6 100644 --- a/math-scala/pom.xml +++ b/math-scala/pom.xml @@ -172,6 +172,12 @@ <groupId>log4j</groupId> <artifactId>log4j</artifactId> </dependency> + + <dependency> + <groupId>com.github.scopt</groupId> + <artifactId>scopt_2.10</artifactId> + <version>3.2.0</version> + </dependency> <!-- scala stuff --> <dependency> http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala new file mode 100644 index 0000000..8c1f8cf --- /dev/null +++ b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.drivers + +import org.apache.mahout.math.drm.DistributedContext + +/** Extended by a platform specific version of this class to create a Mahout CLI driver. + */ +abstract class MahoutDriver { + + + implicit protected var mc: DistributedContext = _ + protected var parser: MahoutOptionParser = _ + + var _useExistingContext: Boolean = false // used in the test suite to reuse one context per suite + + /** Override (optionally) for special cleanup */ + protected def stop: Unit = { + if (!_useExistingContext) mc.close + } + + /** This is where you do the work, call start first, then before exiting call stop */ + protected def process: Unit + + /** Parse command line and call process */ + def main(args: Array[String]): Unit + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala new file mode 100644 index 0000000..c6968b8 --- /dev/null +++ b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.mahout.drivers + +import scopt.OptionParser + +import scala.collection.immutable + +/** Defines oft-repeated options and their parsing. Provides the option groups and parsing helper methods to + * keep both standardized. + * @param programName Name displayed in help message, the name by which the driver is invoked. + * @note not all options are platform neutral so other platforms can add option parsing here if desired. + * */ +class MahoutOptionParser(programName: String) extends OptionParser[Map[String, Any]](programName: String) { + + // build options from some standard CLI param groups + // Note: always put the driver specific options last so they can override any previous options! + var opts = Map.empty[String, Any] + + override def showUsageOnError = true + + def parseIOOptions(numInputs: Int = 1) = { + opts = opts ++ MahoutOptionParser.FileIOOptions + note("Input, output options") + opt[String]('i', "input") required() action { (x, options) => + options + ("input" -> x) + } text ("Input path, may be a filename, directory name, or comma delimited list of HDFS supported URIs (required)") + + if (numInputs == 2) { + opt[String]("input2") abbr ("i2") action { (x, options) => + options + ("input2" -> x) + } text ("Secondary input path for cross-similarity calculation, same restrictions as \"--input\" (optional).
Default: empty.") + } + + opt[String]('o', "output") required() action { (x, options) => + // todo: check to see if HDFS allows MS-Windows backslashes locally? + if (x.endsWith("/")) { + options + ("output" -> x) + } else { + options + ("output" -> (x + "/")) + } + } text ("Path for output, any local or HDFS supported URI (required)") + + } + + def parseSparkOptions = { + opts = opts ++ MahoutOptionParser.SparkOptions + note("\nSpark config options:") + + opt[String]("master") abbr ("ma") text ("Spark Master URL (optional). Default: \"local\". Note that you can specify the number of cores to get a performance improvement, for example \"local[4]\"") action { (x, options) => + options + ("master" -> x) + } + + opt[String]("sparkExecutorMem") abbr ("sem") text ("Max Java heap available as \"executor memory\" on each node (optional). Default: as Spark config specifies") action { (x, options) => + options + ("sparkExecutorMem" -> x) + } + + } + + def parseGenericOptions = { + opts = opts ++ MahoutOptionParser.GenericOptions + opt[Int]("randomSeed") abbr ("rs") action { (x, options) => + options + ("randomSeed" -> x) + } validate { x => + if (x > 0) success else failure("Option --randomSeed must be > 0") + } + + //output both input DRMs + opt[Unit]("writeAllDatasets") hidden() action { (_, options) => + options + ("writeAllDatasets" -> true) + }//Hidden option, though a user might want this. + } + + def parseElementInputSchemaOptions{ + //Input text file schema--not driver specific but input data specific, elements input, + // not drms + opts = opts ++ MahoutOptionParser.TextDelimitedElementsOptions + note("\nInput text file schema options:") + opt[String]("inDelim") abbr ("id") text ("Input delimiter character (optional). 
Default: \"[,\\t]\"") action { (x, options) => + options + ("inDelim" -> x) + } + + opt[String]("filter1") abbr ("f1") action { (x, options) => + options + ("filter1" -> x) + } text ("String (or regex) whose presence indicates a datum for the primary item set (optional). Default: no filter, all data is used") + + opt[String]("filter2") abbr ("f2") action { (x, options) => + options + ("filter2" -> x) + } text ("String (or regex) whose presence indicates a datum for the secondary item set (optional). If not present no secondary dataset is collected") + + opt[Int]("rowIDColumn") abbr ("rc") action { (x, options) => + options + ("rowIDColumn" -> x) + } text ("Column number (0 based Int) containing the row ID string (optional). Default: 0") validate { x => + if (x >= 0) success else failure("Option --rowIDColNum must be >= 0") + } + + opt[Int]("itemIDColumn") abbr ("ic") action { (x, options) => + options + ("itemIDColumn" -> x) + } text ("Column number (0 based Int) containing the item ID string (optional). Default: 1") validate { x => + if (x >= 0) success else failure("Option --itemIDColNum must be >= 0") + } + + opt[Int]("filterColumn") abbr ("fc") action { (x, options) => + options + ("filterColumn" -> x) + } text ("Column number (0 based Int) containing the filter string (optional). 
Default: -1 for no filter") validate { x => + if (x >= -1) success else failure("Option --filterColNum must be >= -1") + } + + note("\nUsing all defaults the input is expected of the form: \"userID<tab>itemId\" or \"userID<tab>itemID<tab>any-text...\" and all rows will be used") + + //check for column consistency + checkConfig { options: Map[String, Any] => + if (options("filterColumn").asInstanceOf[Int] == options("itemIDColumn").asInstanceOf[Int] + || options("filterColumn").asInstanceOf[Int] == options("rowIDColumn").asInstanceOf[Int] + || options("rowIDColumn").asInstanceOf[Int] == options("itemIDColumn").asInstanceOf[Int]) + failure("The row, item, and filter positions must be unique.") else success + } + + //check for filter consistency + checkConfig { options: Map[String, Any] => + if (options("filter1").asInstanceOf[String] != null.asInstanceOf[String] + && options("filter2").asInstanceOf[String] != null.asInstanceOf[String] + && options("filter1").asInstanceOf[String] == options("filter2").asInstanceOf[String]) + failure ("If using filters they must be unique.") else success + } + + } + + def parseFileDiscoveryOptions = { + //File finding strategy--not driver specific + opts = opts ++ MahoutOptionParser.FileDiscoveryOptions + note("\nFile discovery options:") + opt[Unit]('r', "recursive") action { (_, options) => + options + ("recursive" -> true) + } text ("Searched the -i path recursively for files that match --filenamePattern (optional), Default: false") + + opt[String]("filenamePattern") abbr ("fp") action { (x, options) => + options + ("filenamePattern" -> x) + } text ("Regex to match in determining input files (optional). 
Default: filename in the --input option or \"^part-.*\" if --input is a directory") + + } + + def parseDrmFormatOptions = { + opts = opts ++ MahoutOptionParser.TextDelimitedDRMOptions + note("\nOutput text file schema options:") + opt[String]("rowKeyDelim") abbr ("rd") action { (x, options) => + options + ("rowKeyDelim" -> x) + } text ("Separates the rowID key from the vector values list (optional). Default: \"\\t\"") + + opt[String]("columnIdStrengthDelim") abbr ("cd") action { (x, options) => + options + ("columnIdStrengthDelim" -> x) + } text ("Separates column IDs from their values in the vector values list (optional). Default: \":\"") + + opt[String]("elementDelim") abbr ("td") action { (x, options) => + options + ("elementDelim" -> x) + } text ("Separates vector element values in the values list (optional). Default: \" \"") + + opt[Unit]("omitStrength") abbr ("os") action { (_, options) => + options + ("omitStrength" -> true) + } text ("Do not write the strength to the output files (optional), Default: false.") + note("This option is used to output indexable data for creating a search engine recommender.") + + note("\nDefault delimiters will produce output of the form: \"itemID1<tab>itemID2:value2<space>itemID10:value10...\"") + } + +} + +/** Companion object defines default option groups for reference in any driver that needs them. 
+ * @note not all options are platform neutral so other platforms can add default options here if desired */ +object MahoutOptionParser { + + // set up the various default option groups + final val GenericOptions = immutable.HashMap[String, Any]( + "randomSeed" -> System.currentTimeMillis().toInt, + "writeAllDatasets" -> false) + + final val SparkOptions = immutable.HashMap[String, Any]( + "master" -> "local", + "sparkExecutorMem" -> "", + "appName" -> "Generic Spark App, Change this.") + + final val FileIOOptions = immutable.HashMap[String, Any]( + "input" -> null.asInstanceOf[String], + "input2" -> null.asInstanceOf[String], + "output" -> null.asInstanceOf[String]) + + final val FileDiscoveryOptions = immutable.HashMap[String, Any]( + "recursive" -> false, + "filenamePattern" -> "^part-.*") + + final val TextDelimitedElementsOptions = immutable.HashMap[String, Any]( + "rowIDColumn" -> 0, + "itemIDColumn" -> 1, + "filterColumn" -> -1, + "filter1" -> null.asInstanceOf[String], + "filter2" -> null.asInstanceOf[String], + "inDelim" -> "[,\t ]") + + final val TextDelimitedDRMOptions = immutable.HashMap[String, Any]( + "rowKeyDelim" -> "\t", + "columnIdStrengthDelim" -> ":", + "elementDelim" -> " ", + "omitStrength" -> false) +} + + http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala b/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala index 90d7559..5718304 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala @@ -18,6 +18,7 @@ package org.apache.mahout.math.cf import org.apache.mahout.math._ +import org.apache.mahout.math.indexeddataset.IndexedDataset import scalabindings._ import 
RLikeOps._ import drm._ @@ -49,7 +50,7 @@ object SimilarityAnalysis extends Serializable { * @param maxInterestingItemsPerThing number of similar items to return per item, default: 50 * @param maxNumInteractions max number of interactions after downsampling, default: 500 * @return - * */ + */ def cooccurrences(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingItemsPerThing: Int = 50, maxNumInteractions: Int = 500, drmBs: Array[DrmLike[Int]] = Array()): List[DrmLike[Int]] = { @@ -99,13 +100,39 @@ object SimilarityAnalysis extends Serializable { indicatorMatrices } + /** Calculates item (column-wise) similarity using the log-likelihood ratio on A'A, A'B, A'C, ... + * and returns a list of indicator and cross-indicator matrices + * Somewhat easier to use method, which handles the ID dictionaries correctly + * @param indexedDatasets first in array is primary/A matrix all others are treated as secondary + * @param randomSeed use default to make repeatable, otherwise pass in system time or some randomizing seed + * @param maxInterestingItemsPerThing max similarities per items + * @param maxNumInteractions max number of input items per item + * @return + */ + def cooccurrencesIDSs(indexedDatasets: Array[IndexedDataset], + randomSeed: Int = 0xdeadbeef, + maxInterestingItemsPerThing: Int = 50, + maxNumInteractions: Int = 500): + List[IndexedDataset] = { + val drms = indexedDatasets.map(_.matrix.asInstanceOf[DrmLike[Int]]) + val primaryDrm = drms(0) + val secondaryDrms = drms.drop(1) + val coocMatrices = cooccurrences(primaryDrm, randomSeed, maxInterestingItemsPerThing, + maxNumInteractions, secondaryDrms) + val retIDSs = coocMatrices.iterator.zipWithIndex.map { + case( drm, i ) => + indexedDatasets(0).create(drm, indexedDatasets(0).columnIDs, indexedDatasets(i).columnIDs) + } + retIDSs.toList + } + /** Calculates row-wise similarity using the log-likelihood ratio on AA' and returns a drm of rows and similar rows * @param drmARaw Primary interaction matrix * 
@param randomSeed when kept to a constant will make repeatable downsampling * @param maxInterestingSimilaritiesPerRow number of similar items to return per item, default: 50 * @param maxNumInteractions max number of interactions after downsampling, default: 500 * @return - * */ + */ def rowSimilarity(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingSimilaritiesPerRow: Int = 50, maxNumInteractions: Int = 500): DrmLike[Int] = { @@ -130,10 +157,27 @@ object SimilarityAnalysis extends Serializable { drmSimilaritiesAAt } - /** - * Compute loglikelihood ratio - * see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details - **/ + /** Calculates row-wise similarity using the log-likelihood ratio on AA' and returns a drm of rows and similar rows. + * Uses IndexedDatasets, which handle external ID dictionaries properly + * @param indexedDataset compare each row to every other + * @param randomSeed use default to make repeatable, otherwise pass in system time or some randomizing seed + * @param maxInterestingSimilaritiesPerRow max elements returned in each row + * @param maxObservationsPerRow max number of input elements to use + * @return + */ + def rowSimilarityIDS(indexedDataset: IndexedDataset, randomSeed: Int = 0xdeadbeef, + maxInterestingSimilaritiesPerRow: Int = 50, + maxObservationsPerRow: Int = 500): + IndexedDataset = { + val coocMatrix = rowSimilarity(indexedDataset.matrix, randomSeed, maxInterestingSimilaritiesPerRow, + maxObservationsPerRow) + indexedDataset.create(coocMatrix, indexedDataset.rowIDs, indexedDataset.rowIDs) + } + + /** + * Compute loglikelihood ratio + * see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details + */ def logLikelihoodRatio(numInteractionsWithA: Long, numInteractionsWithB: Long, numInteractionsWithAandB: Long, numInteractions: Long) = { http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala 
---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala index 082e5b9..7f97481 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala @@ -41,4 +41,7 @@ trait CheckpointedDrm[K] extends DrmLike[K] { def keyClassTag: ClassTag[K] + /** changes the number of rows without touching the underlying data */ + def newRowCardinality(n: Int): CheckpointedDrm[K] + } http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala index eaf5aeb..dd5b101 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala @@ -17,6 +17,9 @@ package org.apache.mahout.math.drm +import com.google.common.collect.{HashBiMap, BiMap} +import org.apache.mahout.math.indexeddataset.{DefaultIndexedDatasetElementReadSchema, DefaultIndexedDatasetReadSchema, Schema, IndexedDataset} + import scala.reflect.ClassTag import logical._ import org.apache.mahout.math._ @@ -85,6 +88,28 @@ trait DistributedEngine { /** Creates empty DRM with non-trivial height */ def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 10) (implicit sc: DistributedContext): CheckpointedDrm[Long] + /** + * Load IndexedDataset from text delimited format. 
+ * @param src comma delimited URIs to read from + * @param schema defines format of file(s) + */ + def indexedDatasetDFSRead(src: String, + schema: Schema = DefaultIndexedDatasetReadSchema, + existingRowIDs: BiMap[String, Int] = HashBiMap.create()) + (implicit sc: DistributedContext): + IndexedDataset + + /** + * Load IndexedDataset from text delimited format, one element per line + * @param src comma delimited URIs to read from + * @param schema defines format of file(s) + */ + def indexedDatasetDFSReadElements(src: String, + schema: Schema = DefaultIndexedDatasetElementReadSchema, + existingRowIDs: BiMap[String, Int] = HashBiMap.create()) + (implicit sc: DistributedContext): + IndexedDataset + } object DistributedEngine { http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala index b787ec0..3afbecb 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala @@ -17,10 +17,13 @@ package org.apache.mahout.math -import scala.reflect.ClassTag +import com.google.common.collect.{HashBiMap, BiMap} +import org.apache.mahout.math.drm.DistributedContext +import org.apache.mahout.math.indexeddataset.{IndexedDataset, DefaultIndexedDatasetReadSchema, Schema} +import org.apache.mahout.math.scalabindings.RLikeOps._ import org.apache.mahout.math.scalabindings._ -import RLikeOps._ -import org.apache.mahout.math.decompositions.{DSSVD, DSPCA, DQR} + +import scala.reflect.ClassTag package object drm { @@ -114,3 +117,20 @@ package object drm { } } + +package object indexeddataset { + /** Load IndexedDataset from text delimited files */ + def indexedDatasetDFSRead(src: String, + schema: Schema = 
DefaultIndexedDatasetReadSchema, + existingRowIDs: BiMap[String, Int] = HashBiMap.create()) + (implicit ctx: DistributedContext): + IndexedDataset = ctx.indexedDatasetDFSRead(src, schema, existingRowIDs) + + def indexedDatasetDFSReadElements(src: String, + schema: Schema = DefaultIndexedDatasetReadSchema, + existingRowIDs: BiMap[String, Int] = HashBiMap.create()) + (implicit ctx: DistributedContext): + IndexedDataset = ctx.indexedDatasetDFSReadElements(src, schema, existingRowIDs) + +} + http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala new file mode 100644 index 0000000..c7eb2cb --- /dev/null +++ b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.math.indexeddataset + +import com.google.common.collect.BiMap +import org.apache.mahout.math.drm.{DistributedContext, CheckpointedDrm} +import org.apache.mahout.math.indexeddataset + +/** + * Wraps a [[org.apache.mahout.math.drm.CheckpointedDrm]] object with two [[com.google.common.collect.BiMap]]s to store + * ID/label translation dictionaries. + * The purpose of this trait is to wrap a CheckpointedDrm[Int] with bidirectional ID mappings so + * a user-specified label or ID can be stored and mapped to and from the Mahout Int ID + * used internally by Mahout core code. + * + * @todo Often there is no need for both dictionaries, or perhaps for either one, so save resources by allowing + * them not to be created when they are not needed. + */ + +trait IndexedDataset { + val matrix: CheckpointedDrm[Int] + val rowIDs: BiMap[String,Int] + val columnIDs: BiMap[String,Int] + + /** + * Writes text-delimited file(s) using the row and column IDs from the dictionaries. + * @param dest where to write the output + * @param schema how the output text is formatted + */ + def dfsWrite(dest: String, schema: Schema)(implicit sc: DistributedContext): Unit + + /** Factory method, creates the extending class */ + def create(matrix: CheckpointedDrm[Int], rowIDs: BiMap[String,Int], columnIDs: BiMap[String,Int]): + IndexedDataset + + /** + * Adds the equivalent of blank rows to the sparse CheckpointedDrm, which only changes the row cardinality value. + * No changes are made to the underlying drm. + * @param n number to use for new row cardinality, should not be smaller than the current row cardinality + * @note should be done before any optimizer actions are performed on the matrix or you'll get unpredictable + * results.
+ */ + def newRowCardinality(n: Int): IndexedDataset = { + // n is validated in matrix + this.create(matrix.newRowCardinality(n), rowIDs, columnIDs) + } + +} + http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala new file mode 100644 index 0000000..cf429f5 --- /dev/null +++ b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.math.indexeddataset + +import com.google.common.collect.{BiMap, HashBiMap} +import org.apache.mahout.math.drm.DistributedContext + +/** Reader trait is abstract in the sense that the elementReader function must be defined by an extending trait, + * which also defines the type to be read. + * @tparam T type of object to read. 
+ */ +trait Reader[T]{ + + val mc: DistributedContext + val readSchema: Schema + + protected def elementReader( + mc: DistributedContext, + readSchema: Schema, + source: String, + existingRowIDs: BiMap[String, Int]): T + + protected def drmReader( + mc: DistributedContext, + readSchema: Schema, + source: String, + existingRowIDs: BiMap[String, Int]): T + + def readElementsFrom( + source: String, + existingRowIDs: BiMap[String, Int] = HashBiMap.create()): T = + elementReader(mc, readSchema, source, existingRowIDs) + + def readDRMFrom( + source: String, + existingRowIDs: BiMap[String, Int] = HashBiMap.create()): T = + drmReader(mc, readSchema, source, existingRowIDs) +} + +/** Writer trait is abstract in the sense that the writer method must be supplied by an extending trait, + * which also defines the type to be written. + * @tparam T type of object to write. + */ +trait Writer[T]{ + + val mc: DistributedContext + val sort: Boolean + val writeSchema: Schema + + protected def writer(mc: DistributedContext, writeSchema: Schema, dest: String, collection: T, sort: Boolean): Unit + + def writeTo(collection: T, dest: String) = writer(mc, writeSchema, dest, collection, sort) +} http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/Schema.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/Schema.scala b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/Schema.scala new file mode 100644 index 0000000..557b419 --- /dev/null +++ b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/Schema.scala @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.math.indexeddataset + +import scala.collection.mutable.HashMap + +/** Syntactic sugar for mutable.HashMap[String, Any] + * + * @param params list of mappings for instantiation {{{val mySchema = new Schema("one" -> 1, "two" -> "2", ...)}}} + */ +class Schema(params: Tuple2[String, Any]*) extends HashMap[String, Any] { + // note: this require a mutable HashMap, do we care? + this ++= params + + /** Constructor for copying an existing Schema + * + * @param schemaToClone return a copy of this Schema + */ + def this(schemaToClone: Schema){ + this() + this ++= schemaToClone + } +} + +/** These can be used to keep the text in and out fairly standard to Mahout, where an application specific + * format is not required. These apply to formatting of [[org.apache.mahout.math.indexeddataset.IndexedDataset]] + * , which can be used to create a Mahout DRM for DSL ops. + */ + + +/** Simple default Schema for typical text delimited element file input + * This tells the reader to input elements of the default (rowID<comma, tab, or space>columnID + * <comma, tab, or space>here may be other ignored text...) 
+ */ +final object DefaultIndexedDatasetElementReadSchema extends Schema( + "delim" -> "[,\t ]", //comma, tab or space + "filter" -> "", + "rowIDColumn" -> 0, + "columnIDPosition" -> 1, + "filterColumn" -> -1) + +/** Default Schema for text delimited drm file output + * This tells the writer to write a [[org.apache.mahout.math.indexeddataset.IndexedDataset]] of the default form: + * (rowID<tab>columnID1:score1<space>columnID2:score2...) + */ +final object DefaultIndexedDatasetWriteSchema extends Schema( + "rowKeyDelim" -> "\t", + "columnIdStrengthDelim" -> ":", + "elementDelim" -> " ", + "omitScore" -> false) + +/** Default Schema for typical text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file input + * This tells the reader to input text lines of the form: + * (rowID<tab>columnID1:score1,columnID2:score2,...) + */ +final object DefaultIndexedDatasetReadSchema extends Schema( + "rowKeyDelim" -> "\t", + "columnIdStrengthDelim" -> ":", + "elementDelim" -> " ") + +/** Default Schema for reading a text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file where + * the score of any element is ignored, + * all non-zeros are replaced with 1. + * This tells the reader to input DRM lines of the form + * (rowID<tab>columnID1:score1<space>columnID2:score2...) remember the score is ignored. + * Alternatively the format can be + * (rowID<tab>columnID1<space>columnID2 ...) where presence indicates a score of 1. This is the default + * output format for [[IndexedDatasetWriteBooleanSchema]] + */ +final object IndexedDatasetReadBooleanSchema extends Schema( + "rowKeyDelim" -> "\t", + "columnIdStrengthDelim" -> ":", + "elementDelim" -> " ", + "omitScore" -> true) + +/** Default Schema for typical text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file write where + * the score of a element is omitted. + * The presence of a element means the score = 1, the absence means a score of 0. 
+ * This tells the writer to output [[org.apache.mahout.math.indexeddataset.IndexedDataset]] lines of the form + * (rowID<tab>columnID1<space>columnID2...) + */ +final object IndexedDatasetWriteBooleanSchema extends Schema( + "rowKeyDelim" -> "\t", + "columnIdStrengthDelim" -> ":", + "elementDelim" -> " ", + "omitScore" -> true) + http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/common/HDFSPathSearch.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/common/HDFSPathSearch.scala b/spark/src/main/scala/org/apache/mahout/common/HDFSPathSearch.scala new file mode 100644 index 0000000..42bf697 --- /dev/null +++ b/spark/src/main/scala/org/apache/mahout/common/HDFSPathSearch.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.common + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} + +/** + * Returns a [[java.lang.String]], which is comma delimited list of URIs discovered based on parameters + * in the constructor. 
+ * The String is formatted to be input into [[org.apache.spark.SparkContext.textFile()]] + * + * @param pathURI Where to start looking for inFiles, may be a list of comma delimited URIs + * @param filePattern regex that must match the entire filename to have the file returned + * @param recursive true traverses the filesystem recursively, default = false + */ + +case class HDFSPathSearch(pathURI: String, filePattern: String = "", recursive: Boolean = false) { + + val conf = new Configuration() + val fs = FileSystem.get(conf) + + /** Returns a string of comma delimited URIs matching the filePattern + * When pattern matching dirs are never returned, only traversed. */ + def uris: String = { + if (!filePattern.isEmpty){ // have file pattern so + val pathURIs = pathURI.split(",") + var files = "" + for ( uri <- pathURIs ){ + files = findFiles(uri, filePattern, files) + } + if (files.length > 0 && files.endsWith(",")) files = files.dropRight(1) // drop the last comma + files + }else{ + pathURI + } + } + + /** Find matching files in the dir, recursively call self when another directory is found + * Only files are matched, directories are traversed but never return a match */ + private def findFiles(dir: String, filePattern: String = ".*", files: String = ""): String = { + val seed = fs.getFileStatus(new Path(dir)) + var f: String = files + + if (seed.isDir) { + val fileStatuses: Array[FileStatus] = fs.listStatus(new Path(dir)) + for (fileStatus <- fileStatuses) { + if (fileStatus.getPath().getName().matches(filePattern) + && !fileStatus.isDir) { + // found a file + if (fileStatus.getLen() != 0) { + // file is not empty + f = f + fileStatus.getPath.toUri.toString + "," + } + } else if (fileStatus.isDir && recursive) { + f = findFiles(fileStatus.getPath.toString, filePattern, f) + } + } + }else{ f = dir }// was a filename not dir + f + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala 
---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala b/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala deleted file mode 100644 index f48e9ed..0000000 --- a/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.drivers - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{Path, FileStatus, FileSystem} - -/** - * Returns a [[java.lang.String]]comma delimited list of URIs discovered based on parameters in the constructor. 
- * The String is formatted to be input into [[org.apache.spark.SparkContext.textFile()]] - * - * @param pathURI Where to start looking for inFiles, may be a list of comma delimited URIs - * @param filePattern regex that must match the entire filename to have the file returned - * @param recursive true traverses the filesystem recursively, default = false - */ - -case class FileSysUtils(pathURI: String, filePattern: String = "", recursive: Boolean = false) { - - val conf = new Configuration() - val fs = FileSystem.get(conf) - -/** Returns a string of comma delimited URIs matching the filePattern - * When pattern matching dirs are never returned, only traversed. */ - def uris :String = { - if (!filePattern.isEmpty){ // have file pattern so - val pathURIs = pathURI.split(",") - var files = "" - for ( uri <- pathURIs ){ - files = findFiles(uri, filePattern, files) - } - if (files.length > 0 && files.endsWith(",")) files = files.dropRight(1) // drop the last comma - files - }else{ - pathURI - } - } - -/** Find matching files in the dir, recursively call self when another directory is found - * Only files are matched, directories are traversed but never return a match */ - private def findFiles(dir: String, filePattern :String = ".*", files : String = ""): String = { - val seed = fs.getFileStatus(new Path(dir)) - var f :String = files - - if (seed.isDir) { - val fileStatuses: Array[FileStatus] = fs.listStatus(new Path(dir)) - for (fileStatus <- fileStatuses) { - if (fileStatus.getPath().getName().matches(filePattern) - && !fileStatus.isDir) { - // found a file - if (fileStatus.getLen() != 0) { - // file is not empty - f = f + fileStatus.getPath.toUri.toString + "," - } - } else if (fileStatus.isDir && recursive) { - f = findFiles(fileStatus.getPath.toString, filePattern, f) - } - } - }else{ f = dir }// was a filename not dir - f - } -} http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala 
---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala b/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala deleted file mode 100644 index 99f98f5..0000000 --- a/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.drivers - -import com.google.common.collect.BiMap -import org.apache.mahout.math.drm.{DrmLike, CheckpointedDrm} -import org.apache.mahout.sparkbindings.drm.CheckpointedDrmSpark -import org.apache.mahout.sparkbindings._ - -/** - * Wraps a [[org.apache.mahout.sparkbindings.drm.DrmLike]] object with two [[com.google.common.collect.BiMap]]s to store ID/label translation dictionaries. - * The purpose of this class is to wrap a DrmLike[C] with bidirectional ID mappings so - * a user specified label or ID can be stored and mapped to and from the [[scala.Int]] ordinal ID - * used internal to Mahout Core code. 
- * - * Example: For a transpose job the [[org.apache.mahout.drivers.IndexedDataset#matrix]]: [[org.apache.mahout.sparkbindings.drm.DrmLike]] is passed into the DSL code - * that transposes the values, then a resulting [[org.apache.mahout.drivers.IndexedDataset]] is created from the transposed DrmLike object with swapped dictionaries (since the rows and columns are transposed). The new - * [[org.apache.mahout.drivers.IndexedDataset]] is returned. - * - * @param matrix DrmLike[Int], representing the distributed matrix storing the actual data. - * @param rowIDs BiMap[String, Int] storing a bidirectional mapping of external String ID to - * and from the ordinal Mahout Int ID. This one holds row labels - * @param columnIDs BiMap[String, Int] storing a bidirectional mapping of external String - * ID to and from the ordinal Mahout Int ID. This one holds column labels - * @todo Often no need for both or perhaps either dictionary, so save resources by allowing - * to be not created when not needed. - */ - -case class IndexedDataset(var matrix: CheckpointedDrm[Int], rowIDs: BiMap[String,Int], columnIDs: BiMap[String,Int]) { - - // we must allow the row dimension to be adjusted in the case where the data read in is incomplete and we - // learn this afterwards - - /** - * Adds the equivalent of blank rows to the sparse CheckpointedDrm, which only changes the row cardinality value. - * No physical changes are made to the underlying drm. - * @param n number to use for row carnindality, should be larger than current - * @note should be done before any BLAS optimizer actions are performed on the matrix or you'll get unpredictable - * results. 
- */ - def newRowCardinality(n: Int): IndexedDataset = { - assert(n > -1) - assert( n >= matrix.nrow) - val drmRdd = matrix.asInstanceOf[CheckpointedDrmSpark[Int]].rdd - val ncol = matrix.ncol - val newMatrix = drmWrap[Int](drmRdd, n, ncol) - new IndexedDataset(newMatrix, rowIDs, columnIDs) - } - -} - -/** - * Companion object for the case class [[org.apache.mahout.drivers.IndexedDataset]] primarily used to get a secondary - * constructor for - * making one [[org.apache.mahout.drivers.IndexedDataset]] from another. Used when you have a factory like - * [[org.apache.mahout.drivers.Reader]] - * {{{ - * val indexedDataset = IndexedDataset(indexedDatasetReader.readElementsFrom(source)) - * }}} - */ - -object IndexedDataset { - /** Secondary constructor for [[org.apache.mahout.drivers.IndexedDataset]] */ - def apply(id2: IndexedDataset) = new IndexedDataset(id2.matrix, id2.rowIDs, id2.columnIDs) -} http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala index 0b8ded6..42e9d81 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala @@ -17,15 +17,19 @@ package org.apache.mahout.drivers +import org.apache.mahout.common.HDFSPathSearch import org.apache.mahout.math.cf.SimilarityAnalysis +import org.apache.mahout.math.indexeddataset.{Schema, IndexedDataset, indexedDatasetDFSReadElements} +import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark import scala.collection.immutable.HashMap /** - * Command line interface for [[org.apache.mahout.cf.CooccurrenceAnalysis.cooccurrences( )]]. 
+ * Command line interface for [[org.apache.mahout.cf.SimilarityAnalysis#cooccurrencesIDSs]]. * Reads text lines * that contain (row id, column id, ...). The IDs are user specified strings which will be * preserved in the - * output. The individual elements will be accumulated into a matrix and [[org.apache.mahout.cf.CooccurrenceAnalysis.cooccurrences( )]] + * output. The individual elements will be accumulated into a matrix like [[org.apache.mahout.math.indexeddataset.IndexedDataset]] + * and [[org.apache.mahout.cf.SimilarityAnalysis#cooccurrencesIDSs]] * will be used to calculate row-wise self-similarity, or when using filters or two inputs, will generate two * matrices and calculate both the self similarity of the primary matrix and the row-wise * similarity of the primary @@ -40,17 +44,16 @@ import scala.collection.immutable.HashMap * @note To use with a Spark cluster see the --master option, if you run out of heap space check * the --sparkExecutorMemory option. */ -object ItemSimilarityDriver extends MahoutDriver { +object ItemSimilarityDriver extends MahoutSparkDriver { // define only the options specific to ItemSimilarity private final val ItemSimilarityOptions = HashMap[String, Any]( "maxPrefs" -> 500, "maxSimilaritiesPerItem" -> 100, "appName" -> "ItemSimilarityDriver") - private var reader1: TextDelimitedIndexedDatasetReader = _ - private var reader2: TextDelimitedIndexedDatasetReader = _ - private var writer: TextDelimitedIndexedDatasetWriter = _ private var writeSchema: Schema = _ + private var readSchema1: Schema = _ + private var readSchema2: Schema = _ /** * @param args Command line args, if empty a help message is printed. @@ -119,27 +122,27 @@ object ItemSimilarityDriver extends MahoutDriver { // todo: the HashBiMap used in the TextDelimited Reader is hard coded into // MahoutKryoRegistrator, it should be added to the register list here so it - // will be only spcific to this job. + // will be only specific to this job. 
sparkConf.set("spark.kryo.referenceTracking", "false") - .set("spark.kryoserializer.buffer.mb", "200") - .set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String]) + .set("spark.kryoserializer.buffer.mb", "200")// todo: should this be left to config or an option? + + if (parser.opts("sparkExecutorMem").asInstanceOf[String] != "") + sparkConf.set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String]) + //else leave as set in Spark config super.start(masterUrl, appName) - val readSchema1 = new Schema("delim" -> parser.opts("inDelim").asInstanceOf[String], + readSchema1 = new Schema("delim" -> parser.opts("inDelim").asInstanceOf[String], "filter" -> parser.opts("filter1").asInstanceOf[String], "rowIDColumn" -> parser.opts("rowIDColumn").asInstanceOf[Int], "columnIDPosition" -> parser.opts("itemIDColumn").asInstanceOf[Int], "filterColumn" -> parser.opts("filterColumn").asInstanceOf[Int]) - reader1 = new TextDelimitedIndexedDatasetReader(readSchema1) - if ((parser.opts("filterColumn").asInstanceOf[Int] != -1 && parser.opts("filter2").asInstanceOf[String] != null) || (parser.opts("input2").asInstanceOf[String] != null && !parser.opts("input2").asInstanceOf[String].isEmpty )){ // only need to change the filter used compared to readSchema1 - val readSchema2 = new Schema(readSchema1) += ("filter" -> parser.opts("filter2").asInstanceOf[String]) + readSchema2 = new Schema(readSchema1) += ("filter" -> parser.opts("filter2").asInstanceOf[String]) - reader2 = new TextDelimitedIndexedDatasetReader(readSchema2) } writeSchema = new Schema( @@ -147,36 +150,34 @@ object ItemSimilarityDriver extends MahoutDriver { "columnIdStrengthDelim" -> parser.opts("columnIdStrengthDelim").asInstanceOf[String], "omitScore" -> parser.opts("omitStrength").asInstanceOf[Boolean], "elementDelim" -> parser.opts("elementDelim").asInstanceOf[String]) - - writer = new TextDelimitedIndexedDatasetWriter(writeSchema) - - } + } private def readIndexedDatasets: 
Array[IndexedDataset] = { - val inFiles = FileSysUtils(parser.opts("input").asInstanceOf[String], parser.opts("filenamePattern").asInstanceOf[String], + val inFiles = HDFSPathSearch(parser.opts("input").asInstanceOf[String], parser.opts("filenamePattern").asInstanceOf[String], parser.opts("recursive").asInstanceOf[Boolean]).uris val inFiles2 = if (parser.opts("input2") == null || parser.opts("input2").asInstanceOf[String].isEmpty) "" - else FileSysUtils(parser.opts("input2").asInstanceOf[String], parser.opts("filenamePattern").asInstanceOf[String], + else HDFSPathSearch(parser.opts("input2").asInstanceOf[String], parser.opts("filenamePattern").asInstanceOf[String], parser.opts("recursive").asInstanceOf[Boolean]).uris if (inFiles.isEmpty) { Array() } else { - val datasetA = IndexedDataset(reader1.readElementsFrom(inFiles)) - if (parser.opts("writeAllDatasets").asInstanceOf[Boolean]) writer.writeTo(datasetA, - parser.opts("output").asInstanceOf[String] + "../input-datasets/primary-interactions") + val datasetA = indexedDatasetDFSReadElements(inFiles,readSchema1) + if (parser.opts("writeAllDatasets").asInstanceOf[Boolean]) + datasetA.dfsWrite(parser.opts("output").asInstanceOf[String] + "../input-datasets/primary-interactions", + schema = writeSchema) - // The case of readng B can be a bit tricky when the exact same row IDs don't exist for A and B + // The case of reading B can be a bit tricky when the exact same row IDs don't exist for A and B // Here we assume there is one row ID space for all interactions. To do this we calculate the // row cardinality only after reading in A and B (or potentially C...) We then adjust the // cardinality so all match, which is required for the math to work. // Note: this may leave blank rows with no representation in any DRM. Blank rows need to - // be supported (and are at least on Spark) or the row cardinality fix will not work. + // be supported (and are at least on Spark) or the row cardinality adjustment will not work. 
val datasetB = if (!inFiles2.isEmpty) { // get cross-cooccurrence interactions from separate files - val datasetB = IndexedDataset(reader2.readElementsFrom(inFiles2, existingRowIDs = datasetA.rowIDs)) + val datasetB = indexedDatasetDFSReadElements(inFiles2, readSchema2, existingRowIDs = datasetA.rowIDs) datasetB @@ -184,12 +185,12 @@ object ItemSimilarityDriver extends MahoutDriver { && parser.opts("filter2").asInstanceOf[String] != null) { // get cross-cooccurrences interactions by using two filters on a single set of files - val datasetB = IndexedDataset(reader2.readElementsFrom(inFiles, existingRowIDs = datasetA.rowIDs)) + val datasetB = indexedDatasetDFSReadElements(inFiles, readSchema2, existingRowIDs = datasetA.rowIDs) datasetB } else { - null.asInstanceOf[IndexedDataset] + null.asInstanceOf[IndexedDatasetSpark] } if (datasetB != null.asInstanceOf[IndexedDataset]) { // do AtB calc // true row cardinality is the size of the row id index, which was calculated from all rows of A and B @@ -203,8 +204,9 @@ object ItemSimilarityDriver extends MahoutDriver { val returnedB = if (rowCardinality != datasetB.matrix.nrow) datasetB.newRowCardinality(rowCardinality) else datasetB // this guarantees matching cardinality - if (parser.opts("writeAllDatasets").asInstanceOf[Boolean]) writer.writeTo(datasetB, parser.opts("output") + "../input-datasets/secondary-interactions") - + if (parser.opts("writeAllDatasets").asInstanceOf[Boolean]) + datasetB.dfsWrite(parser.opts("output").asInstanceOf[String] + "../input-datasets/secondary-interactions", + schema = writeSchema) Array(returnedA, returnedB) } else Array(datasetA) } @@ -214,31 +216,13 @@ object ItemSimilarityDriver extends MahoutDriver { start() val indexedDatasets = readIndexedDatasets + val idss = SimilarityAnalysis.cooccurrencesIDSs(indexedDatasets, parser.opts("randomSeed").asInstanceOf[Int], + parser.opts("maxSimilaritiesPerItem").asInstanceOf[Int], parser.opts("maxPrefs").asInstanceOf[Int]) // todo: allow more than one 
cross-similarity matrix? - val indicatorMatrices = { - if (indexedDatasets.length > 1) { - SimilarityAnalysis.cooccurrences(indexedDatasets(0).matrix, parser.opts("randomSeed").asInstanceOf[Int], - parser.opts("maxSimilaritiesPerItem").asInstanceOf[Int], parser.opts("maxPrefs").asInstanceOf[Int], - Array(indexedDatasets(1).matrix)) - } else { - SimilarityAnalysis.cooccurrences(indexedDatasets(0).matrix, parser.opts("randomSeed").asInstanceOf[Int], - parser.opts("maxSimilaritiesPerItem").asInstanceOf[Int], parser.opts("maxPrefs").asInstanceOf[Int]) - } - } - - // an alternative is to create a version of IndexedDataset that knows how to write itself - val selfIndicatorDataset = new IndexedDatasetTextDelimitedWriteable(indicatorMatrices(0), indexedDatasets(0).columnIDs, - indexedDatasets(0).columnIDs, writeSchema) - selfIndicatorDataset.writeTo(dest = parser.opts("output").asInstanceOf[String] + "indicator-matrix") - - // todo: would be nice to support more than one cross-similarity indicator - if (indexedDatasets.length > 1) { - - val crossIndicatorDataset = new IndexedDataset(indicatorMatrices(1), indexedDatasets(0).columnIDs, indexedDatasets(1).columnIDs) // cross similarity - writer.writeTo(crossIndicatorDataset, parser.opts("output").asInstanceOf[String] + "cross-indicator-matrix") - - } + idss(0).dfsWrite(parser.opts("output").asInstanceOf[String] + "indicator-matrix", schema = writeSchema) + if(idss.length > 1) + idss(1).dfsWrite(parser.opts("output").asInstanceOf[String] + "cross-indicator-matrix", schema = writeSchema) stop } http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala deleted file mode 100644 index 6ea7c8b..0000000 --- 
a/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.drivers - -import org.apache.mahout.math.drm.DistributedContext -import org.apache.spark.SparkConf -import org.apache.mahout.sparkbindings._ - -import scala.collection.immutable - -/** Extend this class to create a Mahout CLI driver. Minimally you must override process and main. - * Also define a Map of options for the command line parser. 
The following template may help: - * {{{ - * object SomeDriver extends MahoutDriver { - * - * // define only the options specific to this driver, inherit the generic ones - * private final val SomeOptions = HashMap[String, Any]( - * "maxThings" -> 500, - * "minThings" -> 100, - * "appName" -> "SomeDriver") - * - * override def main(args: Array[String]): Unit = { - * - * - * val parser = new MahoutOptionParser(programName = "shortname") { - * head("somedriver", "Mahout 1.0-SNAPSHOT") - * - * // Input output options, non-driver specific - * parseIOOptions - * - * // Algorithm specific options - * // Add in the new options - * opts = opts ++ SomeOptions - * note("\nAlgorithm control options:") - * opt[Int]("maxThings") abbr ("mt") action { (x, options) => - * options + ("maxThings" -> x) ... - * } - * parser.parse(args, parser.opts) map { opts => - * parser.opts = opts - * process - * } - * } - * - * override def process: Unit = { - * start() - * // do the work here - * stop - * } - * - * }}} - */ -abstract class MahoutDriver { - - - implicit protected var mc: DistributedContext = _ - implicit protected var sparkConf = new SparkConf() - protected var parser: MahoutOptionParser = _ - - var _useExistingContext: Boolean = false // used in the test suite to reuse one context per suite - - /** Creates a Spark context to run the job inside. - * Override to set the SparkConf values specific to the job, - * these must be set before the context is created. 
- * @param masterUrl Spark master URL - * @param appName Name to display in Spark UI - * */ - protected def start(masterUrl: String, appName: String) : Unit = { - if (!_useExistingContext) { - mc = mahoutSparkContext(masterUrl, appName, sparkConf = sparkConf) - } - } - - /** Override (optionally) for special cleanup */ - protected def stop: Unit = { - if (!_useExistingContext) mc.close - } - - /** This is where you do the work, call start first, then before exiting call stop */ - protected def process: Unit - - /** Parse command line and call process */ - def main(args: Array[String]): Unit - - def useContext(context: DistributedContext): Unit = { - _useExistingContext = true - mc = context - sparkConf = mc.getConf - } -} http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala deleted file mode 100644 index ad7a76b..0000000 --- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala +++ /dev/null @@ -1,214 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.mahout.drivers - -import scopt.OptionParser -import scala.collection.immutable - -/** Companion object defines default option groups for reference in any driver that needs them */ -object MahoutOptionParser { - // set up the various default option groups - final val GenericOptions = immutable.HashMap[String, Any]( - "randomSeed" -> System.currentTimeMillis().toInt, - "writeAllDatasets" -> false) - - final val SparkOptions = immutable.HashMap[String, Any]( - "master" -> "local", - "sparkExecutorMem" -> "2g", - "appName" -> "Generic Spark App, Change this.") - - final val FileIOOptions = immutable.HashMap[String, Any]( - "input" -> null.asInstanceOf[String], - "input2" -> null.asInstanceOf[String], - "output" -> null.asInstanceOf[String]) - - final val FileDiscoveryOptions = immutable.HashMap[String, Any]( - "recursive" -> false, - "filenamePattern" -> "^part-.*") - - final val TextDelimitedElementsOptions = immutable.HashMap[String, Any]( - "rowIDColumn" -> 0, - "itemIDColumn" -> 1, - "filterColumn" -> -1, - "filter1" -> null.asInstanceOf[String], - "filter2" -> null.asInstanceOf[String], - "inDelim" -> "[,\t ]") - - final val TextDelimitedDRMOptions = immutable.HashMap[String, Any]( - "rowKeyDelim" -> "\t", - "columnIdStrengthDelim" -> ":", - "elementDelim" -> " ", - "omitStrength" -> false) -} -/** Defines oft-repeated options and their parsing. Provides the option groups and parsing helper methods to - * keep both standarized. - * @param programName Name displayed in help message, the name by which the driver is invoked. - * */ -class MahoutOptionParser(programName: String) extends OptionParser[Map[String, Any]](programName: String) { - - // build options from some stardard CLI param groups - // Note: always put the driver specific options at the last so the can override and previous options! 
- var opts = Map.empty[String, Any] - - override def showUsageOnError = true - - def parseIOOptions(numInputs: Int = 1) = { - opts = opts ++ MahoutOptionParser.FileIOOptions - note("Input, output options") - opt[String]('i', "input") required() action { (x, options) => - options + ("input" -> x) - } text ("Input path, may be a filename, directory name, or comma delimited list of HDFS supported URIs (required)") - - if (numInputs == 2) { - opt[String]("input2") abbr ("i2") action { (x, options) => - options + ("input2" -> x) - } text ("Secondary input path for cross-similarity calculation, same restrictions as \"--input\" (optional). Default: empty.") - } - - opt[String]('o', "output") required() action { (x, options) => - // todo: check to see if HDFS allows MS-Windows backslashes locally? - if (x.endsWith("/")) { - options + ("output" -> x) - } else { - options + ("output" -> (x + "/")) - } - } text ("Path for output, any local or HDFS supported URI (required)") - - } - - def parseSparkOptions = { - opts = opts ++ MahoutOptionParser.SparkOptions - note("\nSpark config options:") - - opt[String]("master") abbr ("ma") text ("Spark Master URL (optional). Default: \"local\". Note that you can specify the number of cores to get a performance improvement, for example \"local[4]\"") action { (x, options) => - options + ("master" -> x) - } - - opt[String]("sparkExecutorMem") abbr ("sem") text ("Max Java heap available as \"executor memory\" on each node (optional). 
Default: 4g") action { (x, options) => - options + ("sparkExecutorMem" -> x) - } - - } - - def parseGenericOptions = { - opts = opts ++ MahoutOptionParser.GenericOptions - opt[Int]("randomSeed") abbr ("rs") action { (x, options) => - options + ("randomSeed" -> x) - } validate { x => - if (x > 0) success else failure("Option --randomSeed must be > 0") - } - - //output both input DRMs - opt[Unit]("writeAllDatasets") hidden() action { (_, options) => - options + ("writeAllDatasets" -> true) - }//Hidden option, though a user might want this. - } - - def parseElementInputSchemaOptions{ - //Input text file schema--not driver specific but input data specific, elements input, - // not drms - opts = opts ++ MahoutOptionParser.TextDelimitedElementsOptions - note("\nInput text file schema options:") - opt[String]("inDelim") abbr ("id") text ("Input delimiter character (optional). Default: \"[,\\t]\"") action { (x, options) => - options + ("inDelim" -> x) - } - - opt[String]("filter1") abbr ("f1") action { (x, options) => - options + ("filter1" -> x) - } text ("String (or regex) whose presence indicates a datum for the primary item set (optional). Default: no filter, all data is used") - - opt[String]("filter2") abbr ("f2") action { (x, options) => - options + ("filter2" -> x) - } text ("String (or regex) whose presence indicates a datum for the secondary item set (optional). If not present no secondary dataset is collected") - - opt[Int]("rowIDColumn") abbr ("rc") action { (x, options) => - options + ("rowIDColumn" -> x) - } text ("Column number (0 based Int) containing the row ID string (optional). Default: 0") validate { x => - if (x >= 0) success else failure("Option --rowIDColNum must be >= 0") - } - - opt[Int]("itemIDColumn") abbr ("ic") action { (x, options) => - options + ("itemIDColumn" -> x) - } text ("Column number (0 based Int) containing the item ID string (optional). 
Default: 1") validate { x => - if (x >= 0) success else failure("Option --itemIDColNum must be >= 0") - } - - opt[Int]("filterColumn") abbr ("fc") action { (x, options) => - options + ("filterColumn" -> x) - } text ("Column number (0 based Int) containing the filter string (optional). Default: -1 for no filter") validate { x => - if (x >= -1) success else failure("Option --filterColNum must be >= -1") - } - - note("\nUsing all defaults the input is expected of the form: \"userID<tab>itemId\" or \"userID<tab>itemID<tab>any-text...\" and all rows will be used") - - checkConfig { options: Map[String, Any] => - if (options("filterColumn").asInstanceOf[Int] == options("itemIDColumn").asInstanceOf[Int] - || options("filterColumn").asInstanceOf[Int] == options("rowIDColumn").asInstanceOf[Int] - || options("rowIDColumn").asInstanceOf[Int] == options("itemIDColumn").asInstanceOf[Int]) - failure("The row, item, and filter positions must be unique.") else success - } - - //check for option consistency, probably driver specific - checkConfig { options: Map[String, Any] => - if (options("filter1").asInstanceOf[String] != null.asInstanceOf[String] - && options("filter2").asInstanceOf[String] != null.asInstanceOf[String] - && options("filter1").asInstanceOf[String] == options("filter2").asInstanceOf[String]) - failure ("If using filters they must be unique.") else success - } - - } - - def parseFileDiscoveryOptions = { - //File finding strategy--not driver specific - opts = opts ++ MahoutOptionParser.FileDiscoveryOptions - note("\nFile discovery options:") - opt[Unit]('r', "recursive") action { (_, options) => - options + ("recursive" -> true) - } text ("Searched the -i path recursively for files that match --filenamePattern (optional), Default: false") - - opt[String]("filenamePattern") abbr ("fp") action { (x, options) => - options + ("filenamePattern" -> x) - } text ("Regex to match in determining input files (optional). 
Default: filename in the --input option or \"^part-.*\" if --input is a directory") - - } - - def parseDrmFormatOptions = { - opts = opts ++ MahoutOptionParser.TextDelimitedDRMOptions - note("\nOutput text file schema options:") - opt[String]("rowKeyDelim") abbr ("rd") action { (x, options) => - options + ("rowKeyDelim" -> x) - } text ("Separates the rowID key from the vector values list (optional). Default: \"\\t\"") - - opt[String]("columnIdStrengthDelim") abbr ("cd") action { (x, options) => - options + ("columnIdStrengthDelim" -> x) - } text ("Separates column IDs from their values in the vector values list (optional). Default: \":\"") - - opt[String]("elementDelim") abbr ("td") action { (x, options) => - options + ("elementDelim" -> x) - } text ("Separates vector element values in the values list (optional). Default: \" \"") - - opt[Unit]("omitStrength") abbr ("os") action { (_, options) => - options + ("omitStrength" -> true) - } text ("Do not write the strength to the output files (optional), Default: false.") - note("This option is used to output indexable data for creating a search engine recommender.") - - note("\nDefault delimiters will produce output of the form: \"itemID1<tab>itemID2:value2<space>itemID10:value10...\"") - } - -} - - http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala new file mode 100644 index 0000000..5a4254b --- /dev/null +++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.drivers + +import org.apache.mahout.math.drm.DistributedContext +import org.apache.spark.SparkConf +import org.apache.mahout.sparkbindings._ + +/** Extend this class to create a Mahout CLI driver that runs on Spark. Minimally you must override process and main. Call useContext(context) to reuse an existing DistributedContext (adopting its SparkConf) instead of having start create a new one. + * Also define a Map of options for the command line parser. The following template may help: + * {{{ + * object SomeDriver extends MahoutSparkDriver { + * + * // define only the options specific to this driver, inherit the generic ones + * private final val SomeOptions = HashMap[String, Any]( + * "maxThings" -> 500, + * "minThings" -> 100, + * "appName" -> "SomeDriver") + * + * override def main(args: Array[String]): Unit = { + * + * + * val parser = new MahoutOptionParser(programName = "shortname") { + * head("somedriver", "Mahout 1.0-SNAPSHOT") + * + * // Input output options, non-driver specific + * parseIOOptions + * + * // Algorithm specific options + * // Add in the new options + * opts = opts ++ SomeOptions + * note("\nAlgorithm control options:") + * opt[Int]("maxThings") abbr ("mt") action { (x, options) => + * options + ("maxThings" -> x) ... 
+ * } + * parser.parse(args, parser.opts) map { opts => + * parser.opts = opts + * process + * } + * } + * + * override def process: Unit = { + * start() + * // do the work here + * stop + * } + * + * }}} + */ +abstract class MahoutSparkDriver extends MahoutDriver { + + + implicit protected var sparkConf = new SparkConf() + + /** Creates a Spark context to run the job inside and stores it in mc. + * Override to set the SparkConf values specific to the job, + * these must be set before the context is created. Does nothing if a context was already supplied through useContext. + * @param masterUrl Spark master URL + * @param appName Name to display in Spark UI + * */ + protected def start(masterUrl: String, appName: String) : Unit = { + if (!_useExistingContext) { + mc = mahoutSparkContext(masterUrl, appName, sparkConf = sparkConf) + } + } + + def useContext(context: DistributedContext): Unit = { + _useExistingContext = true + mc = context + sparkConf = mc.getConf + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala b/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala deleted file mode 100644 index 6351e45..0000000 --- a/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.drivers - -import com.google.common.collect.{HashBiMap, BiMap} -import org.apache.mahout.math.drm.DistributedContext - -/** Reader trait is abstract in the sense that the elementReader function must be defined by an extending trait, which also defines the type to be read. - * @tparam T type of object read. - */ -trait Reader[T]{ - - val mc: DistributedContext - val readSchema: Schema - - protected def elementReader( - mc: DistributedContext, - readSchema: Schema, - source: String, - existingRowIDs: BiMap[String, Int]): T - - protected def drmReader( - mc: DistributedContext, - readSchema: Schema, - source: String, - existingRowIDs: BiMap[String, Int]): T - - def readElementsFrom( - source: String, - existingRowIDs: BiMap[String, Int] = HashBiMap.create()): T = - elementReader(mc, readSchema, source, existingRowIDs) - - def readDRMFrom( - source: String, - existingRowIDs: BiMap[String, Int] = HashBiMap.create()): T = - drmReader(mc, readSchema, source, existingRowIDs) -} - -/** Writer trait is abstract in the sense that the writer method must be supplied by an extending trait, which also defines the type to be written. - * @tparam T type of object to write. - */ -trait Writer[T]{ - - val mc: DistributedContext - val sort: Boolean - val writeSchema: Schema - - protected def writer(mc: DistributedContext, writeSchema: Schema, dest: String, collection: T, sort: Boolean): Unit - - def writeTo(collection: T, dest: String) = writer(mc, writeSchema, dest, collection, sort) -}
