Repository: mahout
Updated Branches:
  refs/heads/master 244295a74 -> 72865b431


removed guava from scala and created an all-scala BiMap and BiDictionary; we 
still need guava for mahout-math and mahout-hdfs, but it's not used in broadcast 
so no error in general use. Also fixed a collect of the interactions DRM, a 
nasty memory-hog bug.


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/3d78096b
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/3d78096b
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/3d78096b

Branch: refs/heads/master
Commit: 3d78096bdb61b4ba20336f3fc51d42a5af77cc3c
Parents: 9abb526
Author: pferrel <[email protected]>
Authored: Wed May 20 11:16:15 2015 -0700
Committer: pferrel <[email protected]>
Committed: Wed May 20 11:16:15 2015 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |   2 +
 bin/mahout                                      |   3 +-
 buildtools/pom.xml                              |   2 +-
 distribution/pom.xml                            |   4 +-
 examples/pom.xml                                |   2 +-
 h2o/pom.xml                                     |  28 ++--
 h2o/src/main/assembly/dependency-reduced.xml    |  44 +++++++
 .../apache/mahout/h2obindings/H2OEngine.scala   |   9 +-
 .../h2obindings/test/DistributedH2OSuite.scala  |   1 -
 hdfs/pom.xml                                    |   2 +-
 integration/pom.xml                             |   2 +-
 math-scala/pom.xml                              |  29 +---
 .../mahout/math/cf/SimilarityAnalysis.scala     |  16 ++-
 .../mahout/math/drm/DistributedEngine.scala     |   7 +-
 .../org/apache/mahout/math/drm/package.scala    |   5 +-
 .../mahout/math/indexeddataset/BiMap.scala      | 128 ++++++++++++++++++
 .../math/indexeddataset/IndexedDataset.scala    |  10 +-
 .../math/indexeddataset/ReaderWriter.scala      |  18 +--
 math/pom.xml                                    |   2 +-
 .../apache/mahout/math/SparseColumnMatrix.java  |   4 +-
 mr/pom.xml                                      |   2 +-
 pom.xml                                         |   8 +-
 spark-shell/pom.xml                             |  27 +++-
 spark/pom.xml                                   |   7 +-
 spark/src/main/assembly/dependency-reduced.xml  |   1 +
 .../mahout/drivers/ItemSimilarityDriver.scala   |  10 +-
 .../mahout/drivers/MahoutSparkDriver.scala      |   5 -
 .../drivers/TextDelimitedReaderWriter.scala     | 132 +++++++++----------
 .../mahout/sparkbindings/SparkEngine.scala      |   7 +-
 .../indexeddataset/IndexedDatasetSpark.scala    |  66 +++++++++-
 .../io/MahoutKryoRegistrator.scala              |   4 +-
 .../drivers/ItemSimilarityDriverSuite.scala     |  97 +++++++++++++-
 32 files changed, 493 insertions(+), 191 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 1657afc..66cbf80 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -2,6 +2,8 @@ Mahout Change Log
 
 Release 0.10.1 - unreleased
 
+  MAHOUT-1704: Pare down dependency jar for h2o (apalumbo)
+
   MAHOUT-1697: Fixed paths to which math-scala and spark modules docs get 
packaged under in bin distribution archive (sslavic)
 
   MAHOUT-1696: QRDecomposition.solve(...) can return incorrect Matrix types 
(apalumbo)

http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/bin/mahout
----------------------------------------------------------------------
diff --git a/bin/mahout b/bin/mahout
index 772c184..ee0b918 100755
--- a/bin/mahout
+++ b/bin/mahout
@@ -186,9 +186,10 @@ then
        CLASSPATH=${CLASSPATH}:$f;
     done
 
-    for f in $MAHOUT_HOME/h2o/target/mahout-h2o-*.jar; do
+    for f in $MAHOUT_HOME/h2o/target/mahout-h2o*.jar; do
        CLASSPATH=${CLASSPATH}:$f;
     done
+
   fi
 
   # add jars for running from the command line if we requested shell or spark 
CLI driver

http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/buildtools/pom.xml
----------------------------------------------------------------------
diff --git a/buildtools/pom.xml b/buildtools/pom.xml
index 9510c5d..c1baa2f 100644
--- a/buildtools/pom.xml
+++ b/buildtools/pom.xml
@@ -29,7 +29,7 @@
 
   <groupId>org.apache.mahout</groupId>
   <artifactId>mahout-buildtools</artifactId>
-  <version>0.11.0-SNAPSHOT</version>
+  <version>0.10.1-SNAPSHOT</version>
   <name>Mahout Build Tools</name>
 
   <packaging>jar</packaging>

http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/distribution/pom.xml
----------------------------------------------------------------------
diff --git a/distribution/pom.xml b/distribution/pom.xml
index bc17a08..156ab5d 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -20,10 +20,10 @@
   <parent>
     <groupId>org.apache.mahout</groupId>
     <artifactId>mahout</artifactId>
-    <version>0.11.0-SNAPSHOT</version>
+    <version>0.10.1-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
-  <artifactId>apache-mahout-distribution</artifactId>
+  <artifactId>mahout-distribution</artifactId>
   <name>Mahout Release Package</name>
   <description>Distribution Package</description>
   <packaging>pom</packaging>

http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index e22c6e1..d800df4 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -23,7 +23,7 @@
   <parent>
     <groupId>org.apache.mahout</groupId>
     <artifactId>mahout</artifactId>
-    <version>0.11.0-SNAPSHOT</version>
+    <version>0.10.1-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/h2o/pom.xml
----------------------------------------------------------------------
diff --git a/h2o/pom.xml b/h2o/pom.xml
index b9d101a..662b073 100644
--- a/h2o/pom.xml
+++ b/h2o/pom.xml
@@ -23,7 +23,7 @@
   <parent>
     <groupId>org.apache.mahout</groupId>
     <artifactId>mahout</artifactId>
-    <version>0.11.0-SNAPSHOT</version>
+    <version>0.10.1-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 
@@ -35,27 +35,26 @@
 
   <packaging>jar</packaging>
 
+
+
   <build>
-    <plugins>
 
+    <plugins>
       <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-assembly-plugin</artifactId>
-        <configuration>
-          <descriptorRefs>
-            <descriptorRef>jar-with-dependencies</descriptorRef>
-          </descriptorRefs>
-          <archive>
-            <manifest>
-              <mainClass>water.H2O</mainClass>
-            </manifest>
-          </archive>
-        </configuration>
         <executions>
           <execution>
+            <id>dependency-reduced</id>
             <phase>package</phase>
             <goals>
               <goal>single</goal>
             </goals>
+            <configuration>
+              <descriptors>
+                
<descriptor>src/main/assembly/dependency-reduced.xml</descriptor>
+              </descriptors>
+            </configuration>
           </execution>
         </executions>
       </plugin>
@@ -124,6 +123,11 @@
   </build>
 
   <dependencies>
+  <dependency>
+    <groupId>org.scala-lang</groupId>
+    <artifactId>scala-library</artifactId>
+    <version>${scala.version}</version>
+  </dependency>
 
     <dependency>
       <groupId>org.apache.mahout</groupId>

http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/h2o/src/main/assembly/dependency-reduced.xml
----------------------------------------------------------------------
diff --git a/h2o/src/main/assembly/dependency-reduced.xml 
b/h2o/src/main/assembly/dependency-reduced.xml
new file mode 100644
index 0000000..0636f1d
--- /dev/null
+++ b/h2o/src/main/assembly/dependency-reduced.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+     http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<assembly
+  xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0";
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+  
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0
+  http://maven.apache.org/xsd/assembly-1.1.0.xsd";>
+  <id>dependency-reduced</id>
+  <formats>
+    <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <dependencySets>
+    <dependencySet>
+      <unpack>true</unpack>
+      <unpackOptions>
+      <!-- MAHOUT-1126 -->
+      <excludes>
+         <exclude>META-INF/LICENSE</exclude>
+      </excludes>
+      </unpackOptions>
+      <scope>runtime</scope>
+      <outputDirectory>/</outputDirectory>
+      <useTransitiveFiltering>true</useTransitiveFiltering>
+      <includes>
+        <include>ai.h2o:h2o-core</include>
+        <include>org.scala-lang:scala-library</include>
+      </includes>
+    </dependencySet>
+  </dependencySets>
+</assembly>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
----------------------------------------------------------------------
diff --git a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala 
b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
index 4c34f31..173d5a0 100644
--- a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
+++ b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
@@ -17,8 +17,7 @@
 
 package org.apache.mahout.h2obindings
 
-import com.google.common.collect.{HashBiMap, BiMap}
-import org.apache.mahout.math.indexeddataset.{IndexedDataset, Schema, 
DefaultIndexedDatasetReadSchema}
+import org.apache.mahout.math.indexeddataset.{BiDictionary, IndexedDataset, 
Schema, DefaultIndexedDatasetReadSchema}
 
 import scala.reflect._
 import org.apache.mahout.math._
@@ -117,7 +116,7 @@ object H2OEngine extends DistributedEngine {
   implicit def cp2cph2o[K:ClassTag](drm: CheckpointedDrm[K]): 
CheckpointedDrmH2O[K] = drm.asInstanceOf[CheckpointedDrmH2O[K]]
 
   /** stub class not implemented in H2O */
-  abstract class IndexedDatasetH2O(val matrix: CheckpointedDrm[Int], val 
rowIDs: BiMap[String,Int], val columnIDs: BiMap[String,Int])
+  abstract class IndexedDatasetH2O(val matrix: CheckpointedDrm[Int], val 
rowIDs: BiDictionary, val columnIDs: BiDictionary)
     extends IndexedDataset {}
 
     /**
@@ -129,7 +128,7 @@ object H2OEngine extends DistributedEngine {
    */
   def indexedDatasetDFSRead(src: String,
       schema: Schema = DefaultIndexedDatasetReadSchema,
-      existingRowIDs: BiMap[String, Int] = HashBiMap.create())
+      existingRowIDs: Option[BiDictionary] = None)
       (implicit sc: DistributedContext):
     IndexedDatasetH2O = {
     // should log a warning when this is built but no logger here, can an H2O 
contributor help with this
@@ -147,7 +146,7 @@ object H2OEngine extends DistributedEngine {
    */
   def indexedDatasetDFSReadElements(src: String,
       schema: Schema = DefaultIndexedDatasetReadSchema,
-      existingRowIDs: BiMap[String, Int] = HashBiMap.create())
+      existingRowIDs: Option[BiDictionary] = None)
       (implicit sc: DistributedContext):
     IndexedDatasetH2O = {
     // should log a warning when this is built but no logger here, can an H2O 
contributor help with this

http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/h2o/src/test/scala/org/apache/mahout/h2obindings/test/DistributedH2OSuite.scala
----------------------------------------------------------------------
diff --git 
a/h2o/src/test/scala/org/apache/mahout/h2obindings/test/DistributedH2OSuite.scala
 
b/h2o/src/test/scala/org/apache/mahout/h2obindings/test/DistributedH2OSuite.scala
index abb4289..26182b4 100644
--- 
a/h2o/src/test/scala/org/apache/mahout/h2obindings/test/DistributedH2OSuite.scala
+++ 
b/h2o/src/test/scala/org/apache/mahout/h2obindings/test/DistributedH2OSuite.scala
@@ -29,7 +29,6 @@ trait DistributedH2OSuite extends DistributedMahoutSuite with 
LoggerConfiguratio
 
   override protected def beforeEach() {
     super.beforeEach()
-
     mahoutCtx = mahoutH2OContext("mah2out" + System.currentTimeMillis())
   }
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/hdfs/pom.xml b/hdfs/pom.xml
index 30d47c3..0498cce 100644
--- a/hdfs/pom.xml
+++ b/hdfs/pom.xml
@@ -23,7 +23,7 @@
   <parent>
     <groupId>org.apache.mahout</groupId>
     <artifactId>mahout</artifactId>
-    <version>0.11.0-SNAPSHOT</version>
+    <version>0.10.1-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/integration/pom.xml
----------------------------------------------------------------------
diff --git a/integration/pom.xml b/integration/pom.xml
index a68e7f1..e949aaf 100644
--- a/integration/pom.xml
+++ b/integration/pom.xml
@@ -24,7 +24,7 @@
   <parent>
     <groupId>org.apache.mahout</groupId>
     <artifactId>mahout</artifactId>
-    <version>0.11.0-SNAPSHOT</version>
+    <version>0.10.1-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/math-scala/pom.xml
----------------------------------------------------------------------
diff --git a/math-scala/pom.xml b/math-scala/pom.xml
index 78331dd..84e59af 100644
--- a/math-scala/pom.xml
+++ b/math-scala/pom.xml
@@ -23,7 +23,7 @@
   <parent>
     <groupId>org.apache.mahout</groupId>
     <artifactId>mahout</artifactId>
-    <version>0.11.0-SNAPSHOT</version>
+    <version>0.10.1-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 
@@ -136,35 +136,12 @@
 
     <!-- scala stuff -->
     <dependency>
-      <groupId>org.scala-lang</groupId>
-      <artifactId>scala-compiler</artifactId>
-      <version>${scala.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.scala-lang</groupId>
-      <artifactId>scala-reflect</artifactId>
-      <version>${scala.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.scala-lang</groupId>
-      <artifactId>scala-library</artifactId>
-      <version>${scala.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.scala-lang</groupId>
-      <artifactId>scala-actors</artifactId>
-      <version>${scala.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.scala-lang</groupId>
-      <artifactId>scalap</artifactId>
-      <version>${scala.version}</version>
-    </dependency>
-    <dependency>
       <groupId>org.scalatest</groupId>
       <artifactId>scalatest_${scala.compat.version}</artifactId>
     </dependency>
 
+
+
   </dependencies>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
----------------------------------------------------------------------
diff --git 
a/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala 
b/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
index 6557ab0..fd91c16 100644
--- 
a/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
+++ 
b/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
@@ -68,7 +68,7 @@ object SimilarityAnalysis extends Serializable {
     // Compute & broadcast the number of interactions per thing in A
     val bcastInteractionsPerItemA = 
drmBroadcast(drmA.numNonZeroElementsPerColumn)
 
-    // Compute co-occurrence matrix A'A
+    // Compute cooccurrence matrix A'A
     val drmAtA = drmA.t %*% drmA
 
     // Compute loglikelihood scores and sparsify the resulting matrix to get 
the similarity matrix
@@ -77,7 +77,7 @@ object SimilarityAnalysis extends Serializable {
 
     var similarityMatrices = List(drmSimilarityAtA)
 
-    // Now look at cross-co-occurrences
+    // Now look at cross cooccurrences
     for (drmBRaw <- drmBs) {
       // Down-sample and pin other interaction matrix
       val drmB = sampleDownAndBinarize(drmBRaw, randomSeed, 
maxNumInteractions).checkpoint()
@@ -85,7 +85,7 @@ object SimilarityAnalysis extends Serializable {
       // Compute & broadcast the number of interactions per thing in B
       val bcastInteractionsPerThingB = 
drmBroadcast(drmB.numNonZeroElementsPerColumn)
 
-      // Compute cross-co-occurrence matrix A'B
+      // Compute cross-cooccurrence matrix A'B
       val drmAtB = drmA.t %*% drmB
 
       val drmSimilarityAtB = computeSimilarities(drmAtB, numUsers, 
maxInterestingItemsPerThing,
@@ -94,11 +94,21 @@ object SimilarityAnalysis extends Serializable {
       similarityMatrices = similarityMatrices :+ drmSimilarityAtB
 
       drmB.uncache()
+
+      //debug
+      val atbRows = drmSimilarityAtB.nrow
+      val atbCols = drmSimilarityAtB.ncol
+      val i = 0
     }
 
     // Unpin downsampled interaction matrix
     drmA.uncache()
 
+    //debug
+    val ataRows = drmSimilarityAtA.nrow
+    val ataCols = drmSimilarityAtA.ncol
+    val i = 0
+
     // Return list of similarity matrices
     similarityMatrices
   }

http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
----------------------------------------------------------------------
diff --git 
a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala 
b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
index dd5b101..bb6772a 100644
--- 
a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
+++ 
b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
@@ -17,8 +17,7 @@
 
 package org.apache.mahout.math.drm
 
-import com.google.common.collect.{HashBiMap, BiMap}
-import 
org.apache.mahout.math.indexeddataset.{DefaultIndexedDatasetElementReadSchema, 
DefaultIndexedDatasetReadSchema, Schema, IndexedDataset}
+import org.apache.mahout.math.indexeddataset._
 
 import scala.reflect.ClassTag
 import logical._
@@ -95,7 +94,7 @@ trait DistributedEngine {
    */
   def indexedDatasetDFSRead(src: String,
       schema: Schema = DefaultIndexedDatasetReadSchema,
-      existingRowIDs: BiMap[String, Int] = HashBiMap.create())
+      existingRowIDs: Option[BiDictionary] = None)
       (implicit sc: DistributedContext):
     IndexedDataset
 
@@ -106,7 +105,7 @@ trait DistributedEngine {
    */
   def indexedDatasetDFSReadElements(src: String,
       schema: Schema = DefaultIndexedDatasetElementReadSchema,
-      existingRowIDs: BiMap[String, Int] = HashBiMap.create())
+      existingRowIDs: Option[BiDictionary] = None)
       (implicit sc: DistributedContext):
     IndexedDataset
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala 
b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
index 81f6ab1..1fae831 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
@@ -17,7 +17,6 @@
 
 package org.apache.mahout.math
 
-import com.google.common.collect.{HashBiMap, BiMap}
 import org.apache.mahout.math.drm.DistributedContext
 import org.apache.mahout.math.indexeddataset.{IndexedDataset, 
DefaultIndexedDatasetReadSchema, Schema}
 import org.apache.mahout.math.scalabindings.RLikeOps._
@@ -122,13 +121,13 @@ package object indexeddataset {
   /** Load IndexedDataset from text delimited files */
   def indexedDatasetDFSRead(src: String,
       schema: Schema = DefaultIndexedDatasetReadSchema,
-      existingRowIDs: BiMap[String, Int] = HashBiMap.create())
+      existingRowIDs: Option[BiDictionary] = None)
     (implicit ctx: DistributedContext):
     IndexedDataset = ctx.indexedDatasetDFSRead(src, schema, existingRowIDs)
 
   def indexedDatasetDFSReadElements(src: String,
       schema: Schema = DefaultIndexedDatasetReadSchema,
-      existingRowIDs: BiMap[String, Int] = HashBiMap.create())
+      existingRowIDs: Option[BiDictionary] = None)
     (implicit ctx: DistributedContext):
     IndexedDataset = ctx.indexedDatasetDFSReadElements(src, schema, 
existingRowIDs)
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/BiMap.scala
----------------------------------------------------------------------
diff --git 
a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/BiMap.scala 
b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/BiMap.scala
new file mode 100644
index 0000000..6c0d432
--- /dev/null
+++ 
b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/BiMap.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.math.indexeddataset
+
+import scala.collection.immutable.HashMap
+
+/**
+ * Immutable Bi-directional Map.
+ * @param m Map to use for forward reference
+ * @param i optional reverse map of value to key, will create one lazily if 
none is provided
+ *          and is required to have no duplicate reverse mappings.
+ */
+class BiMap[K, V] (
+    private val m: Map[K, V],
+    // if this is serialized we allow i to be discarded and recalculated when 
deserialized
+    @transient private var i: Option[BiMap[V, K]] = None
+  ) extends Serializable {
+
+  // NOTE: make inverse's inverse point back to current BiMap
+  // if this is serialized we allow inverse to be discarded and recalculated 
when deserialized
+  @transient lazy val inverse: BiMap[V, K] = {
+    if( i == null.asInstanceOf[Option[BiMap[V, K]]] )
+      i = None
+    i.getOrElse {
+      val rev = m.map(_.swap)
+      require((rev.size == m.size), "Failed to create reversed map. Cannot 
have duplicated values.")
+      new BiMap(rev, Some(this))
+    }
+  }
+
+  // forces inverse to be calculated in the constructor when deserialized
+  // not when first used
+  @transient val size_ = inverse.size
+
+  def get(k: K): Option[V] = m.get(k)
+
+  def getOrElse(k: K, default: => V): V = m.getOrElse(k, default)
+
+  def contains(k: K): Boolean = m.contains(k)
+
+  def apply(k: K): V = m.apply(k)
+
+  /**
+   * Converts to a map.
+   * @return a map of type immutable.Map[K, V]
+   */
+  def toMap: Map[K, V] = m
+
+  /**
+   * Converts to a sequence.
+   * @return a sequence containing all elements of this map
+   */
+  def toSeq: Seq[(K, V)] = m.toSeq
+
+  def size: Int = m.size
+
+  def take(n: Int) = BiMap(m.take(n))
+
+  override def toString = m.toString
+}
+
+object BiMap {
+
+  /** Extra constructor from a map */
+  def apply[K, V](x: Map[K, V]): BiMap[K, V] = new BiMap(x)
+
+}
+
+/** BiDictionary is a specialized BiMap that has non-negative Ints as values 
for use as DRM keys */
+class BiDictionary (
+    private val m: Map[String, Int],
+    @transient private val i: Option[BiMap[Int, String]] = None )
+  extends BiMap[String, Int](m, i) {
+
+  /**
+   * Create a new BiDictionary with the keys supplied and values ranging from 
0 to size - 1
+   * @param keys a sequence of unique Strings
+   */
+  def this(keys: Seq[String]) = {
+    this(HashMap(keys.view.zipWithIndex: _*))
+  }
+
+  def merge(
+    keys: Seq[String]): BiDictionary = {
+
+    var newIDs = List[String]()
+
+    for (key <- keys) {
+      if (!m.contains(key)) newIDs = key +: newIDs
+    }
+    if(newIDs.isEmpty) this else new BiDictionary(m ++ HashMap(newIDs.view.zip 
(Stream from size): _*))
+
+  }
+
+}
+
+/** BiDictionary is a specialized BiMap that has non-negative Ints as values 
for use as DRM keys.
+  * The companion object provides modification methods specific to maintaining 
contiguous Int values
+  * and unique String keys */
+object BiDictionary {
+
+  /**
+   * Append new keys to an existing BiDictionary and return the result. The 
values will start
+   * at m.size and increase to create a contiguous value set from 0 
to size - 1
+   * @param keys new keys to append, not checked for uniqueness so may be 
dangerous
+   * @param biDi merge keys into this BiDictionary and create new values by 
incrementing from the highest Int value
+   * @return a BiDictionary with added mappings
+   */
+  /*def append(keys: Seq[String], biDi: BiDictionary): BiDictionary = {
+    val hm = HashMap(keys.view.zip (Stream from biDi.size): _*)
+    new BiDictionary(biDi.m ++ hm)
+  }*/
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala
----------------------------------------------------------------------
diff --git 
a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala
 
b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala
index f6811e2..eeca736 100644
--- 
a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala
+++ 
b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala
@@ -17,12 +17,10 @@
 
 package org.apache.mahout.math.indexeddataset
 
-import com.google.common.collect.BiMap
 import org.apache.mahout.math.drm.{DistributedContext, CheckpointedDrm}
-import org.apache.mahout.math.indexeddataset
 
 /**
- * Wrap an  [[org.apache.mahout.math.drm.DrmLike]] with bidirectional ID 
mappings [[com.google.common.collect.BiMap]]
+ * Wrap an  [[org.apache.mahout.math.drm.DrmLike]] with bidirectional ID 
mappings [[org.apache.mahout.math.indexeddataset.BiDictionary]]
  * so a user specified labels/IDs can be stored and mapped to and from the 
Mahout Int ID used internal to Mahout
  * core code.
  * @todo Often no need for both or perhaps either dictionary, so save 
resources by allowing to be not created
@@ -31,8 +29,8 @@ import org.apache.mahout.math.indexeddataset
 
 trait IndexedDataset {
   val matrix: CheckpointedDrm[Int]
-  val rowIDs: BiMap[String,Int]
-  val columnIDs: BiMap[String,Int]
+  val rowIDs: BiDictionary
+  val columnIDs: BiDictionary
 
   /**
    * Write a text delimited file(s) with the row and column IDs from 
dictionaries.
@@ -43,7 +41,7 @@ trait IndexedDataset {
   def dfsWrite(dest: String, schema: Schema)(implicit sc: DistributedContext): 
Unit
 
   /** Factory method, creates the extending class  and returns a new instance 
*/
-  def create(matrix: CheckpointedDrm[Int], rowIDs: BiMap[String,Int], 
columnIDs: BiMap[String,Int]):
+  def create(matrix: CheckpointedDrm[Int], rowIDs: BiDictionary, columnIDs: 
BiDictionary):
     IndexedDataset
 
   /**

http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala
----------------------------------------------------------------------
diff --git 
a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala
 
b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala
index f7653ae..65c0d8f 100644
--- 
a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala
+++ 
b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala
@@ -17,8 +17,8 @@
 
 package org.apache.mahout.math.indexeddataset
 
-import com.google.common.collect.{BiMap, HashBiMap}
 import org.apache.mahout.math.drm.DistributedContext
+import org.apache.mahout.math.indexeddataset
 
 /**
  * Reader trait is abstract in the sense that the elementReader and rowReader 
functions must be supplied by an
@@ -35,7 +35,7 @@ trait Reader[T]{
    * @param mc a [[org.apache.mahout.math.drm.DistributedContext]] to read from
    * @param readSchema map of parameters controlling formating and how the 
read is executed
    * @param source list of comma delimited files to read from
-   * @param existingRowIDs [[com.google.common.collect.BiMap]] containing row 
IDs that have already
+   * @param existingRowIDs [[indexeddataset.BiDictionary]] containing row IDs 
that have already
    *                       been applied to this collection--used to 
synchronize row IDs between several
    *                       collections
    * @return a new collection of type T
@@ -44,14 +44,14 @@ trait Reader[T]{
       mc: DistributedContext,
       readSchema: Schema,
       source: String,
-      existingRowIDs: BiMap[String, Int]): T
+      existingRowIDs: Option[BiDictionary] = None): T
 
   /**
    * Override in extending trait to supply T and perform a parallel read of 
collection rows
    * @param mc a [[org.apache.mahout.math.drm.DistributedContext]] to read from
    * @param readSchema map of parameters controlling formating and how the 
read is executed
    * @param source list of comma delimited files to read from
-   * @param existingRowIDs [[com.google.common.collect.BiMap]] containing row 
IDs that have already
+   * @param existingRowIDs [[indexeddataset.BiDictionary]] containing row IDs 
that have already
    *                       been applied to this collection--used to 
synchronize row IDs between several
    *                       collections
    * @return a new collection of type T
@@ -60,30 +60,30 @@ trait Reader[T]{
       mc: DistributedContext,
       readSchema: Schema,
       source: String,
-      existingRowIDs: BiMap[String, Int]): T
+      existingRowIDs: Option[BiDictionary] = None): T
 
   /**
    * Public method called to perform the element-wise read. Usually no need to 
override
    * @param source comma delimited URIs to read from
-   * @param existingRowIDs a [[com.google.common.collect.BiMap]] containing 
previously used id mappings--used
+   * @param existingRowIDs a [[indexeddataset.BiDictionary]] containing 
previously used id mappings--used
   *                       to synchronize all row ids in several collections
    * @return a new collection of type T
    */
   def readElementsFrom(
       source: String,
-      existingRowIDs: BiMap[String, Int] = HashBiMap.create()): T =
+      existingRowIDs: Option[BiDictionary] = None): T =
     elementReader(mc, readSchema, source, existingRowIDs)
 
   /**
    * Public method called to perform the row-wise read. Usually no need to 
override.
    * @param source comma delimited URIs to read from
-   * @param existingRowIDs a [[com.google.common.collect.BiMap]] containing 
previously used id mappings--used
+   * @param existingRowIDs a [[indexeddataset.BiDictionary]] containing 
previously used id mappings--used
   *                       to synchronize all row ids in several collections
    * @return  a new collection of type T
    */
   def readRowsFrom(
       source: String,
-      existingRowIDs: BiMap[String, Int] = HashBiMap.create()): T =
+      existingRowIDs: Option[BiDictionary] = None): T =
     rowReader(mc, readSchema, source, existingRowIDs)
 }
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/math/pom.xml
----------------------------------------------------------------------
diff --git a/math/pom.xml b/math/pom.xml
index 59a3f0f..b41bcd3 100644
--- a/math/pom.xml
+++ b/math/pom.xml
@@ -23,7 +23,7 @@
   <parent>
     <groupId>org.apache.mahout</groupId>
     <artifactId>mahout</artifactId>
-    <version>0.11.0-SNAPSHOT</version>
+    <version>0.10.1-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/math/src/main/java/org/apache/mahout/math/SparseColumnMatrix.java
----------------------------------------------------------------------
diff --git a/math/src/main/java/org/apache/mahout/math/SparseColumnMatrix.java 
b/math/src/main/java/org/apache/mahout/math/SparseColumnMatrix.java
index d847dea..f62d553 100644
--- a/math/src/main/java/org/apache/mahout/math/SparseColumnMatrix.java
+++ b/math/src/main/java/org/apache/mahout/math/SparseColumnMatrix.java
@@ -184,9 +184,9 @@ public class SparseColumnMatrix extends AbstractMatrix {
     StringBuilder s = new StringBuilder("{\n");
     for (MatrixSlice next : this.transpose()) {
       if (row < maxRowsToDisplay) {
-        s.append("  ")
+        s.append(" ")
           .append(next.index())
-          .append("  =>\t")
+          .append(" =>\t")
           .append(new VectorView(next.vector(), 0, colsToDisplay))
           .append('\n');
         row++;

http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/mr/pom.xml
----------------------------------------------------------------------
diff --git a/mr/pom.xml b/mr/pom.xml
index 3f5afa3..60e52ac 100644
--- a/mr/pom.xml
+++ b/mr/pom.xml
@@ -23,7 +23,7 @@
   <parent>
     <groupId>org.apache.mahout</groupId>
     <artifactId>mahout</artifactId>
-    <version>0.11.0-SNAPSHOT</version>
+    <version>0.10.1-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 74da44e..61a5b57 100644
--- a/pom.xml
+++ b/pom.xml
@@ -27,7 +27,7 @@
 
   <groupId>org.apache.mahout</groupId>
   <artifactId>mahout</artifactId>
-  <version>0.11.0-SNAPSHOT</version>
+  <version>0.10.1-SNAPSHOT</version>
 
   <packaging>pom</packaging>
   <name>Apache Mahout</name>
@@ -100,8 +100,8 @@
   <scm>
     <connection>scm:git:[email protected]:apache/mahout.git</connection>
     
<developerConnection>scm:git:https://git-wip-us.apache.org/repos/asf/mahout.git</developerConnection>
-    <url>scm:git:[email protected]:apache/mahout.git</url>
-    <tag>HEAD</tag>
+    
<url>https://git-wip-us.apache.org/repos/asf?p=maven.git;a=tree;h=refs/heads/${project.scm.tag};hb=${project.scm.tag}</url>
+    <tag>mahout-0.10.x</tag>
   </scm>
   <properties>
     <skipTests>false</skipTests>
@@ -120,7 +120,7 @@
     <slf4j.version>1.7.10</slf4j.version>
     <scala.compat.version>2.10</scala.compat.version>
     <scala.version>2.10.4</scala.version>
-    <spark.version>1.1.1</spark.version>
+    <spark.version>1.2.2</spark.version>
     <h2o.version>0.1.25</h2o.version>
   </properties>
   <issueManagement>

http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/spark-shell/pom.xml
----------------------------------------------------------------------
diff --git a/spark-shell/pom.xml b/spark-shell/pom.xml
index 0903534..b5f283d 100644
--- a/spark-shell/pom.xml
+++ b/spark-shell/pom.xml
@@ -23,7 +23,7 @@
   <parent>
     <groupId>org.apache.mahout</groupId>
     <artifactId>mahout</artifactId>
-    <version>0.11.0-SNAPSHOT</version>
+    <version>0.10.1-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 
@@ -134,6 +134,31 @@
 
     <!-- scala stuff -->
     <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-compiler</artifactId>
+      <version>${scala.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-reflect</artifactId>
+      <version>${scala.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <version>${scala.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-actors</artifactId>
+      <version>${scala.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scalap</artifactId>
+      <version>${scala.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.scalatest</groupId>
       <artifactId>scalatest_${scala.compat.version}</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/spark/pom.xml
----------------------------------------------------------------------
diff --git a/spark/pom.xml b/spark/pom.xml
index 5646c25..ce42c44 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -23,7 +23,7 @@
   <parent>
     <groupId>org.apache.mahout</groupId>
     <artifactId>mahout</artifactId>
-    <version>0.11.0-SNAPSHOT</version>
+    <version>0.10.1-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 
@@ -156,11 +156,6 @@
     </dependency>
 
     <!--  3rd-party -->
-    <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-      <version>14.0.1</version>
-    </dependency>
 
     <!-- scala stuff -->
     <dependency>

http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/spark/src/main/assembly/dependency-reduced.xml
----------------------------------------------------------------------
diff --git a/spark/src/main/assembly/dependency-reduced.xml 
b/spark/src/main/assembly/dependency-reduced.xml
index 3c52d35..6f1e4c2 100644
--- a/spark/src/main/assembly/dependency-reduced.xml
+++ b/spark/src/main/assembly/dependency-reduced.xml
@@ -36,6 +36,7 @@
       <outputDirectory>/</outputDirectory>
       <useTransitiveFiltering>true</useTransitiveFiltering>
       <includes>
+        <!-- guava only included to get Preconditions in mahout-math and 
mahout-hdfs -->
         <include>com.google.guava:guava</include>
         <include>com.github.scopt</include>
         <include>com.tdunning:t-digest</include>

http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
----------------------------------------------------------------------
diff --git 
a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala 
b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
index 34e8cf9..d7f2787 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
@@ -162,7 +162,7 @@ object ItemSimilarityDriver extends MahoutSparkDriver {
       // be supported (and are at least on Spark) or the row cardinality 
adjustment will not work.
       val datasetB = if (!inFiles2.isEmpty) {
         // get cross-cooccurrence interactions from separate files
-        val datasetB = indexedDatasetDFSReadElements(inFiles2, readSchema2, 
existingRowIDs = datasetA.rowIDs)
+        val datasetB = indexedDatasetDFSReadElements(inFiles2, readSchema2, 
existingRowIDs = Some(datasetA.rowIDs))
 
         datasetB
 
@@ -170,7 +170,7 @@ object ItemSimilarityDriver extends MahoutSparkDriver {
         && parser.opts("filter2").asInstanceOf[String] != null) {
 
         // get cross-cooccurrences interactions by using two filters on a 
single set of files
-        val datasetB = indexedDatasetDFSReadElements(inFiles, readSchema2, 
existingRowIDs = datasetA.rowIDs)
+        val datasetB = indexedDatasetDFSReadElements(inFiles, readSchema2, 
existingRowIDs = Some(datasetA.rowIDs))
 
         datasetB
 
@@ -178,11 +178,9 @@ object ItemSimilarityDriver extends MahoutSparkDriver {
         null.asInstanceOf[IndexedDatasetSpark]
       }
       if (datasetB != null.asInstanceOf[IndexedDataset]) { // do AtB calc
-      // true row cardinality is the size of the row id index, which was 
calculated from all rows of A and B
-      val rowCardinality = datasetB.rowIDs.size() // the authoritative row 
cardinality
+        // true row cardinality is the size of the row id index, which was 
calculated from all rows of A and B
+        val rowCardinality = datasetB.rowIDs.size // the authoritative row 
cardinality
 
-        // todo: how expensive is nrow? We could make assumptions about 
.rowIds that don't rely on
-        // its calculation
         val returnedA = if (rowCardinality != datasetA.matrix.nrow) 
datasetA.newRowCardinality(rowCardinality)
         else datasetA // this guarantees matching cardinality
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
----------------------------------------------------------------------
diff --git 
a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala 
b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
index 668d70c..40ffab3 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
@@ -74,11 +74,6 @@ abstract class MahoutSparkDriver extends MahoutDriver {
    */
   override protected def start() : Unit = {
     if (!_useExistingContext) {
-      /* hack around SPARK-6069 Spark 1.2.1 deserialization of HashBiMap 
throwing ClassNotFound--doesn't seem to work
-      sparkConf.set("spark.files.userClassPathFirst", "true")
-      sparkConf.set("spark.executor.userClassPathFirst", "true")
-      */
-
       sparkConf.set("spark.kryo.referenceTracking", "false")
         .set("spark.kryoserializer.buffer.mb", "200")// this is default for 
Mahout optimizer, change it with -D option
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
----------------------------------------------------------------------
diff --git 
a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
 
b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
index 6c7992a..e2a2a9a 100644
--- 
a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
+++ 
b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
@@ -17,11 +17,11 @@
 
 package org.apache.mahout.drivers
 
-import org.apache.mahout.math.indexeddataset.{Writer, Reader, Schema, 
IndexedDataset}
+import org.apache.log4j.Logger
+import org.apache.mahout.math.indexeddataset._
 import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
 import org.apache.spark.SparkContext._
 import org.apache.mahout.math.RandomAccessSparseVector
-import com.google.common.collect.{BiMap, HashBiMap}
 import org.apache.mahout.math.drm.{DrmLike, DrmLikeOps, DistributedContext, 
CheckpointedDrm}
 import org.apache.mahout.sparkbindings._
 import scala.collection.JavaConversions._
@@ -42,10 +42,11 @@ trait TDIndexedDatasetReader extends 
Reader[IndexedDatasetSpark]{
    * @return a new 
[[org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark]]
    */
   protected def elementReader(
-      mc: DistributedContext,
-      readSchema: Schema,
-      source: String,
-      existingRowIDs: BiMap[String, Int] = HashBiMap.create()): 
IndexedDatasetSpark = {
+    mc: DistributedContext,
+    readSchema: Schema,
+    source: String,
+    existingRowIDs: Option[BiDictionary] = None): IndexedDatasetSpark = {
+    @transient lazy val logger = 
Logger.getLogger(this.getClass.getCanonicalName)
     try {
       val delimiter = readSchema("delim").asInstanceOf[String]
       val rowIDColumn = readSchema("rowIDColumn").asInstanceOf[Int]
@@ -54,10 +55,8 @@ trait TDIndexedDatasetReader extends 
Reader[IndexedDatasetSpark]{
       val filterBy = readSchema("filter").asInstanceOf[String]
      // instance vars must be put into locally scoped vals when used in 
closures that are executed by Spark
 
-      assert(!source.isEmpty, {
-        println(this.getClass.toString + ": has no files to read")
-        throw new IllegalArgumentException
-      })
+
+      require (!source.isEmpty, "No file(s) to read")
 
       var columns = mc.textFile(source).map { line => line.split(delimiter) }
 
@@ -68,7 +67,6 @@ trait TDIndexedDatasetReader extends 
Reader[IndexedDatasetSpark]{
       }
 
       // get row and column IDs
-      //val m = columns.collect
       val interactions = columns.map { tokens =>
         tokens(rowIDColumn) -> tokens(columnIDPosition)
       }
@@ -79,21 +77,25 @@ trait TDIndexedDatasetReader extends 
Reader[IndexedDatasetSpark]{
       val rowIDs = interactions.map { case (rowID, _) => rowID 
}.distinct().collect()
       val columnIDs = interactions.map { case (_, columnID) => columnID 
}.distinct().collect()
 
-      // create BiMaps for bi-directional lookup of ID by either Mahout ID or 
external ID
+      // create BiDictionary(s) for bi-directional lookup of ID by either 
Mahout ID or external ID
       // broadcast them for access in distributed processes, so they are not 
recalculated in every task.
-      val rowIDDictionary = asOrderedDictionary(existingRowIDs, rowIDs)
+      //val rowIDDictionary = BiDictionary.append(existingRowIDs, rowIDs)
+      val rowIDDictionary = existingRowIDs match {
+        case Some(d) => d.merge(rowIDs)
+        case None =>  new BiDictionary(rowIDs)
+      }
       val rowIDDictionary_bcast = mc.broadcast(rowIDDictionary)
 
-      val columnIDDictionary = asOrderedDictionary(entries = columnIDs)
+      val columnIDDictionary = new BiDictionary(keys = columnIDs)
       val columnIDDictionary_bcast = mc.broadcast(columnIDDictionary)
 
-      val ncol = columnIDDictionary.size()
-      val nrow = rowIDDictionary.size()
+      val ncol = columnIDDictionary.size
+      val nrow = rowIDDictionary.size
 
       val indexedInteractions =
         interactions.map { case (rowID, columnID) =>
-          val rowIndex = rowIDDictionary_bcast.value.get(rowID).get
-          val columnIndex = columnIDDictionary_bcast.value.get(columnID).get
+          val rowIndex = rowIDDictionary_bcast.value.getOrElse(rowID, -1)
+          val columnIndex = columnIDDictionary_bcast.value.getOrElse(columnID, 
-1)
 
           rowIndex -> columnIndex
         }
@@ -108,14 +110,13 @@ trait TDIndexedDatasetReader extends 
Reader[IndexedDatasetSpark]{
         .asInstanceOf[DrmRdd[Int]]
 
       // wrap the DrmRdd and a CheckpointedDrm, which can be used anywhere a 
DrmLike[Int] is needed
-      //val drmInteractions = drmWrap[Int](indexedInteractions, nrow, ncol)
       val drmInteractions = drmWrap[Int](indexedInteractions)
 
       new IndexedDatasetSpark(drmInteractions, rowIDDictionary, 
columnIDDictionary)
 
     } catch {
       case cce: ClassCastException => {
-        println(this.getClass.toString + ": Schema has illegal values"); throw 
cce
+        logger.error("Schema has illegal values"); throw cce
       }
     }
   }
@@ -130,21 +131,17 @@ trait TDIndexedDatasetReader extends 
Reader[IndexedDatasetSpark]{
    * @return a new 
[[org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark]]
    */
   protected def rowReader(
-      mc: DistributedContext,
-      readSchema: Schema,
-      source: String,
-      existingRowIDs: BiMap[String, Int] = HashBiMap.create()): 
IndexedDatasetSpark = {
+    mc: DistributedContext,
+    readSchema: Schema,
+    source: String,
+    existingRowIDs: Option[BiDictionary] = None): IndexedDatasetSpark = {
+    @transient lazy val logger = 
Logger.getLogger(this.getClass.getCanonicalName)
     try {
       val rowKeyDelim = readSchema("rowKeyDelim").asInstanceOf[String]
       val columnIdStrengthDelim = 
readSchema("columnIdStrengthDelim").asInstanceOf[String]
       val elementDelim = readSchema("elementDelim").asInstanceOf[String]
-      // no need for omitScore since we can tell if there is a score and 
assume it is 1.0d if not specified
-      //val omitScore = readSchema("omitScore").asInstanceOf[Boolean]
 
-      assert(!source.isEmpty, {
-        println(this.getClass.toString + ": has no files to read")
-        throw new IllegalArgumentException
-      })
+      require (!source.isEmpty, "No file(s) to read")
 
       var rows = mc.textFile(source).map { line => line.split(rowKeyDelim) }
 
@@ -154,7 +151,8 @@ trait TDIndexedDatasetReader extends 
Reader[IndexedDatasetSpark]{
       }
 
       interactions.cache()
-      interactions.collect()
+      // forces into memory so only for debugging
+      //interactions.collect()
 
       // create separate collections of rowID and columnID tokens
       val rowIDs = interactions.map { case (rowID, _) => rowID 
}.distinct().collect()
@@ -168,24 +166,28 @@ trait TDIndexedDatasetReader extends 
Reader[IndexedDatasetSpark]{
 
       // create BiMaps for bi-directional lookup of ID by either Mahout ID or 
external ID
       // broadcast them for access in distributed processes, so they are not 
recalculated in every task.
-      val rowIDDictionary = asOrderedDictionary(existingRowIDs, rowIDs)
+      //val rowIDDictionary = BiDictionary.append(existingRowIDs, rowIDs)
+      val rowIDDictionary = existingRowIDs match {
+        case Some(d) => d.merge(rowIDs)
+        case None =>  new BiDictionary(rowIDs)
+      }
       val rowIDDictionary_bcast = mc.broadcast(rowIDDictionary)
 
-      val columnIDDictionary = asOrderedDictionary(entries = columnIDs)
+      val columnIDDictionary = new BiDictionary(keys = columnIDs)
       val columnIDDictionary_bcast = mc.broadcast(columnIDDictionary)
 
-      val ncol = columnIDDictionary.size()
-      val nrow = rowIDDictionary.size()
+      val ncol = columnIDDictionary.size
+      val nrow = rowIDDictionary.size
 
       val indexedInteractions =
         interactions.map { case (rowID, columns) =>
-          val rowIndex = rowIDDictionary_bcast.value.get(rowID).get
+          val rowIndex = rowIDDictionary_bcast.value.getOrElse(rowID, -1)
 
           val elements = columns.split(elementDelim)
           val row = new RandomAccessSparseVector(ncol)
           for (element <- elements) {
             val id = element.split(columnIdStrengthDelim)(0)
-            val columnID = columnIDDictionary_bcast.value.get(id).get
+            val columnID = columnIDDictionary_bcast.value.getOrElse(id, -1)
             val pair = element.split(columnIdStrengthDelim)
             if (pair.size == 2) // there was a strength
               row.setQuick(columnID, pair(1).toDouble)
@@ -197,43 +199,30 @@ trait TDIndexedDatasetReader extends 
Reader[IndexedDatasetSpark]{
         .asInstanceOf[DrmRdd[Int]]
 
       // wrap the DrmRdd in a CheckpointedDrm, which can be used anywhere a 
DrmLike[Int] is needed
-      //val drmInteractions = drmWrap[Int](indexedInteractions, nrow, ncol)
       val drmInteractions = drmWrap[Int](indexedInteractions)
 
       new IndexedDatasetSpark(drmInteractions, rowIDDictionary, 
columnIDDictionary)
 
     } catch {
       case cce: ClassCastException => {
-        println(this.getClass.toString + ": Schema has illegal values")
+        logger.error("Schema has illegal values")
         throw cce
       }
     }
   }
 
   /**
-   * Creates a BiMap from an ID collection. The ID points to an ordinal in 
which is used internal to Mahout
+   * Creates a BiDictionary from an ID collection. The ID points to an ordinal 
which is used internally by Mahout
    * as the row or column ID
-   * todo: this is a non-distributed process in an otherwise distributed 
reader and the BiMap is a
+   * todo: this is a non-distributed process in an otherwise distributed 
reader and the BiDictionary is a
    * non-rdd based object--this will limit the size of the dataset to ones 
where the dictionaries fit
    * in-memory, the option is to put the dictionaries in rdds and do joins to 
translate IDs
    */
-  private def asOrderedDictionary(dictionary: BiMap[String, Int] = 
HashBiMap.create(),
-      entries: Array[String]):
-    BiMap[String, Int] = {
-    var index = dictionary.size() // if a dictionary is supplied then add to 
the end based on the Mahout id 'index'
-    for (entry <- entries) {
-      if (!dictionary.contains(entry)){
-        dictionary.put(entry, index)
-        index += 1
-      }// the dictionary should never contain an entry since they are supposed 
to be distinct but for some reason
-      // they do
-    }
-    dictionary
-  }
 }
 
 /** Extends the Writer trait to supply the type being written and supplies the 
writer function */
 trait TDIndexedDatasetWriter extends Writer[IndexedDatasetSpark]{
+
   /**
    * Read in text delimited elements from all URIs in this comma delimited 
source String.
    * @param mc context for the Spark job
@@ -241,11 +230,12 @@ trait TDIndexedDatasetWriter extends 
Writer[IndexedDatasetSpark]{
    * @param dest directory to write text delimited version of 
[[IndexedDatasetSpark]]
    */
   protected def writer(
-      mc: DistributedContext,
-      writeSchema: Schema,
-      dest: String,
-      indexedDataset: IndexedDatasetSpark,
-      sort: Boolean = true): Unit = {
+    mc: DistributedContext,
+    writeSchema: Schema,
+    dest: String,
+    indexedDataset: IndexedDatasetSpark,
+    sort: Boolean = true): Unit = {
+    @transient lazy val logger = 
Logger.getLogger(this.getClass.getCanonicalName)
     try {
       val rowKeyDelim = writeSchema("rowKeyDelim").asInstanceOf[String]
       val columnIdStrengthDelim = 
writeSchema("columnIdStrengthDelim").asInstanceOf[String]
@@ -254,18 +244,15 @@ trait TDIndexedDatasetWriter extends 
Writer[IndexedDatasetSpark]{
       //instance vars must be put into locally scoped vals when put into 
closures that are
      //executed by Spark
 
-      assert(indexedDataset != null, {
-        println(this.getClass.toString + ": has no indexedDataset to write")
-        throw new IllegalArgumentException
-      })
-      assert(!dest.isEmpty, {
-        println(this.getClass.toString + ": has no destination or 
indextedDataset to write")
-        throw new IllegalArgumentException
-      })
+      require (indexedDataset != null ,"No IndexedDataset to write")
+      require (!dest.isEmpty,"No destination to write to")
 
-      val matrix = indexedDataset.matrix
+      val matrix = indexedDataset.matrix.checkpoint()
       val rowIDDictionary = indexedDataset.rowIDs
+      val rowIDDictionary_bcast = mc.broadcast(rowIDDictionary)
+
       val columnIDDictionary = indexedDataset.columnIDs
+      val columnIDDictionary_bcast = mc.broadcast(columnIDDictionary)
 
       matrix.rdd.map { case (rowID, itemVector) =>
 
@@ -279,23 +266,24 @@ trait TDIndexedDatasetWriter extends 
Writer[IndexedDatasetSpark]{
 
         // first get the external rowID token
         if (!vector.isEmpty){
-          var line: String = rowIDDictionary.inverse.get(rowID) + rowKeyDelim
+          var line = rowIDDictionary_bcast.value.inverse.getOrElse(rowID, 
"INVALID_ROW_ID") + rowKeyDelim
           // for the rest of the row, construct the vector contents of 
elements (external column ID, strength value)
           for (item <- vector) {
-            line += columnIDDictionary.inverse.get(item._1)
+            line += columnIDDictionary_bcast.value.inverse.getOrElse(item._1, 
"INVALID_COLUMN_ID")
             if (!omitScore) line += columnIdStrengthDelim + item._2
             line += elementDelim
           }
           // drop the last delimiter, not needed to end the line
           line.dropRight(1)
         } else {//no items so write a line with id but no values, no delimiters
-          rowIDDictionary.inverse.get(rowID)
+          rowIDDictionary_bcast.value.inverse.getOrElse(rowID, 
"INVALID_ROW_ID")
         } // "if" returns a line of text so this must be last in the block
       }
       .saveAsTextFile(dest)
 
     }catch{
-      case cce: ClassCastException => {println(this.getClass.toString+": 
Schema has illegal values"); throw cce}
+      case cce: ClassCastException => {
+        logger.error("Schema has illegal values"); throw cce}
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git 
a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala 
b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
index 47eb40b..595cd66 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -17,10 +17,9 @@
 
 package org.apache.mahout.sparkbindings
 
-import com.google.common.collect.{BiMap, HashBiMap}
 import org.apache.mahout.drivers.TextDelimitedIndexedDatasetReader
 import org.apache.mahout.math._
-import org.apache.mahout.math.indexeddataset.{DefaultIndexedDatasetReadSchema, 
Schema, DefaultIndexedDatasetElementReadSchema}
+import org.apache.mahout.math.indexeddataset.{BiDictionary, 
DefaultIndexedDatasetReadSchema, Schema, DefaultIndexedDatasetElementReadSchema}
 import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
 import scalabindings._
 import RLikeOps._
@@ -259,7 +258,7 @@ object SparkEngine extends DistributedEngine {
    */
   def indexedDatasetDFSRead(src: String,
       schema: Schema = DefaultIndexedDatasetReadSchema,
-      existingRowIDs: BiMap[String, Int] = HashBiMap.create())
+      existingRowIDs: Option[BiDictionary] = None)
       (implicit sc: DistributedContext):
     IndexedDatasetSpark = {
     val reader = new TextDelimitedIndexedDatasetReader(schema)(sc)
@@ -275,7 +274,7 @@ object SparkEngine extends DistributedEngine {
    */
   def indexedDatasetDFSReadElements(src: String,
       schema: Schema = DefaultIndexedDatasetElementReadSchema,
-      existingRowIDs: BiMap[String, Int] = HashBiMap.create())
+      existingRowIDs: Option[BiDictionary] = None)
       (implicit sc: DistributedContext):
     IndexedDatasetSpark = {
     val reader = new TextDelimitedIndexedDatasetReader(schema)(sc)

http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala
----------------------------------------------------------------------
diff --git 
a/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala
 
b/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala
index 30b32ad..727a95e 100644
--- 
a/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala
+++ 
b/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala
@@ -17,11 +17,15 @@
 
 package org.apache.mahout.sparkbindings.indexeddataset
 
-import com.google.common.collect.BiMap
 import org.apache.mahout.drivers.TextDelimitedIndexedDatasetWriter
 import org.apache.mahout.math.drm.{DistributedContext, CheckpointedDrm}
-import org.apache.mahout.math.indexeddataset
-import 
org.apache.mahout.math.indexeddataset.{DefaultIndexedDatasetWriteSchema, 
Reader, Schema, IndexedDataset}
+import org.apache.mahout.math.{RandomAccessSparseVector, indexeddataset}
+import org.apache.mahout.math.indexeddataset._
+import org.apache.mahout.sparkbindings._
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkContext._
+
 
 /**
  * Spark implementation of 
[[org.apache.mahout.math.indexeddataset.IndexedDataset]] providing the Spark 
specific
@@ -30,20 +34,21 @@ import 
org.apache.mahout.math.indexeddataset.{DefaultIndexedDatasetWriteSchema,
  * @param rowIDs a bidirectional map for Mahout Int IDs to/from application 
specific string IDs
  * @param columnIDs a bidirectional map for Mahout Int IDs to/from application 
specific string IDs
  */
-class IndexedDatasetSpark(val matrix: CheckpointedDrm[Int], val rowIDs: 
BiMap[String,Int],
-    val columnIDs: BiMap[String,Int])
+class IndexedDatasetSpark(val matrix: CheckpointedDrm[Int], val rowIDs: 
BiDictionary,
+    val columnIDs: BiDictionary)
   extends IndexedDataset {
 
   /** Secondary constructor enabling immutability */
   def this(id2: IndexedDatasetSpark){
     this(id2.matrix, id2.rowIDs, id2.columnIDs)
   }
-
+  
   /**
    * Factory method used to create this extending class when the interface of
    * [[org.apache.mahout.math.indexeddataset.IndexedDataset]] is all that is 
known.
    */
-  override def create(matrix: CheckpointedDrm[Int], rowIDs: BiMap[String,Int], 
columnIDs: BiMap[String,Int]):
+  override def create(matrix: CheckpointedDrm[Int], rowIDs: BiDictionary,
+      columnIDs: BiDictionary):
     IndexedDatasetSpark = {
     new IndexedDatasetSpark(matrix, rowIDs, columnIDs)
   }
@@ -60,3 +65,50 @@ class IndexedDatasetSpark(val matrix: CheckpointedDrm[Int], 
val rowIDs: BiMap[St
   }
 }
 
+object IndexedDatasetSpark {
+  
+  def apply(elements: RDD[(String, String)], existingRowIDs: 
Option[BiDictionary] = None)(implicit sc: SparkContext) = {
+
+    // create separate collections of rowID and columnID tokens
+    val rowIDs = elements.map { case (rowID, _) => rowID }.distinct().collect()
+    val columnIDs = elements.map { case (_, columnID) => columnID 
}.distinct().collect()
+
+    // create BiDictionary(s) for bi-directional lookup of ID by either Mahout 
ID or external ID
+    // broadcast them for access in distributed processes, so they are not 
recalculated in every task.
+    //val rowIDDictionary = BiDictionary.append(existingRowIDs, rowIDs)
+    val rowIDDictionary = existingRowIDs match {
+      case Some(d) => d.merge(rowIDs)
+      case None =>  new BiDictionary(rowIDs)
+    }
+    val rowIDDictionary_bcast = sc.broadcast(rowIDDictionary)
+
+    val columnIDDictionary = new BiDictionary(keys = columnIDs)
+    val columnIDDictionary_bcast = sc.broadcast(columnIDDictionary)
+
+    val ncol = columnIDDictionary.size
+    val nrow = rowIDDictionary.size
+
+    val indexedInteractions =
+      elements.map { case (rowID, columnID) =>
+        val rowIndex = rowIDDictionary_bcast.value.getOrElse(rowID, -1)
+        val columnIndex = columnIDDictionary_bcast.value.getOrElse(columnID, 
-1)
+
+        rowIndex -> columnIndex
+      }
+        // group by IDs to form row vectors
+        .groupByKey().map { case (rowIndex, columnIndexes) =>
+        val row = new RandomAccessSparseVector(ncol)
+        for (columnIndex <- columnIndexes) {
+          row.setQuick(columnIndex, 1.0)
+        }
+        rowIndex -> row
+      }.asInstanceOf[DrmRdd[Int]]
+
+    // wrap the DrmRdd and a CheckpointedDrm, which can be used anywhere a 
DrmLike[Int] is needed
+    val drmInteractions = drmWrap[Int](indexedInteractions)
+
+    new IndexedDatasetSpark(drmInteractions, rowIDDictionary, 
columnIDDictionary)
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala
----------------------------------------------------------------------
diff --git 
a/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala
 
b/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala
index a7a7df0..a8a0bb4 100644
--- 
a/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala
+++ 
b/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala
@@ -19,8 +19,8 @@ package org.apache.mahout.sparkbindings.io
 
 import com.esotericsoftware.kryo.Kryo
 import com.esotericsoftware.kryo.serializers.JavaSerializer
-import com.google.common.collect.HashBiMap
 import org.apache.mahout.math._
+import org.apache.mahout.math.indexeddataset.{BiMap, BiDictionary}
 import org.apache.spark.serializer.KryoRegistrator
 import org.apache.mahout.sparkbindings._
 import org.apache.mahout.math.Vector.Element
@@ -35,7 +35,5 @@ class MahoutKryoRegistrator extends KryoRegistrator {
     kryo.addDefaultSerializer(classOf[Vector], new 
WritableKryoSerializer[Vector, VectorWritable])
     kryo.addDefaultSerializer(classOf[DenseVector], new 
WritableKryoSerializer[Vector, VectorWritable])
     kryo.addDefaultSerializer(classOf[Matrix], new 
WritableKryoSerializer[Matrix, MatrixWritable])
-    kryo.register(classOf[com.google.common.collect.HashBiMap[String, Int]], 
new JavaSerializer())
-
   }
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/3d78096b/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
----------------------------------------------------------------------
diff --git 
a/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
 
b/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
index 4800a32..628d981 100644
--- 
a/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
+++ 
b/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
@@ -17,10 +17,9 @@
 
 package org.apache.mahout.drivers
 
-import com.google.common.collect.HashBiMap
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, FileSystem}
-import org.apache.mahout.math.indexeddataset.IndexedDataset
+import org.apache.mahout.math.indexeddataset.{BiDictionary, IndexedDataset}
 import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
 import org.scalatest.{ConfigMap, FunSuite}
 import org.apache.mahout.sparkbindings._
@@ -28,6 +27,8 @@ import 
org.apache.mahout.sparkbindings.test.DistributedSparkSuite
 import org.apache.mahout.math.drm._
 import org.apache.mahout.math.scalabindings._
 
+import scala.collection.immutable.HashMap
+
 //todo: take out, only for temp tests
 
 import org.apache.mahout.math.scalabindings._
@@ -653,7 +654,8 @@ class ItemSimilarityDriverSuite extends FunSuite with 
DistributedSparkSuite {
       (1.0, 1.0))
 
     val drmA = drmParallelize(m = a, numPartitions = 2)
-    val indexedDatasetA = new IndexedDatasetSpark(drmA, HashBiMap.create(), 
HashBiMap.create())
+    val emptyIDs = new BiDictionary(new HashMap[String, Int]())
+    val indexedDatasetA = new IndexedDatasetSpark(drmA, emptyIDs, emptyIDs)
     val biggerIDSA = indexedDatasetA.newRowCardinality(5)
 
     assert(biggerIDSA.matrix.nrow == 5)
@@ -722,6 +724,95 @@ removed ==> u3     0             0       1           0
     tokenize(crossSimilarityLines) should contain theSameElementsAs 
UnequalDimensionsCrossSimilarityLines
   }
 
+  test("ItemSimilarityDriver cross similarity two separate items spaces, 
adding rows in B") {
+    /* cross-similarity with category views, same user space
+               phones  tablets mobile_acc      soap
+            u1 0             1       1           0
+            u2 1             1       1           0
+removed ==> u3 0             0       1           0
+            u4 1             1       0           1
+    */
+    val InFile1 = TmpDir + "in-file1.csv/" //using part files, not single file
+    val InFile2 = TmpDir + "in-file2.csv/" //using part files, not single file
+    val OutPath = TmpDir + "similarity-matrices/"
+
+    val lines = Array(
+      "u1,purchase,iphone",
+      "u1,purchase,ipad",
+      "u2,purchase,nexus",
+      "u2,purchase,galaxy",
+      "u3,purchase,surface",
+      "u4,purchase,iphone",
+      "u4,purchase,galaxy",
+      "u1,view,phones",
+      "u1,view,mobile_acc",
+      "u2,view,phones",
+      "u2,view,tablets",
+      "u2,view,mobile_acc",
+      "u3,view,mobile_acc", // if this line is removed, the cross-cooccurrence should work
+      "u4,view,phones",
+      "u4,view,tablets",
+      "u4,view,soap",
+      "u5,view,soap")
+
+    val UnequalDimensionsSimilarityTokens = List(
+      "galaxy",
+      "nexus:2.231435513142097",
+      "iphone:0.13844293808390518",
+      "nexus",
+      "galaxy:2.231435513142097",
+      "ipad",
+      "iphone:2.231435513142097",
+      "surface",
+      "iphone",
+      "ipad:2.231435513142097",
+      "galaxy:0.13844293808390518")
+
+    val UnequalDimensionsCrossSimilarityLines = List(
+      "galaxy",
+      "tablets:6.730116670092563",
+      "phones:2.9110316603236868",
+      "soap:0.13844293808390518",
+      "mobile_acc:0.13844293808390518",
+      "nexus",
+      "tablets:2.231435513142097",
+      "mobile_acc:1.184939225613002",
+      "phones:1.184939225613002",
+      "ipad", "mobile_acc:1.184939225613002",
+      "phones:1.184939225613002",
+      "surface",
+      "mobile_acc:1.184939225613002",
+      "iphone",
+      "phones:2.9110316603236868",
+      "soap:0.13844293808390518",
+      "tablets:0.13844293808390518",
+      "mobile_acc:0.13844293808390518")
+
+    // this will create multiple part-xxxxx files in the InFile dir, but other
+    // tests will take into account only one actual file
+    val linesRdd1 = mahoutCtx.parallelize(lines).saveAsTextFile(InFile1)
+    val linesRdd2 = mahoutCtx.parallelize(lines).saveAsTextFile(InFile2)
+
+    // local multi-threaded Spark with default HDFS
+    ItemSimilarityDriver.main(Array(
+      "--input", InFile1,
+      "--input2", InFile2,
+      "--output", OutPath,
+      "--master", masterUrl,
+      "--filter1", "purchase",
+      "--filter2", "view",
+      "--inDelim", ",",
+      "--itemIDColumn", "2",
+      "--rowIDColumn", "0",
+      "--filterColumn", "1",
+      "--writeAllDatasets"))
+
+    val similarityLines = mahoutCtx.textFile(OutPath + 
"/similarity-matrix/").collect.toIterable
+    val crossSimilarityLines = mahoutCtx.textFile(OutPath + 
"/cross-similarity-matrix/").collect.toIterable
+    tokenize(similarityLines) should contain theSameElementsAs 
UnequalDimensionsSimilarityTokens
+    tokenize(crossSimilarityLines) should contain theSameElementsAs 
UnequalDimensionsCrossSimilarityLines
+  }
+
   // convert into an Iterable of tokens for 'should contain theSameElementsAs 
Iterable'
   def tokenize(a: Iterable[String]): Iterable[String] = {
     var r: Iterable[String] = Iterable()

Reply via email to