Removed Guava from the Scala modules and created an all-Scala BiMap and BiDictionary. Guava is still needed for mahout-math and mahout-hdfs, but it is not used in broadcast, so there is no error in general use. Also fixed a collect of an interactions DRM — a nasty memory-hog bug.
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/3d78096b Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/3d78096b Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/3d78096b Branch: refs/heads/mahout-0.10.x Commit: 3d78096bdb61b4ba20336f3fc51d42a5af77cc3c Parents: 9abb526 Author: pferrel <[email protected]> Authored: Wed May 20 11:16:15 2015 -0700 Committer: pferrel <[email protected]> Committed: Wed May 20 11:16:15 2015 -0700 ---------------------------------------------------------------------- CHANGELOG | 2 + bin/mahout | 3 +- buildtools/pom.xml | 2 +- distribution/pom.xml | 4 +- examples/pom.xml | 2 +- h2o/pom.xml | 28 ++-- h2o/src/main/assembly/dependency-reduced.xml | 44 +++++++ .../apache/mahout/h2obindings/H2OEngine.scala | 9 +- .../h2obindings/test/DistributedH2OSuite.scala | 1 - hdfs/pom.xml | 2 +- integration/pom.xml | 2 +- math-scala/pom.xml | 29 +--- .../mahout/math/cf/SimilarityAnalysis.scala | 16 ++- .../mahout/math/drm/DistributedEngine.scala | 7 +- .../org/apache/mahout/math/drm/package.scala | 5 +- .../mahout/math/indexeddataset/BiMap.scala | 128 ++++++++++++++++++ .../math/indexeddataset/IndexedDataset.scala | 10 +- .../math/indexeddataset/ReaderWriter.scala | 18 +-- math/pom.xml | 2 +- .../apache/mahout/math/SparseColumnMatrix.java | 4 +- mr/pom.xml | 2 +- pom.xml | 8 +- spark-shell/pom.xml | 27 +++- spark/pom.xml | 7 +- spark/src/main/assembly/dependency-reduced.xml | 1 + .../mahout/drivers/ItemSimilarityDriver.scala | 10 +- .../mahout/drivers/MahoutSparkDriver.scala | 5 - .../drivers/TextDelimitedReaderWriter.scala | 132 +++++++++---------- .../mahout/sparkbindings/SparkEngine.scala | 7 +- .../indexeddataset/IndexedDatasetSpark.scala | 66 +++++++++- .../io/MahoutKryoRegistrator.scala | 4 +- .../drivers/ItemSimilarityDriverSuite.scala | 97 +++++++++++++- 32 files changed, 493 insertions(+), 191 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 1657afc..66cbf80 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -2,6 +2,8 @@ Mahout Change Log Release 0.10.1 - unreleased + MAHOUT-1704: Pare down dependency jar for h2o (apalumbo) + MAHOUT-1697: Fixed paths to which math-scala and spark modules docs get packaged under in bin distribution archive (sslavic) MAHOUT-1696: QRDecomposition.solve(...) can return incorrect Matrix types (apalumbo) http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/bin/mahout ---------------------------------------------------------------------- diff --git a/bin/mahout b/bin/mahout index 772c184..ee0b918 100755 --- a/bin/mahout +++ b/bin/mahout @@ -186,9 +186,10 @@ then CLASSPATH=${CLASSPATH}:$f; done - for f in $MAHOUT_HOME/h2o/target/mahout-h2o-*.jar; do + for f in $MAHOUT_HOME/h2o/target/mahout-h2o*.jar; do CLASSPATH=${CLASSPATH}:$f; done + fi # add jars for running from the command line if we requested shell or spark CLI driver http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/buildtools/pom.xml ---------------------------------------------------------------------- diff --git a/buildtools/pom.xml b/buildtools/pom.xml index 9510c5d..c1baa2f 100644 --- a/buildtools/pom.xml +++ b/buildtools/pom.xml @@ -29,7 +29,7 @@ <groupId>org.apache.mahout</groupId> <artifactId>mahout-buildtools</artifactId> - <version>0.11.0-SNAPSHOT</version> + <version>0.10.1-SNAPSHOT</version> <name>Mahout Build Tools</name> <packaging>jar</packaging> http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/distribution/pom.xml ---------------------------------------------------------------------- diff --git a/distribution/pom.xml b/distribution/pom.xml index bc17a08..156ab5d 100644 --- a/distribution/pom.xml +++ b/distribution/pom.xml @@ -20,10 
+20,10 @@ <parent> <groupId>org.apache.mahout</groupId> <artifactId>mahout</artifactId> - <version>0.11.0-SNAPSHOT</version> + <version>0.10.1-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> - <artifactId>apache-mahout-distribution</artifactId> + <artifactId>mahout-distribution</artifactId> <name>Mahout Release Package</name> <description>Distribution Package</description> <packaging>pom</packaging> http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/examples/pom.xml ---------------------------------------------------------------------- diff --git a/examples/pom.xml b/examples/pom.xml index e22c6e1..d800df4 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -23,7 +23,7 @@ <parent> <groupId>org.apache.mahout</groupId> <artifactId>mahout</artifactId> - <version>0.11.0-SNAPSHOT</version> + <version>0.10.1-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/h2o/pom.xml ---------------------------------------------------------------------- diff --git a/h2o/pom.xml b/h2o/pom.xml index b9d101a..662b073 100644 --- a/h2o/pom.xml +++ b/h2o/pom.xml @@ -23,7 +23,7 @@ <parent> <groupId>org.apache.mahout</groupId> <artifactId>mahout</artifactId> - <version>0.11.0-SNAPSHOT</version> + <version>0.10.1-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> @@ -35,27 +35,26 @@ <packaging>jar</packaging> + + <build> - <plugins> + <plugins> <plugin> + <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> - <configuration> - <descriptorRefs> - <descriptorRef>jar-with-dependencies</descriptorRef> - </descriptorRefs> - <archive> - <manifest> - <mainClass>water.H2O</mainClass> - </manifest> - </archive> - </configuration> <executions> <execution> + <id>dependency-reduced</id> <phase>package</phase> <goals> <goal>single</goal> </goals> + <configuration> + <descriptors> + 
<descriptor>src/main/assembly/dependency-reduced.xml</descriptor> + </descriptors> + </configuration> </execution> </executions> </plugin> @@ -124,6 +123,11 @@ </build> <dependencies> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + <version>${scala.version}</version> + </dependency> <dependency> <groupId>org.apache.mahout</groupId> http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/h2o/src/main/assembly/dependency-reduced.xml ---------------------------------------------------------------------- diff --git a/h2o/src/main/assembly/dependency-reduced.xml b/h2o/src/main/assembly/dependency-reduced.xml new file mode 100644 index 0000000..0636f1d --- /dev/null +++ b/h2o/src/main/assembly/dependency-reduced.xml @@ -0,0 +1,44 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<assembly + xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 + http://maven.apache.org/xsd/assembly-1.1.0.xsd"> + <id>dependency-reduced</id> + <formats> + <format>jar</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <dependencySets> + <dependencySet> + <unpack>true</unpack> + <unpackOptions> + <!-- MAHOUT-1126 --> + <excludes> + <exclude>META-INF/LICENSE</exclude> + </excludes> + </unpackOptions> + <scope>runtime</scope> + <outputDirectory>/</outputDirectory> + <useTransitiveFiltering>true</useTransitiveFiltering> + <includes> + <include>ai.h2o:h2o-core</include> + <include>org.scala-lang:scala-library</include> + </includes> + </dependencySet> + </dependencySets> +</assembly> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala ---------------------------------------------------------------------- diff --git a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala index 4c34f31..173d5a0 100644 --- a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala +++ b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala @@ -17,8 +17,7 @@ package org.apache.mahout.h2obindings -import com.google.common.collect.{HashBiMap, BiMap} -import org.apache.mahout.math.indexeddataset.{IndexedDataset, Schema, DefaultIndexedDatasetReadSchema} +import org.apache.mahout.math.indexeddataset.{BiDictionary, IndexedDataset, Schema, DefaultIndexedDatasetReadSchema} import scala.reflect._ import org.apache.mahout.math._ @@ -117,7 +116,7 @@ object H2OEngine extends DistributedEngine { implicit def cp2cph2o[K:ClassTag](drm: CheckpointedDrm[K]): CheckpointedDrmH2O[K] = 
drm.asInstanceOf[CheckpointedDrmH2O[K]] /** stub class not implemented in H2O */ - abstract class IndexedDatasetH2O(val matrix: CheckpointedDrm[Int], val rowIDs: BiMap[String,Int], val columnIDs: BiMap[String,Int]) + abstract class IndexedDatasetH2O(val matrix: CheckpointedDrm[Int], val rowIDs: BiDictionary, val columnIDs: BiDictionary) extends IndexedDataset {} /** @@ -129,7 +128,7 @@ object H2OEngine extends DistributedEngine { */ def indexedDatasetDFSRead(src: String, schema: Schema = DefaultIndexedDatasetReadSchema, - existingRowIDs: BiMap[String, Int] = HashBiMap.create()) + existingRowIDs: Option[BiDictionary] = None) (implicit sc: DistributedContext): IndexedDatasetH2O = { // should log a warning when this is built but no logger here, can an H2O contributor help with this @@ -147,7 +146,7 @@ object H2OEngine extends DistributedEngine { */ def indexedDatasetDFSReadElements(src: String, schema: Schema = DefaultIndexedDatasetReadSchema, - existingRowIDs: BiMap[String, Int] = HashBiMap.create()) + existingRowIDs: Option[BiDictionary] = None) (implicit sc: DistributedContext): IndexedDatasetH2O = { // should log a warning when this is built but no logger here, can an H2O contributor help with this http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/h2o/src/test/scala/org/apache/mahout/h2obindings/test/DistributedH2OSuite.scala ---------------------------------------------------------------------- diff --git a/h2o/src/test/scala/org/apache/mahout/h2obindings/test/DistributedH2OSuite.scala b/h2o/src/test/scala/org/apache/mahout/h2obindings/test/DistributedH2OSuite.scala index abb4289..26182b4 100644 --- a/h2o/src/test/scala/org/apache/mahout/h2obindings/test/DistributedH2OSuite.scala +++ b/h2o/src/test/scala/org/apache/mahout/h2obindings/test/DistributedH2OSuite.scala @@ -29,7 +29,6 @@ trait DistributedH2OSuite extends DistributedMahoutSuite with LoggerConfiguratio override protected def beforeEach() { super.beforeEach() - mahoutCtx = 
mahoutH2OContext("mah2out" + System.currentTimeMillis()) } http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/hdfs/pom.xml ---------------------------------------------------------------------- diff --git a/hdfs/pom.xml b/hdfs/pom.xml index 30d47c3..0498cce 100644 --- a/hdfs/pom.xml +++ b/hdfs/pom.xml @@ -23,7 +23,7 @@ <parent> <groupId>org.apache.mahout</groupId> <artifactId>mahout</artifactId> - <version>0.11.0-SNAPSHOT</version> + <version>0.10.1-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/integration/pom.xml ---------------------------------------------------------------------- diff --git a/integration/pom.xml b/integration/pom.xml index a68e7f1..e949aaf 100644 --- a/integration/pom.xml +++ b/integration/pom.xml @@ -24,7 +24,7 @@ <parent> <groupId>org.apache.mahout</groupId> <artifactId>mahout</artifactId> - <version>0.11.0-SNAPSHOT</version> + <version>0.10.1-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/math-scala/pom.xml ---------------------------------------------------------------------- diff --git a/math-scala/pom.xml b/math-scala/pom.xml index 78331dd..84e59af 100644 --- a/math-scala/pom.xml +++ b/math-scala/pom.xml @@ -23,7 +23,7 @@ <parent> <groupId>org.apache.mahout</groupId> <artifactId>mahout</artifactId> - <version>0.11.0-SNAPSHOT</version> + <version>0.10.1-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> @@ -136,35 +136,12 @@ <!-- scala stuff --> <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>scala-compiler</artifactId> - <version>${scala.version}</version> - </dependency> - <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>scala-reflect</artifactId> - <version>${scala.version}</version> - </dependency> - <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>scala-library</artifactId> - 
<version>${scala.version}</version> - </dependency> - <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>scala-actors</artifactId> - <version>${scala.version}</version> - </dependency> - <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>scalap</artifactId> - <version>${scala.version}</version> - </dependency> - <dependency> <groupId>org.scalatest</groupId> <artifactId>scalatest_${scala.compat.version}</artifactId> </dependency> + + </dependencies> <profiles> http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala b/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala index 6557ab0..fd91c16 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala @@ -68,7 +68,7 @@ object SimilarityAnalysis extends Serializable { // Compute & broadcast the number of interactions per thing in A val bcastInteractionsPerItemA = drmBroadcast(drmA.numNonZeroElementsPerColumn) - // Compute co-occurrence matrix A'A + // Compute cooccurrence matrix A'A val drmAtA = drmA.t %*% drmA // Compute loglikelihood scores and sparsify the resulting matrix to get the similarity matrix @@ -77,7 +77,7 @@ object SimilarityAnalysis extends Serializable { var similarityMatrices = List(drmSimilarityAtA) - // Now look at cross-co-occurrences + // Now look at cross cooccurrences for (drmBRaw <- drmBs) { // Down-sample and pin other interaction matrix val drmB = sampleDownAndBinarize(drmBRaw, randomSeed, maxNumInteractions).checkpoint() @@ -85,7 +85,7 @@ object SimilarityAnalysis extends Serializable { // Compute & broadcast the number of interactions per thing in B val bcastInteractionsPerThingB = 
drmBroadcast(drmB.numNonZeroElementsPerColumn) - // Compute cross-co-occurrence matrix A'B + // Compute cross-cooccurrence matrix A'B val drmAtB = drmA.t %*% drmB val drmSimilarityAtB = computeSimilarities(drmAtB, numUsers, maxInterestingItemsPerThing, @@ -94,11 +94,21 @@ object SimilarityAnalysis extends Serializable { similarityMatrices = similarityMatrices :+ drmSimilarityAtB drmB.uncache() + + //debug + val atbRows = drmSimilarityAtB.nrow + val atbCols = drmSimilarityAtB.ncol + val i = 0 } // Unpin downsampled interaction matrix drmA.uncache() + //debug + val ataRows = drmSimilarityAtA.nrow + val ataCols = drmSimilarityAtA.ncol + val i = 0 + // Return list of similarity matrices similarityMatrices } http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala index dd5b101..bb6772a 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala @@ -17,8 +17,7 @@ package org.apache.mahout.math.drm -import com.google.common.collect.{HashBiMap, BiMap} -import org.apache.mahout.math.indexeddataset.{DefaultIndexedDatasetElementReadSchema, DefaultIndexedDatasetReadSchema, Schema, IndexedDataset} +import org.apache.mahout.math.indexeddataset._ import scala.reflect.ClassTag import logical._ @@ -95,7 +94,7 @@ trait DistributedEngine { */ def indexedDatasetDFSRead(src: String, schema: Schema = DefaultIndexedDatasetReadSchema, - existingRowIDs: BiMap[String, Int] = HashBiMap.create()) + existingRowIDs: Option[BiDictionary] = None) (implicit sc: DistributedContext): IndexedDataset @@ -106,7 +105,7 @@ trait DistributedEngine { */ def 
indexedDatasetDFSReadElements(src: String, schema: Schema = DefaultIndexedDatasetElementReadSchema, - existingRowIDs: BiMap[String, Int] = HashBiMap.create()) + existingRowIDs: Option[BiDictionary] = None) (implicit sc: DistributedContext): IndexedDataset http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala index 81f6ab1..1fae831 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala @@ -17,7 +17,6 @@ package org.apache.mahout.math -import com.google.common.collect.{HashBiMap, BiMap} import org.apache.mahout.math.drm.DistributedContext import org.apache.mahout.math.indexeddataset.{IndexedDataset, DefaultIndexedDatasetReadSchema, Schema} import org.apache.mahout.math.scalabindings.RLikeOps._ @@ -122,13 +121,13 @@ package object indexeddataset { /** Load IndexedDataset from text delimited files */ def indexedDatasetDFSRead(src: String, schema: Schema = DefaultIndexedDatasetReadSchema, - existingRowIDs: BiMap[String, Int] = HashBiMap.create()) + existingRowIDs: Option[BiDictionary] = None) (implicit ctx: DistributedContext): IndexedDataset = ctx.indexedDatasetDFSRead(src, schema, existingRowIDs) def indexedDatasetDFSReadElements(src: String, schema: Schema = DefaultIndexedDatasetReadSchema, - existingRowIDs: BiMap[String, Int] = HashBiMap.create()) + existingRowIDs: Option[BiDictionary] = None) (implicit ctx: DistributedContext): IndexedDataset = ctx.indexedDatasetDFSReadElements(src, schema, existingRowIDs) http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/BiMap.scala 
---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/BiMap.scala b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/BiMap.scala new file mode 100644 index 0000000..6c0d432 --- /dev/null +++ b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/BiMap.scala @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.mahout.math.indexeddataset + +import scala.collection.immutable.HashMap + +/** + * Immutable Bi-directional Map. + * @param m Map to use for forward reference + * @param i optional reverse map of value to key, will create one lazily if none is provided + * and is required to have no duplicate reverse mappings. 
+ */ +class BiMap[K, V] ( + private val m: Map[K, V], + // if this is serialized we allow i to be discarded and recalculated when deserialized + @transient private var i: Option[BiMap[V, K]] = None + ) extends Serializable { + + // NOTE: make inverse's inverse point back to current BiMap + // if this is serialized we allow inverse to be discarded and recalculated when deserialized + @transient lazy val inverse: BiMap[V, K] = { + if( i == null.asInstanceOf[Option[BiMap[V, K]]] ) + i = None + i.getOrElse { + val rev = m.map(_.swap) + require((rev.size == m.size), "Failed to create reversed map. Cannot have duplicated values.") + new BiMap(rev, Some(this)) + } + } + + // forces inverse to be calculated in the constructor when deserialized + // not when first used + @transient val size_ = inverse.size + + def get(k: K): Option[V] = m.get(k) + + def getOrElse(k: K, default: => V): V = m.getOrElse(k, default) + + def contains(k: K): Boolean = m.contains(k) + + def apply(k: K): V = m.apply(k) + + /** + * Converts to a map. + * @return a map of type immutable.Map[K, V] + */ + def toMap: Map[K, V] = m + + /** + * Converts to a sequence. 
+ * @return a sequence containing all elements of this map + */ + def toSeq: Seq[(K, V)] = m.toSeq + + def size: Int = m.size + + def take(n: Int) = BiMap(m.take(n)) + + override def toString = m.toString +} + +object BiMap { + + /** Extra constructor from a map */ + def apply[K, V](x: Map[K, V]): BiMap[K, V] = new BiMap(x) + +} + +/** BiDictionary is a specialized BiMap that has non-negative Ints as values for use as DRM keys */ +class BiDictionary ( + private val m: Map[String, Int], + @transient private val i: Option[BiMap[Int, String]] = None ) + extends BiMap[String, Int](m, i) { + + /** + * Create a new BiDictionary with the keys supplied and values ranging from 0 to size -1 + * @param keys a set of String + */ + def this(keys: Seq[String]) = { + this(HashMap(keys.view.zipWithIndex: _*)) + } + + def merge( + keys: Seq[String]): BiDictionary = { + + var newIDs = List[String]() + + for (key <- keys) { + if (!m.contains(key)) newIDs = key +: newIDs + } + if(newIDs.isEmpty) this else new BiDictionary(m ++ HashMap(newIDs.view.zip (Stream from size): _*)) + + } + +} + +/** BiDictionary is a specialized BiMap that has non-negative Ints as values for use as DRM keys. + * The companion object provides modification methods specific to maintaining contiguous Int values + * and unique String keys */ +object BiDictionary { + + /** + * Append new keys to an existing BiDictionary and return the result. 
The values will start + * at m.size and increase to create a continuous non-zero value set from 0 to size - 1 + * @param keys new keys to append, not checked for uniqueness so may be dangerous + * @param biDi merge keys to this BiDictionary and create new values buy incremeting from the highest Int value + * @return a BiDictionary with added mappings + */ + /*def append(keys: Seq[String], biDi: BiDictionary): BiDictionary = { + val hm = HashMap(keys.view.zip (Stream from biDi.size): _*) + new BiDictionary(biDi.m ++ hm) + }*/ + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala index f6811e2..eeca736 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala @@ -17,12 +17,10 @@ package org.apache.mahout.math.indexeddataset -import com.google.common.collect.BiMap import org.apache.mahout.math.drm.{DistributedContext, CheckpointedDrm} -import org.apache.mahout.math.indexeddataset /** - * Wrap an [[org.apache.mahout.math.drm.DrmLike]] with bidirectional ID mappings [[com.google.common.collect.BiMap]] + * Wrap an [[org.apache.mahout.math.drm.DrmLike]] with bidirectional ID mappings [[org.apache.mahout.math.indexeddataset.BiDictionary]] * so a user specified labels/IDs can be stored and mapped to and from the Mahout Int ID used internal to Mahout * core code. 
* @todo Often no need for both or perhaps either dictionary, so save resources by allowing to be not created @@ -31,8 +29,8 @@ import org.apache.mahout.math.indexeddataset trait IndexedDataset { val matrix: CheckpointedDrm[Int] - val rowIDs: BiMap[String,Int] - val columnIDs: BiMap[String,Int] + val rowIDs: BiDictionary + val columnIDs: BiDictionary /** * Write a text delimited file(s) with the row and column IDs from dictionaries. @@ -43,7 +41,7 @@ trait IndexedDataset { def dfsWrite(dest: String, schema: Schema)(implicit sc: DistributedContext): Unit /** Factory method, creates the extending class and returns a new instance */ - def create(matrix: CheckpointedDrm[Int], rowIDs: BiMap[String,Int], columnIDs: BiMap[String,Int]): + def create(matrix: CheckpointedDrm[Int], rowIDs: BiDictionary, columnIDs: BiDictionary): IndexedDataset /** http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala index f7653ae..65c0d8f 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala @@ -17,8 +17,8 @@ package org.apache.mahout.math.indexeddataset -import com.google.common.collect.{BiMap, HashBiMap} import org.apache.mahout.math.drm.DistributedContext +import org.apache.mahout.math.indexeddataset /** * Reader trait is abstract in the sense that the elementReader and rowReader functions must be supplied by an @@ -35,7 +35,7 @@ trait Reader[T]{ * @param mc a [[org.apache.mahout.math.drm.DistributedContext]] to read from * @param readSchema map of parameters controlling formating and how the read is executed * @param source 
list of comma delimited files to read from - * @param existingRowIDs [[com.google.common.collect.BiMap]] containing row IDs that have already + * @param existingRowIDs [[indexeddataset.BiDictionary]] containing row IDs that have already * been applied to this collection--used to synchronize row IDs between several * collections * @return a new collection of type T @@ -44,14 +44,14 @@ trait Reader[T]{ mc: DistributedContext, readSchema: Schema, source: String, - existingRowIDs: BiMap[String, Int]): T + existingRowIDs: Option[BiDictionary] = None): T /** * Override in extending trait to supply T and perform a parallel read of collection rows * @param mc a [[org.apache.mahout.math.drm.DistributedContext]] to read from * @param readSchema map of parameters controlling formating and how the read is executed * @param source list of comma delimited files to read from - * @param existingRowIDs [[com.google.common.collect.BiMap]] containing row IDs that have already + * @param existingRowIDs [[indexeddataset.BiDictionary]] containing row IDs that have already * been applied to this collection--used to synchronize row IDs between several * collections * @return a new collection of type T @@ -60,30 +60,30 @@ trait Reader[T]{ mc: DistributedContext, readSchema: Schema, source: String, - existingRowIDs: BiMap[String, Int]): T + existingRowIDs: Option[BiDictionary] = None): T /** * Public method called to perform the element-wise read. 
Usually no need to override * @param source comma delimited URIs to read from - * @param existingRowIDs a [[com.google.common.collect.BiMap]] containing previously used id mappings--used + * @param existingRowIDs a [[indexeddataset.BiDictionary]] containing previously used id mappings--used * to synchronize all row ids is several collections * @return a new collection of type T */ def readElementsFrom( source: String, - existingRowIDs: BiMap[String, Int] = HashBiMap.create()): T = + existingRowIDs: Option[BiDictionary] = None): T = elementReader(mc, readSchema, source, existingRowIDs) /** * Public method called to perform the row-wise read. Usually no need to override. * @param source comma delimited URIs to read from - * @param existingRowIDs a [[com.google.common.collect.BiMap]] containing previously used id mappings--used + * @param existingRowIDs a [[indexeddataset.BiDictionary]] containing previously used id mappings--used * to synchronize all row ids is several collections * @return a new collection of type T */ def readRowsFrom( source: String, - existingRowIDs: BiMap[String, Int] = HashBiMap.create()): T = + existingRowIDs: Option[BiDictionary] = None): T = rowReader(mc, readSchema, source, existingRowIDs) } http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/math/pom.xml ---------------------------------------------------------------------- diff --git a/math/pom.xml b/math/pom.xml index 59a3f0f..b41bcd3 100644 --- a/math/pom.xml +++ b/math/pom.xml @@ -23,7 +23,7 @@ <parent> <groupId>org.apache.mahout</groupId> <artifactId>mahout</artifactId> - <version>0.11.0-SNAPSHOT</version> + <version>0.10.1-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/math/src/main/java/org/apache/mahout/math/SparseColumnMatrix.java ---------------------------------------------------------------------- diff --git a/math/src/main/java/org/apache/mahout/math/SparseColumnMatrix.java 
b/math/src/main/java/org/apache/mahout/math/SparseColumnMatrix.java index d847dea..f62d553 100644 --- a/math/src/main/java/org/apache/mahout/math/SparseColumnMatrix.java +++ b/math/src/main/java/org/apache/mahout/math/SparseColumnMatrix.java @@ -184,9 +184,9 @@ public class SparseColumnMatrix extends AbstractMatrix { StringBuilder s = new StringBuilder("{\n"); for (MatrixSlice next : this.transpose()) { if (row < maxRowsToDisplay) { - s.append(" ") + s.append(" ") .append(next.index()) - .append(" =>\t") + .append(" =>\t") .append(new VectorView(next.vector(), 0, colsToDisplay)) .append('\n'); row++; http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/mr/pom.xml ---------------------------------------------------------------------- diff --git a/mr/pom.xml b/mr/pom.xml index 3f5afa3..60e52ac 100644 --- a/mr/pom.xml +++ b/mr/pom.xml @@ -23,7 +23,7 @@ <parent> <groupId>org.apache.mahout</groupId> <artifactId>mahout</artifactId> - <version>0.11.0-SNAPSHOT</version> + <version>0.10.1-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 74da44e..61a5b57 100644 --- a/pom.xml +++ b/pom.xml @@ -27,7 +27,7 @@ <groupId>org.apache.mahout</groupId> <artifactId>mahout</artifactId> - <version>0.11.0-SNAPSHOT</version> + <version>0.10.1-SNAPSHOT</version> <packaging>pom</packaging> <name>Apache Mahout</name> @@ -100,8 +100,8 @@ <scm> <connection>scm:git:[email protected]:apache/mahout.git</connection> <developerConnection>scm:git:https://git-wip-us.apache.org/repos/asf/mahout.git</developerConnection> - <url>scm:git:[email protected]:apache/mahout.git</url> - <tag>HEAD</tag> + <url>https://git-wip-us.apache.org/repos/asf?p=maven.git;a=tree;h=refs/heads/${project.scm.tag};hb=${project.scm.tag}</url> + <tag>mahout-0.10.x</tag> </scm> <properties> <skipTests>false</skipTests> @@ -120,7 
+120,7 @@ <slf4j.version>1.7.10</slf4j.version> <scala.compat.version>2.10</scala.compat.version> <scala.version>2.10.4</scala.version> - <spark.version>1.1.1</spark.version> + <spark.version>1.2.2</spark.version> <h2o.version>0.1.25</h2o.version> </properties> <issueManagement> http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/spark-shell/pom.xml ---------------------------------------------------------------------- diff --git a/spark-shell/pom.xml b/spark-shell/pom.xml index 0903534..b5f283d 100644 --- a/spark-shell/pom.xml +++ b/spark-shell/pom.xml @@ -23,7 +23,7 @@ <parent> <groupId>org.apache.mahout</groupId> <artifactId>mahout</artifactId> - <version>0.11.0-SNAPSHOT</version> + <version>0.10.1-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> @@ -134,6 +134,31 @@ <!-- scala stuff --> <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-compiler</artifactId> + <version>${scala.version}</version> + </dependency> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-reflect</artifactId> + <version>${scala.version}</version> + </dependency> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + <version>${scala.version}</version> + </dependency> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-actors</artifactId> + <version>${scala.version}</version> + </dependency> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scalap</artifactId> + <version>${scala.version}</version> + </dependency> + <dependency> <groupId>org.scalatest</groupId> <artifactId>scalatest_${scala.compat.version}</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/spark/pom.xml ---------------------------------------------------------------------- diff --git a/spark/pom.xml b/spark/pom.xml index 5646c25..ce42c44 100644 --- a/spark/pom.xml +++ b/spark/pom.xml @@ -23,7 +23,7 @@ <parent> <groupId>org.apache.mahout</groupId> 
<artifactId>mahout</artifactId> - <version>0.11.0-SNAPSHOT</version> + <version>0.10.1-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> @@ -156,11 +156,6 @@ </dependency> <!-- 3rd-party --> - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - <version>14.0.1</version> - </dependency> <!-- scala stuff --> <dependency> http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/spark/src/main/assembly/dependency-reduced.xml ---------------------------------------------------------------------- diff --git a/spark/src/main/assembly/dependency-reduced.xml b/spark/src/main/assembly/dependency-reduced.xml index 3c52d35..6f1e4c2 100644 --- a/spark/src/main/assembly/dependency-reduced.xml +++ b/spark/src/main/assembly/dependency-reduced.xml @@ -36,6 +36,7 @@ <outputDirectory>/</outputDirectory> <useTransitiveFiltering>true</useTransitiveFiltering> <includes> + <!-- guava only included to get Preconditions in mahout-math and mahout-hdfs --> <include>com.google.guava:guava</include> <include>com.github.scopt</include> <include>com.tdunning:t-digest</include> http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala index 34e8cf9..d7f2787 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala @@ -162,7 +162,7 @@ object ItemSimilarityDriver extends MahoutSparkDriver { // be supported (and are at least on Spark) or the row cardinality adjustment will not work. 
val datasetB = if (!inFiles2.isEmpty) { // get cross-cooccurrence interactions from separate files - val datasetB = indexedDatasetDFSReadElements(inFiles2, readSchema2, existingRowIDs = datasetA.rowIDs) + val datasetB = indexedDatasetDFSReadElements(inFiles2, readSchema2, existingRowIDs = Some(datasetA.rowIDs)) datasetB @@ -170,7 +170,7 @@ object ItemSimilarityDriver extends MahoutSparkDriver { && parser.opts("filter2").asInstanceOf[String] != null) { // get cross-cooccurrences interactions by using two filters on a single set of files - val datasetB = indexedDatasetDFSReadElements(inFiles, readSchema2, existingRowIDs = datasetA.rowIDs) + val datasetB = indexedDatasetDFSReadElements(inFiles, readSchema2, existingRowIDs = Some(datasetA.rowIDs)) datasetB @@ -178,11 +178,9 @@ object ItemSimilarityDriver extends MahoutSparkDriver { null.asInstanceOf[IndexedDatasetSpark] } if (datasetB != null.asInstanceOf[IndexedDataset]) { // do AtB calc - // true row cardinality is the size of the row id index, which was calculated from all rows of A and B - val rowCardinality = datasetB.rowIDs.size() // the authoritative row cardinality + // true row cardinality is the size of the row id index, which was calculated from all rows of A and B + val rowCardinality = datasetB.rowIDs.size // the authoritative row cardinality - // todo: how expensive is nrow? 
We could make assumptions about .rowIds that don't rely on - // its calculation val returnedA = if (rowCardinality != datasetA.matrix.nrow) datasetA.newRowCardinality(rowCardinality) else datasetA // this guarantees matching cardinality http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala index 668d70c..40ffab3 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala @@ -74,11 +74,6 @@ abstract class MahoutSparkDriver extends MahoutDriver { */ override protected def start() : Unit = { if (!_useExistingContext) { - /* hack around SPARK-6069 Spark 1.2.1 deserialization of HashBiMap throwing ClassNotFound--doesn't seem to work - sparkConf.set("spark.files.userClassPathFirst", "true") - sparkConf.set("spark.executor.userClassPathFirst", "true") - */ - sparkConf.set("spark.kryo.referenceTracking", "false") .set("spark.kryoserializer.buffer.mb", "200")// this is default for Mahout optimizer, change it with -D option http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala index 6c7992a..e2a2a9a 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala @@ -17,11 +17,11 @@ package org.apache.mahout.drivers -import 
org.apache.mahout.math.indexeddataset.{Writer, Reader, Schema, IndexedDataset} +import org.apache.log4j.Logger +import org.apache.mahout.math.indexeddataset._ import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark import org.apache.spark.SparkContext._ import org.apache.mahout.math.RandomAccessSparseVector -import com.google.common.collect.{BiMap, HashBiMap} import org.apache.mahout.math.drm.{DrmLike, DrmLikeOps, DistributedContext, CheckpointedDrm} import org.apache.mahout.sparkbindings._ import scala.collection.JavaConversions._ @@ -42,10 +42,11 @@ trait TDIndexedDatasetReader extends Reader[IndexedDatasetSpark]{ * @return a new [[org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark]] */ protected def elementReader( - mc: DistributedContext, - readSchema: Schema, - source: String, - existingRowIDs: BiMap[String, Int] = HashBiMap.create()): IndexedDatasetSpark = { + mc: DistributedContext, + readSchema: Schema, + source: String, + existingRowIDs: Option[BiDictionary] = None): IndexedDatasetSpark = { + @transient lazy val logger = Logger.getLogger(this.getClass.getCanonicalName) try { val delimiter = readSchema("delim").asInstanceOf[String] val rowIDColumn = readSchema("rowIDColumn").asInstanceOf[Int] @@ -54,10 +55,8 @@ trait TDIndexedDatasetReader extends Reader[IndexedDatasetSpark]{ val filterBy = readSchema("filter").asInstanceOf[String] // instance vars must be put into locally scoped vals when used in closures that are executed but Spark - assert(!source.isEmpty, { - println(this.getClass.toString + ": has no files to read") - throw new IllegalArgumentException - }) + + require (!source.isEmpty, "No file(s) to read") var columns = mc.textFile(source).map { line => line.split(delimiter) } @@ -68,7 +67,6 @@ trait TDIndexedDatasetReader extends Reader[IndexedDatasetSpark]{ } // get row and column IDs - //val m = columns.collect val interactions = columns.map { tokens => tokens(rowIDColumn) -> tokens(columnIDPosition) } @@ -79,21 
+77,25 @@ trait TDIndexedDatasetReader extends Reader[IndexedDatasetSpark]{ val rowIDs = interactions.map { case (rowID, _) => rowID }.distinct().collect() val columnIDs = interactions.map { case (_, columnID) => columnID }.distinct().collect() - // create BiMaps for bi-directional lookup of ID by either Mahout ID or external ID + // create BiDictionary(s) for bi-directional lookup of ID by either Mahout ID or external ID // broadcast them for access in distributed processes, so they are not recalculated in every task. - val rowIDDictionary = asOrderedDictionary(existingRowIDs, rowIDs) + //val rowIDDictionary = BiDictionary.append(existingRowIDs, rowIDs) + val rowIDDictionary = existingRowIDs match { + case Some(d) => d.merge(rowIDs) + case None => new BiDictionary(rowIDs) + } val rowIDDictionary_bcast = mc.broadcast(rowIDDictionary) - val columnIDDictionary = asOrderedDictionary(entries = columnIDs) + val columnIDDictionary = new BiDictionary(keys = columnIDs) val columnIDDictionary_bcast = mc.broadcast(columnIDDictionary) - val ncol = columnIDDictionary.size() - val nrow = rowIDDictionary.size() + val ncol = columnIDDictionary.size + val nrow = rowIDDictionary.size val indexedInteractions = interactions.map { case (rowID, columnID) => - val rowIndex = rowIDDictionary_bcast.value.get(rowID).get - val columnIndex = columnIDDictionary_bcast.value.get(columnID).get + val rowIndex = rowIDDictionary_bcast.value.getOrElse(rowID, -1) + val columnIndex = columnIDDictionary_bcast.value.getOrElse(columnID, -1) rowIndex -> columnIndex } @@ -108,14 +110,13 @@ trait TDIndexedDatasetReader extends Reader[IndexedDatasetSpark]{ .asInstanceOf[DrmRdd[Int]] // wrap the DrmRdd and a CheckpointedDrm, which can be used anywhere a DrmLike[Int] is needed - //val drmInteractions = drmWrap[Int](indexedInteractions, nrow, ncol) val drmInteractions = drmWrap[Int](indexedInteractions) new IndexedDatasetSpark(drmInteractions, rowIDDictionary, columnIDDictionary) } catch { case cce: 
ClassCastException => { - println(this.getClass.toString + ": Schema has illegal values"); throw cce + logger.error("Schema has illegal values"); throw cce } } } @@ -130,21 +131,17 @@ trait TDIndexedDatasetReader extends Reader[IndexedDatasetSpark]{ * @return a new [[org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark]] */ protected def rowReader( - mc: DistributedContext, - readSchema: Schema, - source: String, - existingRowIDs: BiMap[String, Int] = HashBiMap.create()): IndexedDatasetSpark = { + mc: DistributedContext, + readSchema: Schema, + source: String, + existingRowIDs: Option[BiDictionary] = None): IndexedDatasetSpark = { + @transient lazy val logger = Logger.getLogger(this.getClass.getCanonicalName) try { val rowKeyDelim = readSchema("rowKeyDelim").asInstanceOf[String] val columnIdStrengthDelim = readSchema("columnIdStrengthDelim").asInstanceOf[String] val elementDelim = readSchema("elementDelim").asInstanceOf[String] - // no need for omitScore since we can tell if there is a score and assume it is 1.0d if not specified - //val omitScore = readSchema("omitScore").asInstanceOf[Boolean] - assert(!source.isEmpty, { - println(this.getClass.toString + ": has no files to read") - throw new IllegalArgumentException - }) + require (!source.isEmpty, "No file(s) to read") var rows = mc.textFile(source).map { line => line.split(rowKeyDelim) } @@ -154,7 +151,8 @@ trait TDIndexedDatasetReader extends Reader[IndexedDatasetSpark]{ } interactions.cache() - interactions.collect() + // forces into memory so only for debugging + //interactions.collect() // create separate collections of rowID and columnID tokens val rowIDs = interactions.map { case (rowID, _) => rowID }.distinct().collect() @@ -168,24 +166,28 @@ trait TDIndexedDatasetReader extends Reader[IndexedDatasetSpark]{ // create BiMaps for bi-directional lookup of ID by either Mahout ID or external ID // broadcast them for access in distributed processes, so they are not recalculated in every task. 
- val rowIDDictionary = asOrderedDictionary(existingRowIDs, rowIDs) + //val rowIDDictionary = BiDictionary.append(existingRowIDs, rowIDs) + val rowIDDictionary = existingRowIDs match { + case Some(d) => d.merge(rowIDs) + case None => new BiDictionary(rowIDs) + } val rowIDDictionary_bcast = mc.broadcast(rowIDDictionary) - val columnIDDictionary = asOrderedDictionary(entries = columnIDs) + val columnIDDictionary = new BiDictionary(keys = columnIDs) val columnIDDictionary_bcast = mc.broadcast(columnIDDictionary) - val ncol = columnIDDictionary.size() - val nrow = rowIDDictionary.size() + val ncol = columnIDDictionary.size + val nrow = rowIDDictionary.size val indexedInteractions = interactions.map { case (rowID, columns) => - val rowIndex = rowIDDictionary_bcast.value.get(rowID).get + val rowIndex = rowIDDictionary_bcast.value.getOrElse(rowID, -1) val elements = columns.split(elementDelim) val row = new RandomAccessSparseVector(ncol) for (element <- elements) { val id = element.split(columnIdStrengthDelim)(0) - val columnID = columnIDDictionary_bcast.value.get(id).get + val columnID = columnIDDictionary_bcast.value.getOrElse(id, -1) val pair = element.split(columnIdStrengthDelim) if (pair.size == 2) // there was a strength row.setQuick(columnID, pair(1).toDouble) @@ -197,43 +199,30 @@ trait TDIndexedDatasetReader extends Reader[IndexedDatasetSpark]{ .asInstanceOf[DrmRdd[Int]] // wrap the DrmRdd in a CheckpointedDrm, which can be used anywhere a DrmLike[Int] is needed - //val drmInteractions = drmWrap[Int](indexedInteractions, nrow, ncol) val drmInteractions = drmWrap[Int](indexedInteractions) new IndexedDatasetSpark(drmInteractions, rowIDDictionary, columnIDDictionary) } catch { case cce: ClassCastException => { - println(this.getClass.toString + ": Schema has illegal values") + logger.error("Schema has illegal values") throw cce } } } /** - * Creates a BiMap from an ID collection. 
The ID points to an ordinal in which is used internal to Mahout + * Creates a BiDictionary from an ID collection. The ID points to an ordinal in which is used internal to Mahout * as the row or column ID - * todo: this is a non-distributed process in an otherwise distributed reader and the BiMap is a + * todo: this is a non-distributed process in an otherwise distributed reader and the BiDictionary is a * non-rdd based object--this will limit the size of the dataset to ones where the dictionaries fit * in-memory, the option is to put the dictionaries in rdds and do joins to translate IDs */ - private def asOrderedDictionary(dictionary: BiMap[String, Int] = HashBiMap.create(), - entries: Array[String]): - BiMap[String, Int] = { - var index = dictionary.size() // if a dictionary is supplied then add to the end based on the Mahout id 'index' - for (entry <- entries) { - if (!dictionary.contains(entry)){ - dictionary.put(entry, index) - index += 1 - }// the dictionary should never contain an entry since they are supposed to be distinct but for some reason - // they do - } - dictionary - } } /** Extends the Writer trait to supply the type being written and supplies the writer function */ trait TDIndexedDatasetWriter extends Writer[IndexedDatasetSpark]{ + /** * Read in text delimited elements from all URIs in this comma delimited source String. 
* @param mc context for the Spark job @@ -241,11 +230,12 @@ trait TDIndexedDatasetWriter extends Writer[IndexedDatasetSpark]{ * @param dest directory to write text delimited version of [[IndexedDatasetSpark]] */ protected def writer( - mc: DistributedContext, - writeSchema: Schema, - dest: String, - indexedDataset: IndexedDatasetSpark, - sort: Boolean = true): Unit = { + mc: DistributedContext, + writeSchema: Schema, + dest: String, + indexedDataset: IndexedDatasetSpark, + sort: Boolean = true): Unit = { + @transient lazy val logger = Logger.getLogger(this.getClass.getCanonicalName) try { val rowKeyDelim = writeSchema("rowKeyDelim").asInstanceOf[String] val columnIdStrengthDelim = writeSchema("columnIdStrengthDelim").asInstanceOf[String] @@ -254,18 +244,15 @@ trait TDIndexedDatasetWriter extends Writer[IndexedDatasetSpark]{ //instance vars must be put into locally scoped vals when put into closures that are //executed but Spark - assert(indexedDataset != null, { - println(this.getClass.toString + ": has no indexedDataset to write") - throw new IllegalArgumentException - }) - assert(!dest.isEmpty, { - println(this.getClass.toString + ": has no destination or indextedDataset to write") - throw new IllegalArgumentException - }) + require (indexedDataset != null ,"No IndexedDataset to write") + require (!dest.isEmpty,"No destination to write to") - val matrix = indexedDataset.matrix + val matrix = indexedDataset.matrix.checkpoint() val rowIDDictionary = indexedDataset.rowIDs + val rowIDDictionary_bcast = mc.broadcast(rowIDDictionary) + val columnIDDictionary = indexedDataset.columnIDs + val columnIDDictionary_bcast = mc.broadcast(columnIDDictionary) matrix.rdd.map { case (rowID, itemVector) => @@ -279,23 +266,24 @@ trait TDIndexedDatasetWriter extends Writer[IndexedDatasetSpark]{ // first get the external rowID token if (!vector.isEmpty){ - var line: String = rowIDDictionary.inverse.get(rowID) + rowKeyDelim + var line = 
rowIDDictionary_bcast.value.inverse.getOrElse(rowID, "INVALID_ROW_ID") + rowKeyDelim // for the rest of the row, construct the vector contents of elements (external column ID, strength value) for (item <- vector) { - line += columnIDDictionary.inverse.get(item._1) + line += columnIDDictionary_bcast.value.inverse.getOrElse(item._1, "INVALID_COLUMN_ID") if (!omitScore) line += columnIdStrengthDelim + item._2 line += elementDelim } // drop the last delimiter, not needed to end the line line.dropRight(1) } else {//no items so write a line with id but no values, no delimiters - rowIDDictionary.inverse.get(rowID) + rowIDDictionary_bcast.value.inverse.getOrElse(rowID, "INVALID_ROW_ID") } // "if" returns a line of text so this must be last in the block } .saveAsTextFile(dest) }catch{ - case cce: ClassCastException => {println(this.getClass.toString+": Schema has illegal values"); throw cce} + case cce: ClassCastException => { + logger.error("Schema has illegal values"); throw cce} } } } http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala index 47eb40b..595cd66 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala @@ -17,10 +17,9 @@ package org.apache.mahout.sparkbindings -import com.google.common.collect.{BiMap, HashBiMap} import org.apache.mahout.drivers.TextDelimitedIndexedDatasetReader import org.apache.mahout.math._ -import org.apache.mahout.math.indexeddataset.{DefaultIndexedDatasetReadSchema, Schema, DefaultIndexedDatasetElementReadSchema} +import org.apache.mahout.math.indexeddataset.{BiDictionary, DefaultIndexedDatasetReadSchema, Schema, 
DefaultIndexedDatasetElementReadSchema} import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark import scalabindings._ import RLikeOps._ @@ -259,7 +258,7 @@ object SparkEngine extends DistributedEngine { */ def indexedDatasetDFSRead(src: String, schema: Schema = DefaultIndexedDatasetReadSchema, - existingRowIDs: BiMap[String, Int] = HashBiMap.create()) + existingRowIDs: Option[BiDictionary] = None) (implicit sc: DistributedContext): IndexedDatasetSpark = { val reader = new TextDelimitedIndexedDatasetReader(schema)(sc) @@ -275,7 +274,7 @@ object SparkEngine extends DistributedEngine { */ def indexedDatasetDFSReadElements(src: String, schema: Schema = DefaultIndexedDatasetElementReadSchema, - existingRowIDs: BiMap[String, Int] = HashBiMap.create()) + existingRowIDs: Option[BiDictionary] = None) (implicit sc: DistributedContext): IndexedDatasetSpark = { val reader = new TextDelimitedIndexedDatasetReader(schema)(sc) http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala index 30b32ad..727a95e 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala @@ -17,11 +17,15 @@ package org.apache.mahout.sparkbindings.indexeddataset -import com.google.common.collect.BiMap import org.apache.mahout.drivers.TextDelimitedIndexedDatasetWriter import org.apache.mahout.math.drm.{DistributedContext, CheckpointedDrm} -import org.apache.mahout.math.indexeddataset -import org.apache.mahout.math.indexeddataset.{DefaultIndexedDatasetWriteSchema, Reader, Schema, 
IndexedDataset} +import org.apache.mahout.math.{RandomAccessSparseVector, indexeddataset} +import org.apache.mahout.math.indexeddataset._ +import org.apache.mahout.sparkbindings._ +import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDD +import org.apache.spark.SparkContext._ + /** * Spark implementation of [[org.apache.mahout.math.indexeddataset.IndexedDataset]] providing the Spark specific @@ -30,20 +34,21 @@ import org.apache.mahout.math.indexeddataset.{DefaultIndexedDatasetWriteSchema, * @param rowIDs a bidirectional map for Mahout Int IDs to/from application specific string IDs * @param columnIDs a bidirectional map for Mahout Int IDs to/from application specific string IDs */ -class IndexedDatasetSpark(val matrix: CheckpointedDrm[Int], val rowIDs: BiMap[String,Int], - val columnIDs: BiMap[String,Int]) +class IndexedDatasetSpark(val matrix: CheckpointedDrm[Int], val rowIDs: BiDictionary, + val columnIDs: BiDictionary) extends IndexedDataset { /** Secondary constructor enabling immutability */ def this(id2: IndexedDatasetSpark){ this(id2.matrix, id2.rowIDs, id2.columnIDs) } - + /** * Factory method used to create this extending class when the interface of * [[org.apache.mahout.math.indexeddataset.IndexedDataset]] is all that is known. 
*/ - override def create(matrix: CheckpointedDrm[Int], rowIDs: BiMap[String,Int], columnIDs: BiMap[String,Int]): + override def create(matrix: CheckpointedDrm[Int], rowIDs: BiDictionary, + columnIDs: BiDictionary): IndexedDatasetSpark = { new IndexedDatasetSpark(matrix, rowIDs, columnIDs) } @@ -60,3 +65,50 @@ class IndexedDatasetSpark(val matrix: CheckpointedDrm[Int], val rowIDs: BiMap[St } } +object IndexedDatasetSpark { + + def apply(elements: RDD[(String, String)], existingRowIDs: Option[BiDictionary] = None)(implicit sc: SparkContext) = { + + // create separate collections of rowID and columnID tokens + val rowIDs = elements.map { case (rowID, _) => rowID }.distinct().collect() + val columnIDs = elements.map { case (_, columnID) => columnID }.distinct().collect() + + // create BiDictionary(s) for bi-directional lookup of ID by either Mahout ID or external ID + // broadcast them for access in distributed processes, so they are not recalculated in every task. + //val rowIDDictionary = BiDictionary.append(existingRowIDs, rowIDs) + val rowIDDictionary = existingRowIDs match { + case Some(d) => d.merge(rowIDs) + case None => new BiDictionary(rowIDs) + } + val rowIDDictionary_bcast = sc.broadcast(rowIDDictionary) + + val columnIDDictionary = new BiDictionary(keys = columnIDs) + val columnIDDictionary_bcast = sc.broadcast(columnIDDictionary) + + val ncol = columnIDDictionary.size + val nrow = rowIDDictionary.size + + val indexedInteractions = + elements.map { case (rowID, columnID) => + val rowIndex = rowIDDictionary_bcast.value.getOrElse(rowID, -1) + val columnIndex = columnIDDictionary_bcast.value.getOrElse(columnID, -1) + + rowIndex -> columnIndex + } + // group by IDs to form row vectors + .groupByKey().map { case (rowIndex, columnIndexes) => + val row = new RandomAccessSparseVector(ncol) + for (columnIndex <- columnIndexes) { + row.setQuick(columnIndex, 1.0) + } + rowIndex -> row + }.asInstanceOf[DrmRdd[Int]] + + // wrap the DrmRdd and a CheckpointedDrm, which 
can be used anywhere a DrmLike[Int] is needed + val drmInteractions = drmWrap[Int](indexedInteractions) + + new IndexedDatasetSpark(drmInteractions, rowIDDictionary, columnIDDictionary) + } + +} + http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala index a7a7df0..a8a0bb4 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala @@ -19,8 +19,8 @@ package org.apache.mahout.sparkbindings.io import com.esotericsoftware.kryo.Kryo import com.esotericsoftware.kryo.serializers.JavaSerializer -import com.google.common.collect.HashBiMap import org.apache.mahout.math._ +import org.apache.mahout.math.indexeddataset.{BiMap, BiDictionary} import org.apache.spark.serializer.KryoRegistrator import org.apache.mahout.sparkbindings._ import org.apache.mahout.math.Vector.Element @@ -35,7 +35,5 @@ class MahoutKryoRegistrator extends KryoRegistrator { kryo.addDefaultSerializer(classOf[Vector], new WritableKryoSerializer[Vector, VectorWritable]) kryo.addDefaultSerializer(classOf[DenseVector], new WritableKryoSerializer[Vector, VectorWritable]) kryo.addDefaultSerializer(classOf[Matrix], new WritableKryoSerializer[Matrix, MatrixWritable]) - kryo.register(classOf[com.google.common.collect.HashBiMap[String, Int]], new JavaSerializer()) - } } http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala ---------------------------------------------------------------------- diff --git 
a/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala b/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala index 4800a32..628d981 100644 --- a/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala +++ b/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala @@ -17,10 +17,9 @@ package org.apache.mahout.drivers -import com.google.common.collect.HashBiMap import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{Path, FileSystem} -import org.apache.mahout.math.indexeddataset.IndexedDataset +import org.apache.mahout.math.indexeddataset.{BiDictionary, IndexedDataset} import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark import org.scalatest.{ConfigMap, FunSuite} import org.apache.mahout.sparkbindings._ @@ -28,6 +27,8 @@ import org.apache.mahout.sparkbindings.test.DistributedSparkSuite import org.apache.mahout.math.drm._ import org.apache.mahout.math.scalabindings._ +import scala.collection.immutable.HashMap + //todo: take out, only for temp tests import org.apache.mahout.math.scalabindings._ @@ -653,7 +654,8 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite { (1.0, 1.0)) val drmA = drmParallelize(m = a, numPartitions = 2) - val indexedDatasetA = new IndexedDatasetSpark(drmA, HashBiMap.create(), HashBiMap.create()) + val emptyIDs = new BiDictionary(new HashMap[String, Int]()) + val indexedDatasetA = new IndexedDatasetSpark(drmA, emptyIDs, emptyIDs) val biggerIDSA = indexedDatasetA.newRowCardinality(5) assert(biggerIDSA.matrix.nrow == 5) @@ -722,6 +724,95 @@ removed ==> u3 0 0 1 0 tokenize(crossSimilarityLines) should contain theSameElementsAs UnequalDimensionsCrossSimilarityLines } + test("ItemSimilarityDriver cross similarity two separate items spaces, adding rows in B") { + /* cross-similarity with category views, same user space + phones tablets mobile_acc soap + u1 0 1 1 0 + u2 1 1 1 0 +removed ==> u3 
0 0 1 0 + u4 1 1 0 1 + */ + val InFile1 = TmpDir + "in-file1.csv/" //using part files, not single file + val InFile2 = TmpDir + "in-file2.csv/" //using part files, not single file + val OutPath = TmpDir + "similarity-matrices/" + + val lines = Array( + "u1,purchase,iphone", + "u1,purchase,ipad", + "u2,purchase,nexus", + "u2,purchase,galaxy", + "u3,purchase,surface", + "u4,purchase,iphone", + "u4,purchase,galaxy", + "u1,view,phones", + "u1,view,mobile_acc", + "u2,view,phones", + "u2,view,tablets", + "u2,view,mobile_acc", + "u3,view,mobile_acc",// if this line is removed the cross-cooccurrence should work + "u4,view,phones", + "u4,view,tablets", + "u4,view,soap", + "u5,view,soap") + + val UnequalDimensionsSimilarityTokens = List( + "galaxy", + "nexus:2.231435513142097", + "iphone:0.13844293808390518", + "nexus", + "galaxy:2.231435513142097", + "ipad", + "iphone:2.231435513142097", + "surface", + "iphone", + "ipad:2.231435513142097", + "galaxy:0.13844293808390518") + + val UnequalDimensionsCrossSimilarityLines = List( + "galaxy", + "tablets:6.730116670092563", + "phones:2.9110316603236868", + "soap:0.13844293808390518", + "mobile_acc:0.13844293808390518", + "nexus", + "tablets:2.231435513142097", + "mobile_acc:1.184939225613002", + "phones:1.184939225613002", + "ipad", "mobile_acc:1.184939225613002", + "phones:1.184939225613002", + "surface", + "mobile_acc:1.184939225613002", + "iphone", + "phones:2.9110316603236868", + "soap:0.13844293808390518", + "tablets:0.13844293808390518", + "mobile_acc:0.13844293808390518") + + // this will create multiple part-xxxxx files in the InFile dir but other tests will + // take account of one actual file + val linesRdd1 = mahoutCtx.parallelize(lines).saveAsTextFile(InFile1) + val linesRdd2 = mahoutCtx.parallelize(lines).saveAsTextFile(InFile2) + + // local multi-threaded Spark with default HDFS + ItemSimilarityDriver.main(Array( + "--input", InFile1, + "--input2", InFile2, + "--output", OutPath, + "--master", masterUrl, + 
"--filter1", "purchase", + "--filter2", "view", + "--inDelim", ",", + "--itemIDColumn", "2", + "--rowIDColumn", "0", + "--filterColumn", "1", + "--writeAllDatasets")) + + val similarityLines = mahoutCtx.textFile(OutPath + "/similarity-matrix/").collect.toIterable + val crossSimilarityLines = mahoutCtx.textFile(OutPath + "/cross-similarity-matrix/").collect.toIterable + tokenize(similarityLines) should contain theSameElementsAs UnequalDimensionsSimilarityTokens + tokenize(crossSimilarityLines) should contain theSameElementsAs UnequalDimensionsCrossSimilarityLines + } + // convert into an Iterable of tokens for 'should contain theSameElementsAs Iterable' def tokenize(a: Iterable[String]): Iterable[String] = { var r: Iterable[String] = Iterable()
