Hi,

we have had good results with RowSimilarityJob in our use case, with some quality loss due to pruning and a decline in performance when the thresholds are set too high or too low.

The current work on Mahout's Spark integration done by dlyubimov, pferrel and others is just amazing (although I would love to see more inline comments, especially for classes like SparkEngine.scala, or whatever those things in org.apache.mahout.sparkbindings.blas are :D )!

Either I haven't looked hard enough, or RowSimilarityJob for Spark is not implemented just yet (will it be at some point?). So my two colleagues (hi Nadine, hi Wolfgang) and I attempted to replicate RowSimilarityJob (to some extent) in Spark.

I am providing the very first and overly simplified version of the implementation below and would greatly appreciate feedback on whatever aspect of our implementation you would like to comment on!

Some fun facts:
We have roughly 10k support tickets containing textual information (title, description, comments of the participating agents, solutions, and attachment texts that we also extract). The raw data we import from a relational DB into HDFS is roughly 1.5 GB. The resulting TFIDF data is roughly 80 MB and contains 10k vectors with a dimensionality of roughly 300k.
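As a back-of-envelope sanity check of the sparsity (the 12 bytes per stored non-zero is just my rough assumption for a sparse vector entry, ignoring container overhead):

val tfidfBytes      = 80L * 1024 * 1024 // ~80 MB of TFIDF vectors
val numDocs         = 10000             // ~10k support tickets
val bytesPerNonZero = 12                // assumed: 4-byte term index + 8-byte double value
val nonZerosPerDoc  = tfidfBytes / numDocs / bytesPerNonZero // ~700 non-zero terms per document out of ~300k dimensions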

The job is executed on our test cluster of dual-core machines with 32 GB of RAM for the master node and 16 GB for each of the 4 worker nodes. As you will see in the implementation, we currently do no pruning, no limiting of observations, and no thresholds, so it runs on the whole corpus and completes in 18 to 22 minutes. We have observed somewhat non-linear horizontal scalability (we started with one node, then three, then four, and the execution time decreased only slightly). Mahout's RowSimilarityJob with maxed-out observations completes in 12 minutes for the same data set, using the following options:

"--similarityClassname", "SIMILARITY_COSINE",
"-m", "100000",
"--maxObservationsPerRow", "900000",
"--maxObservationsPerColumn", "900000",
"-ess", "true"

Thanks, guys, for taking the time to read this; again, any feedback (especially on how to improve the implementation ;) ) is greatly appreciated.
reinis

---
The code:

import com.google.common.io.ByteStreams
import our.hbase.HBaseConversions._
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.mahout.math.VectorWritable
import org.apache.spark.SparkContext.rddToPairRDDFunctions
import org.apache.spark.{SparkConf, SparkContext}

import scala.annotation.tailrec
import scala.collection.LinearSeq

object RowSimilaritySparkJob {
  def main(args: Array[String]) {
      val sConf = new SparkConf().setAppName("RowSimilaritySparkJob")
      val sc = new SparkContext(sConf)

      @transient val hConf = HBaseConfiguration.create()
      val fullTableName = "sparsevectortfidfvectors"
      hConf.set(TableInputFormat.INPUT_TABLE, fullTableName)

      val rdd = sc.newAPIHadoopRDD(hConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])

      val documentRows = rdd.map { get =>
        // read-in tfidf from HBase
        val vectorWritable = new VectorWritable()
        val vectorByteArray = get._2.getValue("d", "vector")
        vectorWritable.readFields(ByteStreams.newDataInput(vectorByteArray))
        val vector = vectorWritable.get()

        // output tuple (documentId, tfidfVector)
        (bytesToInt(get._1.get()), vector)
      }

      /* Stage 1 : normalize & transpose */
      val termRows = documentRows

        // we do no normalization since we use cosine similarity and seq2sparse already generates normalized tfidf values
        // record which documents the given termid appears in (term-document co-occurrence) as a tuple (termid, (documentid, tfidf of term in document))
        .flatMap { case(documentId, tfidfVector) =>
          import scala.collection.JavaConversions._
          for( element <- tfidfVector.nonZeroes() ) yield {
            // output multiple tuples (termid, (documentid, tfidf of term in document))
            element.index() -> (documentId -> element.get())
          }
        }

        // combine the term-document co-occurrence fragments by merging the vectors, thus creating the
        // full vector (or in our case linear sequence) of documents the termid appears in:
        // (termid -> Seq((documentId -> term-tfidf in document), (anotherDocumentId -> term-tfidf in that document), ...))
        .combineByKey[LinearSeq[(Int, Double)]](
          (x: (Int, Double)) => { LinearSeq[(Int, Double)](x._1 -> x._2) },
          (v: LinearSeq[(Int, Double)], t: (Int, Double)) => { v :+ t },
          (combiner: LinearSeq[(Int, Double)], combinee: LinearSeq[(Int, Double)]) => { combiner ++ combinee }, // concatenating Seqs from different workers?
          40 // number of partitions, depending on available hardware (CPUs)
        )
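        // tiny worked example with made-up ids and tfidf values to illustrate stage 1:
        //   doc 1 = {term 7 -> 0.5, term 9 -> 0.3}, doc 2 = {term 7 -> 0.2}
        //   the flatMap above emits (7, (1, 0.5)), (9, (1, 0.3)), (7, (2, 0.2))
        //   combineByKey then gives (7, LinearSeq((1, 0.5), (2, 0.2))) and (9, LinearSeq((1, 0.3)))
        //   (element order within a sequence may vary across partitions)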

        /* Stage 2 : co-occurrence triangular matrix */
        // write the upper half of the document co-occurrence matrix (triangular matrix), one matrix per termid.
        // we take the vectors (sequences) generated in the previous step and traverse the vector dimensions (tuples documentid -> tfidf),
        // building triangular matrix elements (rowid -> (colid -> row-col value)). Here rowid and colid are documentids from the term's
        // vector, and the row-col value is the term-tfidf of both documents multiplied with each other (in our case cosine similarity).
        // the result is a sequence of tuples (leftDocumentId -> LinearSeq((rightDocumentId -> left-tfidf * right-tfidf), (anotherRightDocumentId -> left-tfidf * right-tfidf), ...))
        // representing rows of upper triangular matrix, one per termid
        .flatMap { x: (Int, LinearSeq[(Int, Double)]) => triangularCoOccurrenceMatrix(LinearSeq[((Int, Int), Double)](), x._2) }

        // tuples from the previous step are reduced by key (leftDocumentId, rightDocumentId) over all termid matrices.
        // since most documents contain multiple terms, a given (leftDocumentId, rightDocumentId) pair appears in multiple matrices.
        // reducing by key sums up the similarities of all (leftDocumentId, rightDocumentId) co-occurrences over all common terms.
        // this results in a sequence representing the triangular matrix aggregated by (leftDocumentId, rightDocumentId):
        // ((leftDocumentId, rightDocumentId), sum of all term-tfidfs), ((leftDocumentId, anotherRightDocId), sum of all term-tfidfs), ...
        .reduceByKey(_ + _, 40)
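        // continuing the example: term 7's sequence yields ((1, 2), 0.5 * 0.2) = ((1, 2), 0.10); term 9 occurs in a single
        // document and yields nothing. If a further term also listed doc 1 before doc 2 with tfidfs 0.4 and 0.1, it would
        // add ((1, 2), 0.04), and reduceByKey would sum both contributions into ((1, 2), 0.14)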

        /* Stage 3 : create similarities */
        // symmetrize the document co-occurrence matrix by generating the lower triangular matrix from the upper triangular matrix
        // through swapping left and right document ids (the original ((leftdocid, rightdocid), tfidfsum) is of course retained as well)
        .flatMap { x: ((Int, Int), Double) =>
            LinearSeq(x, ((x._1._2, x._1._1), x._2))
        }
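        // continuing the example: ((1, 2), 0.14) is kept and its mirror ((2, 1), 0.14) is added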

        // save in hadoop
        .saveAsTextFile("row-similarity-test")
  }

  /**
   * Produces a LinearSeq representing the triangular matrix of document co-occurrences
   * @param accumulator is a temporary variable used by the recursion (enables tail recursion)
   * @param corpus shall contain at least two elements (2 document occurrences), otherwise an empty Seq will be returned
   * @return Seq of document co-occurrences formatted as ((leftDocumentId -> rightDocumentId) -> leftTFIDF * rightTFIDF)
   */
  @tailrec
  private def triangularCoOccurrenceMatrix(accumulator: LinearSeq[((Int, Int), Double)],
                                           corpus: LinearSeq[(Int, Double)]): LinearSeq[((Int, Int), Double)] = corpus match {
    case LinearSeq() => accumulator // also covers an empty corpus, as promised in the scaladoc
    case _ +: Seq() => accumulator
    case h +: t => triangularCoOccurrenceMatrix(accumulator ++ (for (e <- t) yield ((h._1, e._1), e._2 * h._2)), t)
  }
}
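
For illustration, this is what the helper produces for a made-up posting list of a single term (the ids and tfidf values are invented; since the method is private you would have to paste it into a REPL or relax the modifier to try it):

val corpus = LinearSeq(1 -> 0.5, 2 -> 0.2, 3 -> 0.4) // (documentId, tfidf) pairs of one term
triangularCoOccurrenceMatrix(LinearSeq(), corpus)
// -> LinearSeq(((1,2), 0.1), ((1,3), 0.2), ((2,3), 0.08)), modulo floating-point rounding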
