Repository: mahout
Updated Branches:
  refs/heads/master 4e6577d14 -> 666d314fb


http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
----------------------------------------------------------------------
diff --git 
a/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala 
b/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
index 920c32b..f8abef3 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
@@ -17,15 +17,18 @@
 
 package org.apache.mahout.drivers
 
+import org.apache.mahout.common.HDFSPathSearch
 import org.apache.mahout.math.cf.SimilarityAnalysis
+import org.apache.mahout.math.indexeddataset.{Schema, IndexedDataset, 
indexedDatasetDFSRead}
+import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
 import scala.collection.immutable.HashMap
 
 /**
- * Command line interface for 
[[org.apache.mahout.cf.SimilarityAnalysis.rowSimilarity( )]].
- * Reads a text delimited file containing a Mahout DRM of the form
- * (row id, column id: strength, ...). The IDs are user specified strings 
which will be
- * preserved in the
- * output. The rows define a matrix and 
[[org.apache.mahout.cf.SimilarityAnalysis.rowSimilarity( )]]
+ * Command line interface for 
[[org.apache.mahout.math.cf.SimilarityAnalysis.rowSimilarityIDS( )]].
+ * Reads a text delimited file containing rows of a 
[[org.apache.mahout.math.indexeddataset.IndexedDataset]]
+ * with domain specific IDs of the form
+ * (row id, column id: strength, ...). The IDs will be preserved in the
+ * output. The rows define a matrix and 
[[org.apache.mahout.math.cf.SimilarityAnalysis.rowSimilarityIDS( )]]
  * will be used to calculate row-wise similarity using log-likelihood
  * The options allow control of the input schema, file discovery, output 
schema, and control of
  * algorithm parameters.
@@ -36,14 +39,13 @@ import scala.collection.immutable.HashMap
  * @note To use with a Spark cluster see the --master option, if you run out 
of heap space check
  *       the --sparkExecutorMemory option.
  */
-object RowSimilarityDriver extends MahoutDriver {
+object RowSimilarityDriver extends MahoutSparkDriver {
   // define only the options specific to RowSimilarity
   private final val RowSimilarityOptions = HashMap[String, Any](
     "maxObservations" -> 500,
     "maxSimilaritiesPerRow" -> 100,
     "appName" -> "RowSimilarityDriver")
 
-  private var readerWriter: TextDelimitedIndexedDatasetReaderWriter = _
   private var readWriteSchema: Schema = _
 
   /**
@@ -110,10 +112,13 @@ object RowSimilarityDriver extends MahoutDriver {
 
     // todo: the HashBiMap used in the TextDelimited Reader is hard coded into
     // MahoutKryoRegistrator, it should be added to the register list here so 
it
-    // will be only spcific to this job.
+    // will be only specific to this job.
     sparkConf.set("spark.kryo.referenceTracking", "false")
-      .set("spark.kryoserializer.buffer.mb", "200")
-      .set("spark.executor.memory", 
parser.opts("sparkExecutorMem").asInstanceOf[String])
+      .set("spark.kryoserializer.buffer.mb", "200")// todo: should we take 
this out?
+
+    if (parser.opts("sparkExecutorMem").asInstanceOf[String] != "")
+      sparkConf.set("spark.executor.memory", 
parser.opts("sparkExecutorMem").asInstanceOf[String])
+    //else leave as set in Spark config
 
     super.start(masterUrl, appName)
 
@@ -123,20 +128,18 @@ object RowSimilarityDriver extends MahoutDriver {
       "omitScore" -> parser.opts("omitStrength").asInstanceOf[Boolean],
       "elementDelim" -> parser.opts("elementDelim").asInstanceOf[String])
 
-    readerWriter = new 
TextDelimitedIndexedDatasetReaderWriter(readWriteSchema, readWriteSchema)
-
   }
 
   private def readIndexedDataset: IndexedDataset = {
 
-    val inFiles = FileSysUtils(parser.opts("input").asInstanceOf[String], 
parser.opts("filenamePattern").asInstanceOf[String],
+    val inFiles = HDFSPathSearch(parser.opts("input").asInstanceOf[String], 
parser.opts("filenamePattern").asInstanceOf[String],
       parser.opts("recursive").asInstanceOf[Boolean]).uris
 
     if (inFiles.isEmpty) {
       null.asInstanceOf[IndexedDataset]
     } else {
 
-      val datasetA = IndexedDataset(readerWriter.readDRMFrom(inFiles))
+      val datasetA = indexedDatasetDFSRead(inFiles, readWriteSchema)
       datasetA
     }
   }
@@ -146,12 +149,12 @@ object RowSimilarityDriver extends MahoutDriver {
 
     val indexedDataset = readIndexedDataset
 
-    val rowSimilarityDrm = 
SimilarityAnalysis.rowSimilarity(indexedDataset.matrix, 
parser.opts("randomSeed").asInstanceOf[Int],
-      parser.opts("maxSimilaritiesPerRow").asInstanceOf[Int], 
parser.opts("maxObservations").asInstanceOf[Int])
+    val rowSimilarityIDS = SimilarityAnalysis.rowSimilarityIDS(indexedDataset,
+      parser.opts("randomSeed").asInstanceOf[Int],
+      parser.opts("maxSimilaritiesPerRow").asInstanceOf[Int],
+      parser.opts("maxObservations").asInstanceOf[Int])
 
-    val rowSimilarityDataset = new 
IndexedDatasetTextDelimitedWriteable(rowSimilarityDrm,
-      indexedDataset.rowIDs, indexedDataset.rowIDs, readWriteSchema)
-    rowSimilarityDataset.writeTo(dest = 
parser.opts("output").asInstanceOf[String])
+    rowSimilarityIDS.dfsWrite(parser.opts("output").asInstanceOf[String], 
readWriteSchema)
 
     stop
   }

http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala 
b/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala
deleted file mode 100644
index 92163be..0000000
--- a/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.drivers
-
-import scala.collection.mutable
-import scala.collection.mutable.HashMap
-
-/** Syntactic sugar for mutable.HashMap[String, Any]
-  *
-  * @param params list of mappings for instantiation {{{val mySchema = new 
Schema("one" -> 1, "two" -> "2", ...)}}}
-  */
-class Schema(params: Tuple2[String, Any]*) extends HashMap[String, Any] {
-  // note: this require a mutable HashMap, do we care?
-  this ++= params
-
-  /** Constructor for copying an existing Schema
-    *
-    * @param schemaToClone return a copy of this Schema
-    */
-  def this(schemaToClone: Schema){
-    this()
-    this ++= schemaToClone
-  }
-}
-
-// These can be used to keep the text in and out fairly standard to Mahout, 
where an application specific
-// format is not required.
-
-/** Simple default Schema for typical text delimited element file input
-  * This tells the reader to input elements of the default (rowID<comma, tab, 
or space>columnID
-  * <comma, tab, or space>here may be other ignored text...)
-  */
-class DefaultElementReadSchema extends Schema(
-  "delim" -> "[,\t ]", //comma, tab or space
-  "filter" -> "",
-  "rowIDColumn" -> 0,
-  "columnIDPosition" -> 1,
-  "filterColumn" -> -1)
-
-/** Default Schema for text delimited drm file output
-  * This tells the writer to write a DRM of the default form:
-  * (rowID<tab>columnID1:score1<space>columnID2:score2...)
-  */
-class DefaultDRMWriteSchema extends Schema(
-  "rowKeyDelim" -> "\t",
-  "columnIdStrengthDelim" -> ":",
-  "elementDelim" -> " ",
-  "omitScore" -> false)
-
-/** Default Schema for typical text delimited drm file input
-  * This tells the reader to input text lines of the form:
-  * (rowID<tab>columnID1:score1,columnID2:score2,...)
-  */
-class DefaultDRMReadSchema extends Schema(
-  "rowKeyDelim" -> "\t",
-  "columnIdStrengthDelim" -> ":",
-  "elementDelim" -> " ")
-
-/** Default Schema for reading a text delimited drm file  where the score of 
any element is ignored,
-  * all non-zeros are replaced with 1.
-  * This tells the reader to input DRM lines of the form
-  * (rowID<tab>columnID1:score1<space>columnID2:score2...) remember the score 
is ignored.
-  * Alternatively the format can be
-  * (rowID<tab>columnID1<space>columnID2 ...) where presence indicates a score 
of 1. This is the default
-  * output format for [[org.apache.mahout.drivers.DRMWriteBooleanSchema]]
-  */
-class DRMReadBooleanSchema extends Schema(
-  "rowKeyDelim" -> "\t",
-  "columnIdStrengthDelim" -> ":",
-  "elementDelim" -> " ",
-  "omitScore" -> true)
-
-/** Default Schema for typical text delimited drm file write where the score 
of a element is omitted.
-  * The presence of a element means the score = 1, the absence means a score 
of 0.
-  * This tells the writer to output DRM lines of the form
-  * (rowID<tab>columnID1<space>columnID2...)
-  */
-class DRMWriteBooleanSchema extends Schema(
-  "rowKeyDelim" -> "\t",
-  "columnIdStrengthDelim" -> ":",
-  "elementDelim" -> " ",
-  "omitScore" -> true)
-

http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
----------------------------------------------------------------------
diff --git 
a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
 
b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
index 274ad98..ba8f7d1 100644
--- 
a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
+++ 
b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
@@ -17,6 +17,8 @@
 
 package org.apache.mahout.drivers
 
+import org.apache.mahout.math.indexeddataset.{Writer, Reader, Schema, 
IndexedDataset}
+import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
 import org.apache.spark.SparkContext._
 import org.apache.mahout.math.RandomAccessSparseVector
 import com.google.common.collect.{BiMap, HashBiMap}
@@ -24,23 +26,23 @@ import org.apache.mahout.math.drm.{DistributedContext, 
CheckpointedDrm}
 import org.apache.mahout.sparkbindings._
 import scala.collection.JavaConversions._
 
-/** Extends Reader trait to supply the 
[[org.apache.mahout.drivers.IndexedDataset]] as the type read and a reader 
function for reading text delimited files as described in the 
[[org.apache.mahout.drivers.Schema]]
+/** Extends Reader trait to supply the [[IndexedDatasetSpark]] as the type 
read and a reader function for reading text delimited files as described in the 
[[Schema]]
   */
-trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
+trait TDIndexedDatasetReader extends Reader[IndexedDatasetSpark]{
   /** Read in text delimited elements from all URIs in the comma delimited 
source String and return
     * the DRM of all elements updating the dictionaries for row and column 
dictionaries. If there is
     * no strength value in the element, assume it's presence means a strength 
of 1.
     *
     * @param mc context for the Spark job
     * @param readSchema describes the delimiters and positions of values in 
the text delimited file.
-    * @param source comma delimited URIs of text files to be read into the 
[[org.apache.mahout.drivers.IndexedDataset]]
+    * @param source comma delimited URIs of text files to be read into the 
[[IndexedDatasetSpark]]
     * @return
     */
   protected def elementReader(
       mc: DistributedContext,
       readSchema: Schema,
       source: String,
-      existingRowIDs: BiMap[String, Int] = HashBiMap.create()): IndexedDataset 
= {
+      existingRowIDs: BiMap[String, Int] = HashBiMap.create()): 
IndexedDatasetSpark = {
     try {
       val delimiter = readSchema("delim").asInstanceOf[String]
       val rowIDColumn = readSchema("rowIDColumn").asInstanceOf[Int]
@@ -105,7 +107,7 @@ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
       // wrap the DrmRdd and a CheckpointedDrm, which can be used anywhere a 
DrmLike[Int] is needed
       val drmInteractions = drmWrap[Int](indexedInteractions, numRows, 
numColumns)
 
-      IndexedDataset(drmInteractions, rowIDDictionary, columnIDDictionary)
+      new IndexedDatasetSpark(drmInteractions, rowIDDictionary, 
columnIDDictionary)
 
     } catch {
       case cce: ClassCastException => {
@@ -120,14 +122,14 @@ trait TDIndexedDatasetReader extends 
Reader[IndexedDataset]{
     *
     * @param mc context for the Spark job
     * @param readSchema describes the delimiters and positions of values in 
the text delimited file.
-    * @param source comma delimited URIs of text files to be read into the 
[[org.apache.mahout.drivers.IndexedDataset]]
+    * @param source comma delimited URIs of text files to be read into the 
[[IndexedDatasetSpark]]
     * @return
     */
   protected def drmReader(
       mc: DistributedContext,
       readSchema: Schema,
       source: String,
-      existingRowIDs: BiMap[String, Int] = HashBiMap.create()): IndexedDataset 
= {
+      existingRowIDs: BiMap[String, Int] = HashBiMap.create()): 
IndexedDatasetSpark = {
     try {
       val rowKeyDelim = readSchema("rowKeyDelim").asInstanceOf[String]
       val columnIdStrengthDelim = 
readSchema("columnIdStrengthDelim").asInstanceOf[String]
@@ -193,7 +195,7 @@ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
       // wrap the DrmRdd and a CheckpointedDrm, which can be used anywhere a 
DrmLike[Int] is needed
       val drmInteractions = drmWrap[Int](indexedInteractions, numRows, 
numColumns)
 
-      IndexedDataset(drmInteractions, rowIDDictionary, columnIDDictionary)
+      new IndexedDatasetSpark(drmInteractions, rowIDDictionary, 
columnIDDictionary)
 
     } catch {
       case cce: ClassCastException => {
@@ -203,7 +205,7 @@ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
     }
   }
 
-      // this creates a BiMap from an ID collection. The ID points to an 
ordinal int
+  // this creates a BiMap from an ID collection. The ID points to an ordinal 
int
   // which is used internal to Mahout as the row or column ID
   // todo: this is a non-distributed process in an otherwise distributed 
reader and the BiMap is a
   // non-rdd based object--this will limit the size of the dataset to ones 
where the dictionaries fit
@@ -218,7 +220,7 @@ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
   }
 }
 
-trait TDIndexedDatasetWriter extends Writer[IndexedDataset]{
+trait TDIndexedDatasetWriter extends Writer[IndexedDatasetSpark]{
 
   private val orderByScore = Ordering.fromLessThan[(Int, Double)] { case ((_, 
score1), (_, score2)) => score1 > score2}
 
@@ -226,13 +228,13 @@ trait TDIndexedDatasetWriter extends 
Writer[IndexedDataset]{
     *
     * @param mc context for the Spark job
     * @param writeSchema describes the delimiters and positions of values in 
the output text delimited file.
-    * @param dest directory to write text delimited version of 
[[org.apache.mahout.drivers.IndexedDataset]]
+    * @param dest directory to write text delimited version of 
[[IndexedDatasetSpark]]
     */
   protected def writer(
       mc: DistributedContext,
       writeSchema: Schema,
       dest: String,
-      indexedDataset: IndexedDataset,
+      indexedDataset: IndexedDatasetSpark,
       sort: Boolean = true): Unit = {
     try {
       val rowKeyDelim = writeSchema("rowKeyDelim").asInstanceOf[String]
@@ -295,7 +297,7 @@ trait TDIndexedDatasetReaderWriter extends 
TDIndexedDatasetReader with TDIndexed
 /** Reads text delimited files into an IndexedDataset. Classes are needed to 
supply trait params in their constructor.
   * @param readSchema describes the delimiters and position of values in the 
text delimited file to be read.
   * @param mc Spark context for reading files
-  * @note The source is supplied by Reader#readElementsFrom .
+  * @note The source is supplied to Reader#readElementsFrom .
   * */
 class TextDelimitedIndexedDatasetReader(val readSchema: Schema)
     (implicit val mc: DistributedContext) extends TDIndexedDatasetReader
@@ -303,7 +305,7 @@ class TextDelimitedIndexedDatasetReader(val readSchema: 
Schema)
 /** Writes  text delimited files into an IndexedDataset. Classes are needed to 
supply trait params in their constructor.
   * @param writeSchema describes the delimiters and position of values in the 
text delimited file(s) written.
   * @param mc Spark context for reading files
-  * @note the destination is supplied by Writer#writeTo trait method
+  * @note the destination is supplied to Writer#writeTo
   * */
 class TextDelimitedIndexedDatasetWriter(val writeSchema: Schema, val sort: 
Boolean = true)(implicit val mc: DistributedContext) extends 
TDIndexedDatasetWriter
 
@@ -316,41 +318,3 @@ class TextDelimitedIndexedDatasetReaderWriter(val 
readSchema: Schema, val writeS
     (implicit val mc: DistributedContext)
   extends TDIndexedDatasetReaderWriter
 
-/** A version of IndexedDataset that has it's own writeTo method from a Writer 
trait. This is an alternative to creating
-  * a Writer based stand-alone class for writing. Consider it experimental 
allowing similar semantics to drm.writeDrm().
-  * Experimental because it's not clear that it is simpler or more intuitive 
and since IndexedDatasetTextDelimitedWriteables
-  * are probably short lived in terms of lines of code so complexity may be 
moot.
-  * @param matrix the data
-  * @param rowIDs bi-directional dictionary for rows of external IDs to 
internal ordinal Mahout IDs.
-  * @param columnIDs bi-directional dictionary for columns of external IDs to 
internal ordinal Mahout IDs.
-  * @param writeSchema contains params for the schema/format or the written 
text delimited file.
-  * @param mc mahout distributed context (DistributedContext) may be 
implicitly defined.
-  * */
-class IndexedDatasetTextDelimitedWriteable(
-    matrix: CheckpointedDrm[Int],
-    rowIDs: BiMap[String,Int],
-    columnIDs: BiMap[String,Int],
-    val writeSchema: Schema,
-    val sort: Boolean = true)
-    (implicit val mc: DistributedContext)
-  extends IndexedDataset(matrix, rowIDs, columnIDs) with 
TDIndexedDatasetWriter {
-
-  override def writeTo(collection: IndexedDataset = this, dest: String): Unit 
= {
-    super.writeTo(this, dest)
-  }
-}
-
-/**
-  * Companion object for the case class 
[[org.apache.mahout.drivers.IndexedDatasetTextDelimitedWriteable]] primarily
-  * used to get a secondary constructor for
-  * making one 
[[org.apache.mahout.drivers.IndexedDatasetTextDelimitedWriteable]] from 
another. Used when you have a
-  * factory like 
[[org.apache.mahout.drivers.TextDelimitedIndexedDatasetReader]]
-  * {{{
-  *   val id = 
IndexedDatasetTextDelimitedWriteable(indexedDatasetReader.readElementsFrom(source))
-  * }}}
-  */
-
-object IndexedDatasetTextDelimitedWriteable {
-  /** Secondary constructor for [[org.apache.mahout.drivers.IndexedDataset]] */
-  def apply(id2: IndexedDatasetTextDelimitedWriteable, sort: Boolean = true) = 
new IndexedDatasetTextDelimitedWriteable(id2.matrix,  id2.rowIDs, 
id2.columnIDs, id2.writeSchema, id2.sort)(id2.mc)
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git 
a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala 
b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
index 08b2c34..c0d36c6 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -17,10 +17,11 @@
 
 package org.apache.mahout.sparkbindings
 
-import java.io.IOException
-
+import com.google.common.collect.{BiMap, HashBiMap}
+import org.apache.mahout.drivers.TextDelimitedIndexedDatasetReader
 import org.apache.mahout.math._
-import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.mahout.math.indexeddataset.{DefaultIndexedDatasetReadSchema, 
Schema, DefaultIndexedDatasetElementReadSchema}
+import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
 import scalabindings._
 import RLikeOps._
 import org.apache.mahout.math.drm.logical._
@@ -250,6 +251,37 @@ object SparkEngine extends DistributedEngine {
     }
   }
 
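+  /**
+   * reads an IndexedDatasetSpark from text delimited row files (row id, column id:strength, ...) via readDRMFrom
+   * @param src a comma separated list of URIs to read from
+   * @param schema how the text file is formatted
+   * @return the IndexedDatasetSpark produced by the reader for src
+   */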
+  def indexedDatasetDFSRead(src: String,
+      schema: Schema = DefaultIndexedDatasetReadSchema,
+      existingRowIDs: BiMap[String, Int] = HashBiMap.create())
+      (implicit sc: DistributedContext):
+    IndexedDatasetSpark = {
+    val reader = new TextDelimitedIndexedDatasetReader(schema)(sc)
+    val ids = reader.readDRMFrom(src, existingRowIDs)
+    ids
+  }
+
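+  /**
+   * reads an IndexedDatasetSpark from text delimited element files (row id, column id, ...) via readElementsFrom
+   * @param src a comma separated list of URIs to read from
+   * @param schema how the text file is formatted
+   * @return the IndexedDatasetSpark produced by the reader for src
+   */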
+  def indexedDatasetDFSReadElements(src: String,
+      schema: Schema = DefaultIndexedDatasetElementReadSchema,
+      existingRowIDs: BiMap[String, Int] = HashBiMap.create())
+      (implicit sc: DistributedContext):
+    IndexedDatasetSpark = {
+    val reader = new TextDelimitedIndexedDatasetReader(schema)(sc)
+    val ids = reader.readElementsFrom(src, existingRowIDs)
+    ids
+  }
 
 }
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
----------------------------------------------------------------------
diff --git 
a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
 
b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
index b753f6f..e5a2b2a 100644
--- 
a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
+++ 
b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
@@ -197,4 +197,16 @@ class CheckpointedDrmSpark[K: ClassTag](
   protected def computeNNonZero =
     cache().rdd.map(_._2.getNumNonZeroElements.toLong).sum().toLong
 
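+  /** Changes the number of rows in the DRM without actually touching the 
underlying data. Used to
+    * redimension a DRM after it has been created, which implies some blank, 
non-existent rows.
+    * @param n new row dimension; asserted to be non-negative and not smaller than the current nrow
+    * @return a new CheckpointedDrm wrapping the same rdd with n rows and the existing ncol
+    */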
+  override def newRowCardinality(n: Int): CheckpointedDrm[K] = {
+    assert(n > -1)
+    assert( n >= nrow)
+    val newCheckpointedDrm = drmWrap[K](rdd, n, ncol)
+    newCheckpointedDrm
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala
----------------------------------------------------------------------
diff --git 
a/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala
 
b/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala
new file mode 100644
index 0000000..d3aa0a8
--- /dev/null
+++ 
b/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.sparkbindings.indexeddataset
+
+import com.google.common.collect.BiMap
+import org.apache.mahout.drivers.TextDelimitedIndexedDatasetWriter
+import org.apache.mahout.math.drm.{DistributedContext, CheckpointedDrm}
+import org.apache.mahout.math.indexeddataset
+import 
org.apache.mahout.math.indexeddataset.{DefaultIndexedDatasetWriteSchema, 
Reader, Schema, IndexedDataset}
+
+/**
+ * Spark implementation of 
[[org.apache.mahout.math.indexeddataset.IndexedDataset]] providing the Spark 
specific
+ * dfsWrite method
+ */
+class IndexedDatasetSpark(val matrix: CheckpointedDrm[Int], val rowIDs: 
BiMap[String,Int], val columnIDs: BiMap[String,Int])
+  extends IndexedDataset {
+
+  /** Secondary constructor: builds a new instance from the matrix, rowIDs and columnIDs of id2 */
+  def this(id2: IndexedDatasetSpark){
+    this(id2.matrix, id2.rowIDs, id2.columnIDs)
+  }
+
+  /** Factory method used to create this extending class when the interface of
+    * [[org.apache.mahout.math.indexeddataset.IndexedDataset]] is all that is 
known. */
+  override def create(matrix: CheckpointedDrm[Int], rowIDs: BiMap[String,Int], 
columnIDs: BiMap[String,Int]):
+    IndexedDatasetSpark = {
+    new IndexedDatasetSpark(matrix, rowIDs, columnIDs)
+  }
+
+  /** implements the core method [[indexeddataset.IndexedDataset#dfsWrite]]*/
+  override def dfsWrite(dest: String, schema: Schema = 
DefaultIndexedDatasetWriteSchema)
+      (implicit sc: DistributedContext):
+    Unit = {
+    val writer = new TextDelimitedIndexedDatasetWriter(schema)(sc)
+    writer.writeTo(this, dest)
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala 
b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
index 6ca2a14..c441716 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
@@ -17,6 +17,9 @@
 
 package org.apache.mahout
 
+import org.apache.mahout.drivers.TextDelimitedIndexedDatasetReader
+import org.apache.mahout.math.indexeddataset.Schema
+import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
 import org.apache.spark.{SparkConf, SparkContext}
 import java.io._
 import scala.collection.mutable.ArrayBuffer
@@ -224,5 +227,4 @@ package object sparkbindings {
     mcjars
   }
 
-
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
----------------------------------------------------------------------
diff --git 
a/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala 
b/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
deleted file mode 100644
index 29c7b84..0000000
--- a/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.cf
-
-import org.apache.mahout.math.cf.SimilarityAnalysis
-import org.apache.mahout.math.drm._
-import org.apache.mahout.math.scalabindings.{MatrixOps, _}
-import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
-import org.apache.mahout.test.MahoutSuite
-import org.scalatest.FunSuite
-
-/* values 
-A =
-1      1       0       0       0
-0      0       1       1       0
-0      0       0       0       1
-1      0       0       1       0
-
-B =
-1      1       1       1       0
-1      1       1       1       0
-0      0       1       0       1
-1      1       0       1       0
- */
-
-class CooccurrenceAnalysisSuite extends FunSuite with MahoutSuite with 
DistributedSparkSuite {
-
-  // correct cooccurrence with LLR
-  final val matrixLLRCoocAtAControl = dense(
-    (0.0,                1.7260924347106847, 0.0,                     0.0,     
           0.0),
-    (1.7260924347106847, 0.0,                0.0,                     0.0,     
           0.0),
-    (0.0,                0.0,                0.0,                     
1.7260924347106847, 0.0),
-    (0.0,                0.0,                1.7260924347106847,      0.0,     
           0.0),
-    (0.0,                0.0,                0.0,                     0.0,     
           0.0))
-
-  // correct cross-cooccurrence with LLR
-  final val m = dense(
-    (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 
1.7260924347106847, 0.0),
-    (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 
1.7260924347106847, 0.0),
-    (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 
1.7260924347106847, 0.6795961471815897),
-    (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 
1.7260924347106847, 0.0),
-    (0.0,                0.0,                0.0,                0.0,          
      4.498681156950466))
-
-  final val matrixLLRCoocBtAControl = dense(
-      (1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 
1.7260924347106847, 0.0),
-      (0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 
0.6795961471815897, 0.0),
-      (0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 
0.6795961471815897, 0.0),
-      (1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 
1.7260924347106847, 0.0),
-      (0.0,                0.0,                0.6795961471815897, 0.0,        
        4.498681156950466))
-
-
-  test("cooccurrence [A'A], [B'A] boolbean data using LLR") {
-    val a = dense(
-        (1, 1, 0, 0, 0),
-        (0, 0, 1, 1, 0),
-        (0, 0, 0, 0, 1),
-        (1, 0, 0, 1, 0))
-
-    val b = dense(
-        (1, 1, 1, 1, 0),
-        (1, 1, 1, 1, 0),
-        (0, 0, 1, 0, 1),
-        (1, 1, 0, 1, 0))
-
-    val drmA = drmParallelize(m = a, numPartitions = 2)
-    val drmB = drmParallelize(m = b, numPartitions = 2)
-
-    //self similarity
-    val drmCooc = SimilarityAnalysis.cooccurrences(drmARaw = drmA, randomSeed 
= 1, drmBs = Array(drmB))
-    val matrixSelfCooc = drmCooc(0).checkpoint().collect
-    val diffMatrix = matrixSelfCooc.minus(matrixLLRCoocAtAControl)
-    var n = (new MatrixOps(m = diffMatrix)).norm
-    n should be < 1E-10
-
-    //cross similarity
-    val matrixCrossCooc = drmCooc(1).checkpoint().collect
-    val diff2Matrix = matrixCrossCooc.minus(matrixLLRCoocBtAControl)
-    n = (new MatrixOps(m = diff2Matrix)).norm
-    n should be < 1E-10
-
-  }
-
-  test("cooccurrence [A'A], [B'A] double data using LLR") {
-    val a = dense(
-        (100000.0D, 1.0D,  0.0D,  0.0D,     0.0D),
-        (     0.0D, 0.0D, 10.0D,  1.0D,     0.0D),
-        (     0.0D, 0.0D,  0.0D,  0.0D,  1000.0D),
-        (     1.0D, 0.0D,  0.0D, 10.0D,     0.0D))
-
-    val b = dense(
-        (10000.0D, 100.0D,     1000.0D,      1.0D,   0.0D),
-        (   10.0D,   1.0D, 10000000.0D,     10.0D,   0.0D),
-        (    0.0D,   0.0D,     1000.0D,      0.0D, 100.0D),
-        (  100.0D,   1.0D,        0.0D, 100000.0D,   0.0D))
-
-    val drmA = drmParallelize(m = a, numPartitions = 2)
-    val drmB = drmParallelize(m = b, numPartitions = 2)
-
-    //self similarity
-    val drmCooc = SimilarityAnalysis.cooccurrences(drmARaw = drmA, drmBs = 
Array(drmB))
-    val matrixSelfCooc = drmCooc(0).checkpoint().collect
-    val diffMatrix = matrixSelfCooc.minus(matrixLLRCoocAtAControl)
-    var n = (new MatrixOps(m = diffMatrix)).norm
-    n should be < 1E-10
-
-    //cross similarity
-    val matrixCrossCooc = drmCooc(1).checkpoint().collect
-    val diff2Matrix = matrixCrossCooc.minus(matrixLLRCoocBtAControl)
-    n = (new MatrixOps(m = diff2Matrix)).norm
-    n should be < 1E-10
-  }
-
-  test("cooccurrence [A'A], [B'A] integer data using LLR") {
-    val a = dense(
-        ( 1000,  10,       0,    0,   0),
-        (    0,   0,  -10000,   10,   0),
-        (    0,   0,       0,    0, 100),
-        (10000,   0,       0, 1000,   0))
-
-    val b = dense(
-        (  100, 1000, -10000, 10000,    0),
-        (10000, 1000,    100,    10,    0),
-        (    0,    0,     10,     0, -100),
-        (   10,  100,      0,  1000,    0))
-
-    val drmA = drmParallelize(m = a, numPartitions = 2)
-    val drmB = drmParallelize(m = b, numPartitions = 2)
-
-   //self similarity
-    val drmCooc = SimilarityAnalysis.cooccurrences(drmARaw = drmA, drmBs = 
Array(drmB))
-    //var cp = drmSelfCooc(0).checkpoint()
-    //cp.writeDRM("/tmp/cooc-spark/")//to get values written
-    val matrixSelfCooc = drmCooc(0).checkpoint().collect
-    val diffMatrix = matrixSelfCooc.minus(matrixLLRCoocAtAControl)
-    var n = (new MatrixOps(m = diffMatrix)).norm
-    n should be < 1E-10
-
-    //cross similarity
-    val matrixCrossCooc = drmCooc(1).checkpoint().collect
-    val diff2Matrix = matrixCrossCooc.minus(matrixLLRCoocBtAControl)
-    n = (new MatrixOps(m = diff2Matrix)).norm
-    n should be < 1E-10
-  }
-
-  test("cooccurrence two matrices with different number of columns"){
-    val a = dense(
-      (1, 1, 0, 0, 0),
-      (0, 0, 1, 1, 0),
-      (0, 0, 0, 0, 1),
-      (1, 0, 0, 1, 0))
-
-    val b = dense(
-      (0, 1, 1, 0),
-      (1, 1, 1, 0),
-      (0, 0, 1, 0),
-      (1, 1, 0, 1))
-
-    val matrixLLRCoocBtANonSymmetric = dense(
-      (0.0,                1.7260924347106847, 1.7260924347106847, 
1.7260924347106847),
-      (0.0,                0.6795961471815897, 0.6795961471815897, 0.0),
-      (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 0.0),
-      (5.545177444479561,  1.7260924347106847, 1.7260924347106847, 
1.7260924347106847),
-      (0.0,                0.0,                0.6795961471815897, 0.0))
-
-    val drmA = drmParallelize(m = a, numPartitions = 2)
-    val drmB = drmParallelize(m = b, numPartitions = 2)
-
-    //self similarity
-    val drmCooc = SimilarityAnalysis.cooccurrences(drmARaw = drmA, drmBs = 
Array(drmB))
-    val matrixSelfCooc = drmCooc(0).checkpoint().collect
-    val diffMatrix = matrixSelfCooc.minus(matrixLLRCoocAtAControl)
-    var n = (new MatrixOps(m = diffMatrix)).norm
-    n should be < 1E-10
-
-    //cross similarity
-    val matrixCrossCooc = drmCooc(1).checkpoint().collect
-    val diff2Matrix = matrixCrossCooc.minus(matrixLLRCoocBtANonSymmetric)
-    n = (new MatrixOps(m = diff2Matrix)).norm
-
-    //cooccurrence without LLR is just a A'B
-    //val inCoreAtB = a.transpose().times(b)
-    //val bp = 0
-  }
-
-  test("LLR calc") {
-    val A = dense(
-        (1, 1, 0, 0, 0),
-        (0, 0, 1, 1, 0),
-        (0, 0, 0, 0, 1),
-        (1, 0, 0, 1, 0))
-
-    val AtA = A.transpose().times(A)
-
-    /* AtA is:
-      0  =>    {0:2.0,1:1.0,3:1.0}
-      1  =>    {0:1.0,1:1.0}
-      2  =>    {2:1.0,3:1.0}
-      3  =>    {0:1.0,2:1.0,3:2.0}
-      4  =>    {4:1.0}
-
-          val AtAd = dense(
-         (2, 1, 0, 1, 0),
-         (1, 1, 0, 0, 0),
-         (0, 0, 1, 1, 0),
-         (1, 0, 1, 2, 0),
-         (0, 0, 0, 0, 1))
-
-         val AtAdNoSelfCooc = dense(
-         (0, 1, 0, 1, 0),
-         (1, 0, 0, 0, 0),
-         (0, 0, 0, 1, 0),
-         (1, 0, 1, 0, 0),
-         (0, 0, 0, 0, 0))
-
-    */
-
-    //item (1,0)
-    val numInteractionsWithAandB = 1L
-    val numInteractionsWithA = 1L
-    val numInteractionsWithB = 2L
-    val numInteractions = 6l
-
-    val llr = SimilarityAnalysis.logLikelihoodRatio(numInteractionsWithA, 
numInteractionsWithB, numInteractionsWithAandB, numInteractions)
-
-    assert(llr == 2.6341457841558764) // value calculated by hadoop 
itemsimilairty
-  }
-
-  test("downsampling by number per row") {
-    val a = dense(
-        (1, 1, 1, 1, 0),
-        (1, 1, 1, 1, 1),
-        (0, 0, 0, 0, 1),
-        (1, 1, 0, 1, 0))
-    val drmA: DrmLike[Int] = drmParallelize(m = a, numPartitions = 2)
-
-    val downSampledDrm = SimilarityAnalysis.sampleDownAndBinarize(drmA, 
0xdeadbeef, 4)
-    //count non-zero values, should be == 7
-    var numValues = 0
-    val m = downSampledDrm.collect
-    val it = m.iterator()
-    while (it.hasNext) {
-      val v = it.next().vector()
-      val nonZeroIt = v.nonZeroes().iterator()
-      while (nonZeroIt.hasNext) {
-        numValues += 1
-        nonZeroIt.next()
-      }
-    }
-
-    assert(numValues == 8) //Don't change the random seed or this may fail.
-  }
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/test/scala/org/apache/mahout/cf/SimilarityAnalysisSuite.scala
----------------------------------------------------------------------
diff --git 
a/spark/src/test/scala/org/apache/mahout/cf/SimilarityAnalysisSuite.scala 
b/spark/src/test/scala/org/apache/mahout/cf/SimilarityAnalysisSuite.scala
new file mode 100644
index 0000000..0b3b3eb
--- /dev/null
+++ b/spark/src/test/scala/org/apache/mahout/cf/SimilarityAnalysisSuite.scala
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf
+
+import org.apache.mahout.math.cf.SimilarityAnalysis
+import org.apache.mahout.math.drm._
+import org.apache.mahout.math.scalabindings.{MatrixOps, _}
+import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
+import org.apache.mahout.test.MahoutSuite
+import org.scalatest.FunSuite
+
+/* values 
+A =
+1      1       0       0       0
+0      0       1       1       0
+0      0       0       0       1
+1      0       0       1       0
+
+B =
+1      1       1       1       0
+1      1       1       1       0
+0      0       1       0       1
+1      1       0       1       0
+ */
+
+// todo: add tests for the IndexedDataset cooccurrence methods
+
+class SimilarityAnalysisSuite extends FunSuite with MahoutSuite with 
DistributedSparkSuite {
+
+  // correct cooccurrence with LLR
+  final val matrixLLRCoocAtAControl = dense(
+    (0.0,                1.7260924347106847, 0.0,                     0.0,     
           0.0),
+    (1.7260924347106847, 0.0,                0.0,                     0.0,     
           0.0),
+    (0.0,                0.0,                0.0,                     
1.7260924347106847, 0.0),
+    (0.0,                0.0,                1.7260924347106847,      0.0,     
           0.0),
+    (0.0,                0.0,                0.0,                     0.0,     
           0.0))
+
+  // correct cross-cooccurrence with LLR
+  final val m = dense(
+    (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 
1.7260924347106847, 0.0),
+    (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 
1.7260924347106847, 0.0),
+    (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 
1.7260924347106847, 0.6795961471815897),
+    (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 
1.7260924347106847, 0.0),
+    (0.0,                0.0,                0.0,                0.0,          
      4.498681156950466))
+
+  final val matrixLLRCoocBtAControl = dense(
+      (1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 
1.7260924347106847, 0.0),
+      (0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 
0.6795961471815897, 0.0),
+      (0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 
0.6795961471815897, 0.0),
+      (1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 
1.7260924347106847, 0.0),
+      (0.0,                0.0,                0.6795961471815897, 0.0,        
        4.498681156950466))
+
+
+  test("cooccurrence [A'A], [B'A] boolbean data using LLR") {
+    val a = dense(
+        (1, 1, 0, 0, 0),
+        (0, 0, 1, 1, 0),
+        (0, 0, 0, 0, 1),
+        (1, 0, 0, 1, 0))
+
+    val b = dense(
+        (1, 1, 1, 1, 0),
+        (1, 1, 1, 1, 0),
+        (0, 0, 1, 0, 1),
+        (1, 1, 0, 1, 0))
+
+    val drmA = drmParallelize(m = a, numPartitions = 2)
+    val drmB = drmParallelize(m = b, numPartitions = 2)
+
+    //self similarity
+    val drmCooc = SimilarityAnalysis.cooccurrences(drmARaw = drmA, randomSeed 
= 1, drmBs = Array(drmB))
+    val matrixSelfCooc = drmCooc(0).checkpoint().collect
+    val diffMatrix = matrixSelfCooc.minus(matrixLLRCoocAtAControl)
+    var n = (new MatrixOps(m = diffMatrix)).norm
+    n should be < 1E-10
+
+    //cross similarity
+    val matrixCrossCooc = drmCooc(1).checkpoint().collect
+    val diff2Matrix = matrixCrossCooc.minus(matrixLLRCoocBtAControl)
+    n = (new MatrixOps(m = diff2Matrix)).norm
+    n should be < 1E-10
+
+  }
+
+  test("cooccurrence [A'A], [B'A] double data using LLR") {
+    val a = dense(
+        (100000.0D, 1.0D,  0.0D,  0.0D,     0.0D),
+        (     0.0D, 0.0D, 10.0D,  1.0D,     0.0D),
+        (     0.0D, 0.0D,  0.0D,  0.0D,  1000.0D),
+        (     1.0D, 0.0D,  0.0D, 10.0D,     0.0D))
+
+    val b = dense(
+        (10000.0D, 100.0D,     1000.0D,      1.0D,   0.0D),
+        (   10.0D,   1.0D, 10000000.0D,     10.0D,   0.0D),
+        (    0.0D,   0.0D,     1000.0D,      0.0D, 100.0D),
+        (  100.0D,   1.0D,        0.0D, 100000.0D,   0.0D))
+
+    val drmA = drmParallelize(m = a, numPartitions = 2)
+    val drmB = drmParallelize(m = b, numPartitions = 2)
+
+    //self similarity
+    val drmCooc = SimilarityAnalysis.cooccurrences(drmARaw = drmA, drmBs = 
Array(drmB))
+    val matrixSelfCooc = drmCooc(0).checkpoint().collect
+    val diffMatrix = matrixSelfCooc.minus(matrixLLRCoocAtAControl)
+    var n = (new MatrixOps(m = diffMatrix)).norm
+    n should be < 1E-10
+
+    //cross similarity
+    val matrixCrossCooc = drmCooc(1).checkpoint().collect
+    val diff2Matrix = matrixCrossCooc.minus(matrixLLRCoocBtAControl)
+    n = (new MatrixOps(m = diff2Matrix)).norm
+    n should be < 1E-10
+  }
+
+  test("cooccurrence [A'A], [B'A] integer data using LLR") {
+    val a = dense(
+        ( 1000,  10,       0,    0,   0),
+        (    0,   0,  -10000,   10,   0),
+        (    0,   0,       0,    0, 100),
+        (10000,   0,       0, 1000,   0))
+
+    val b = dense(
+        (  100, 1000, -10000, 10000,    0),
+        (10000, 1000,    100,    10,    0),
+        (    0,    0,     10,     0, -100),
+        (   10,  100,      0,  1000,    0))
+
+    val drmA = drmParallelize(m = a, numPartitions = 2)
+    val drmB = drmParallelize(m = b, numPartitions = 2)
+
+   //self similarity
+    val drmCooc = SimilarityAnalysis.cooccurrences(drmARaw = drmA, drmBs = 
Array(drmB))
+    //var cp = drmSelfCooc(0).checkpoint()
+    //cp.writeDRM("/tmp/cooc-spark/")//to get values written
+    val matrixSelfCooc = drmCooc(0).checkpoint().collect
+    val diffMatrix = matrixSelfCooc.minus(matrixLLRCoocAtAControl)
+    var n = (new MatrixOps(m = diffMatrix)).norm
+    n should be < 1E-10
+
+    //cross similarity
+    val matrixCrossCooc = drmCooc(1).checkpoint().collect
+    val diff2Matrix = matrixCrossCooc.minus(matrixLLRCoocBtAControl)
+    n = (new MatrixOps(m = diff2Matrix)).norm
+    n should be < 1E-10
+  }
+
+  test("cooccurrence two matrices with different number of columns"){
+    val a = dense(
+      (1, 1, 0, 0, 0),
+      (0, 0, 1, 1, 0),
+      (0, 0, 0, 0, 1),
+      (1, 0, 0, 1, 0))
+
+    val b = dense(
+      (0, 1, 1, 0),
+      (1, 1, 1, 0),
+      (0, 0, 1, 0),
+      (1, 1, 0, 1))
+
+    val matrixLLRCoocBtANonSymmetric = dense(
+      (0.0,                1.7260924347106847, 1.7260924347106847, 
1.7260924347106847),
+      (0.0,                0.6795961471815897, 0.6795961471815897, 0.0),
+      (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 0.0),
+      (5.545177444479561,  1.7260924347106847, 1.7260924347106847, 
1.7260924347106847),
+      (0.0,                0.0,                0.6795961471815897, 0.0))
+
+    val drmA = drmParallelize(m = a, numPartitions = 2)
+    val drmB = drmParallelize(m = b, numPartitions = 2)
+
+    //self similarity
+    val drmCooc = SimilarityAnalysis.cooccurrences(drmARaw = drmA, drmBs = 
Array(drmB))
+    val matrixSelfCooc = drmCooc(0).checkpoint().collect
+    val diffMatrix = matrixSelfCooc.minus(matrixLLRCoocAtAControl)
+    var n = (new MatrixOps(m = diffMatrix)).norm
+    n should be < 1E-10
+
+    //cross similarity
+    val matrixCrossCooc = drmCooc(1).checkpoint().collect
+    val diff2Matrix = matrixCrossCooc.minus(matrixLLRCoocBtANonSymmetric)
+    n = (new MatrixOps(m = diff2Matrix)).norm
+
+    //cooccurrence without LLR is just A'B
+    //val inCoreAtB = a.transpose().times(b)
+    //val bp = 0
+  }
+
+  test("LLR calc") {
+    val A = dense(
+        (1, 1, 0, 0, 0),
+        (0, 0, 1, 1, 0),
+        (0, 0, 0, 0, 1),
+        (1, 0, 0, 1, 0))
+
+    val AtA = A.transpose().times(A)
+
+    /* AtA is:
+      0  =>    {0:2.0,1:1.0,3:1.0}
+      1  =>    {0:1.0,1:1.0}
+      2  =>    {2:1.0,3:1.0}
+      3  =>    {0:1.0,2:1.0,3:2.0}
+      4  =>    {4:1.0}
+
+          val AtAd = dense(
+         (2, 1, 0, 1, 0),
+         (1, 1, 0, 0, 0),
+         (0, 0, 1, 1, 0),
+         (1, 0, 1, 2, 0),
+         (0, 0, 0, 0, 1))
+
+         val AtAdNoSelfCooc = dense(
+         (0, 1, 0, 1, 0),
+         (1, 0, 0, 0, 0),
+         (0, 0, 0, 1, 0),
+         (1, 0, 1, 0, 0),
+         (0, 0, 0, 0, 0))
+
+    */
+
+    //item (1,0)
+    val numInteractionsWithAandB = 1L
+    val numInteractionsWithA = 1L
+    val numInteractionsWithB = 2L
+    val numInteractions = 6l
+
+    val llr = SimilarityAnalysis.logLikelihoodRatio(numInteractionsWithA, 
numInteractionsWithB, numInteractionsWithAandB, numInteractions)
+
+    assert(llr == 2.6341457841558764) // value calculated by hadoop 
itemsimilarity
+  }
+
+  test("downsampling by number per row") {
+    val a = dense(
+        (1, 1, 1, 1, 0),
+        (1, 1, 1, 1, 1),
+        (0, 0, 0, 0, 1),
+        (1, 1, 0, 1, 0))
+    val drmA: DrmLike[Int] = drmParallelize(m = a, numPartitions = 2)
+
+    val downSampledDrm = SimilarityAnalysis.sampleDownAndBinarize(drmA, 
0xdeadbeef, 4)
+    //count non-zero values, should be == 8
+    var numValues = 0
+    val m = downSampledDrm.collect
+    val it = m.iterator()
+    while (it.hasNext) {
+      val v = it.next().vector()
+      val nonZeroIt = v.nonZeroes().iterator()
+      while (nonZeroIt.hasNext) {
+        numValues += 1
+        nonZeroIt.next()
+      }
+    }
+
+    assert(numValues == 8) //Don't change the random seed or this may fail.
+  }
+}

Reply via email to