Hi,
we have had good results with RowSimilarityJob in our use case, with some
quality loss due to pruning and a decline in performance when thresholds
are set too high or too low.
The current work on Mahout's Spark integration done by dlyubimov,
pferrel and others is just amazing (although I would love to see more
inline comments, especially for classes like SparkEngine.scala, or what
the heck are those in org.apache.mahout.sparkbindings.blas :D )!
Either I haven't looked hard enough, or RowSimilarityJob for Spark
is not implemented just yet (will it be at some point?). So my two
colleagues (hi Nadine, hi Wolfgang) and I attempted to replicate
RowSimilarityJob (to some extent) in Spark.
I am providing the very first and overly simplified version of the
implementation below and would greatly appreciate feedback on whatever
aspect of our implementation you would like to comment on!
Some fun facts:
We have roughly 10k support tickets containing textual information
(title, description, comments of participating agents, solutions, and
attachment texts that we also extract). The raw data we import from a
relational DB into HDFS is roughly 1.5 GB. The resulting TFIDF data
is roughly 80 MB and contains 10k vectors with a dimensionality of
roughly 300k.
The job is executed on our test cluster of dual-core machines, with
32 GB RAM on the single master node and 16 GB on each of the 4 worker
nodes. As you will see in the implementation, we currently do no pruning,
no limiting of observations, and no thresholds, so it runs on the whole
corpus and completes in 18 to 22 minutes. We have observed somewhat
non-linear horizontal scalability (we started with one node, then three,
then four, and the execution time decreased only slightly). Mahout's
RowSimilarityJob with maxed-out observations completes in 12 minutes for
the same data set, using these parameters:
"--similarityClassname", "SIMILARITY_COSINE",
"-m", "100000",
"--maxObservationsPerRow", "900000",
"--maxObservationsPerColumn", "900000",
"-ess", "true"
Thanks, guys, for taking the time to read this; again, any feedback
(especially on how to improve the implementation ;) ) is greatly appreciated.
reinis
---
The code:
import com.google.common.io.ByteStreams
import our.hbase.HBaseConversions._
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.mahout.math.VectorWritable
import org.apache.spark.SparkContext.rddToPairRDDFunctions
import org.apache.spark.{SparkConf, SparkContext}
import scala.annotation.tailrec
import scala.collection.LinearSeq
object RowSimilaritySparkJob {

  def main(args: Array[String]) {
    val sConf = new SparkConf().setAppName("RowSimilaritySparkJob")
    val sc = new SparkContext(sConf)

    @transient val hConf = HBaseConfiguration.create()
    val fullTableName = "sparsevectortfidfvectors"
    hConf.set(TableInputFormat.INPUT_TABLE, fullTableName)

    val rdd = sc.newAPIHadoopRDD(hConf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])

    val documentRows = rdd.map { get =>
      // read the tfidf vector from HBase
      val vectorWritable = new VectorWritable()
      val vectorByteArray = get._2.getValue("d", "vector")
      vectorWritable.readFields(ByteStreams.newDataInput(vectorByteArray))
      val vector = vectorWritable.get()
      // output tuple (documentId, tfidfVector)
      (bytesToInt(get._1.get()), vector)
    }
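    // documentRows is now an RDD[(Int, Vector)]: document id -> its tfidf vector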
    /* Stage 1 : normalize & transpose */
    val termRows = documentRows
      // we do no normalization, since we use cosine similarity and seq2sparse
      // already generates normalized tfidf values.
      // record which documents the given termid appears in (term-document
      // co-occurrence) as tuples (termid, (documentid, tfidf of term in document))
      .flatMap { case (documentId, tfidfVector) =>
        import scala.collection.JavaConversions._
        for (element <- tfidfVector.nonZeroes()) yield {
          // output multiple tuples (termid, (documentid, tfidf of term in document))
          element.index() -> (documentId -> element.get())
        }
      }
      // combine the term-document co-occurrence fragments by merging the vectors,
      // thus creating the full vector (or, in our case, linear sequence) of the
      // documents the termid appears in:
      // (termid -> Seq((documentId -> term-tfidf in document),
      //                (anotherDocumentId -> term-tfidf in that document), ...))
      .combineByKey[LinearSeq[(Int, Double)]](
        // create a combiner from the first occurrence
        (x: (Int, Double)) => LinearSeq[(Int, Double)](x._1 -> x._2),
        // append a further occurrence to an existing combiner
        (v: LinearSeq[(Int, Double)], t: (Int, Double)) => v :+ t,
        // concatenate combiners coming from different workers
        (combiner: LinearSeq[(Int, Double)], combinee: LinearSeq[(Int, Double)]) => combiner ++ combinee,
        // number of partitions, depending on the available hardware (cpus)
        40)
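      // for intuition (hypothetical ids and weights, not from our data): a
      // record after stage 1 could look like
      //   42 -> LinearSeq((0, 0.31), (7, 0.12), (9, 0.58))
      // i.e. term 42 occurs in documents 0, 7 and 9 with the given tfidf weights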
      /* Stage 2 : co-occurrence triangular matrix */
      // write the upper half of the document co-occurrence matrix (a triangular
      // matrix), one matrix per termid.
      // we take the vectors (sequences) generated in the previous step and traverse
      // the vector dimensions (containing tuples documentid -> tfidf), building the
      // triangular matrix elements (rowid -> (colid -> row-col value)). Here rowid
      // and colid are documentids from the vector of the term, and the row-col value
      // is the term-tfidf of both documents multiplied with each other (in our case
      // cosine similarity).
      // the result is a sequence of tuples
      //   ((leftDocumentId, rightDocumentId) -> left-tfidf * right-tfidf)
      // representing the entries of the upper triangular matrix, one matrix per termid
      .flatMap { x: (Int, LinearSeq[(Int, Double)]) =>
        triangularCoOccurrenceMatrix(LinearSeq[((Int, Int), Double)](), x._2)
      }
      // the tuples from the previous step are reduced by key
      // (leftDocumentId, rightDocumentId) over all termid matrices.
      // since most documents contain multiple terms, there will be entries for a
      // given (leftDocumentId, rightDocumentId) in multiple matrices.
      // reduce all (leftDocumentId, rightDocumentId) co-occurrences over all common
      // terms (ensured through reducing by key), summing the similarities up.
      // this results in a sequence representing the triangular matrix aggregated by
      // (leftDocumentId, rightDocumentId):
      //   ((leftDocumentId, rightDocumentId), sum of all term-tfidfs),
      //   ((leftDocumentId, anotherRightDocId), sum of all term-tfidfs), ...
      .reduceByKey(_ + _, 40)
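      // for intuition (same hypothetical values): term 42 alone contributes
      // ((0,7), 0.31*0.12), ((0,9), 0.31*0.58) and ((7,9), 0.12*0.58);
      // reduceByKey sums these partial products over all shared terms, which
      // for unit-length tfidf vectors is exactly the cosine similarity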
      /* Stage 3 : create similarities */
      // symmetrize the document co-occurrence matrix by generating the lower
      // triangular matrix from the upper one through swapping left document id
      // and right document id (the original ((leftdocid, rightdocid), tfidfsum)
      // entry is retained as well, of course)
      .flatMap { x: ((Int, Int), Double) =>
        LinearSeq(x, ((x._1._2, x._1._1), x._2))
      }
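      // e.g. (hypothetical values) ((0,7), 0.45) additionally emits
      // ((7,0), 0.45), so a similarity can be looked up from either side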
      // save the result to HDFS
      .saveAsTextFile("row-similarity-test")
  }
  /**
   * Produces a LinearSeq representing the triangular matrix of document co-occurrences.
   * @param accumulator temporary variable used by the recursion (enables tail-recursion)
   * @param corpus shall contain at least two elements (2 document occurrences),
   *               otherwise an empty Seq is returned
   * @return Seq of document co-occurrences formatted as
   *         ((leftDocumentId -> rightDocumentId) -> leftTFIDF * rightTFIDF)
   */
  @tailrec private def triangularCoOccurrenceMatrix(
      accumulator: LinearSeq[((Int, Int), Double)],
      corpus: LinearSeq[(Int, Double)]): LinearSeq[((Int, Int), Double)] = corpus match {
    // guard against an empty corpus (would otherwise throw a MatchError)
    case Seq() => accumulator
    // a single remaining document produces no further pairs
    case _ +: Seq() => accumulator
    case h +: t =>
      triangularCoOccurrenceMatrix(
        accumulator ++ (for (e <- t) yield ((h._1, e._1), e._2 * h._2)), t)
  }
}
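For illustration, a toy run of the helper (hypothetical document ids and
tfidf weights, not from our data): a term occurring in three documents
yields one product per document pair above the diagonal.

  val occurrences = LinearSeq(1 -> 0.5, 2 -> 0.4, 3 -> 0.2)
  triangularCoOccurrenceMatrix(LinearSeq(), occurrences)
  // yields ((1,2), 0.5 * 0.4), ((1,3), 0.5 * 0.2), ((2,3), 0.4 * 0.2)
  // i.e.   ((1,2), 0.20),      ((1,3), 0.10),      ((2,3), 0.08)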