Cooccurrence is implemented on Spark, and a ticket for the RowSimilarityJob was
entered today. It should be fairly easy since ItemSimilarity is already
implemented. It will use only LLR for now. The driver reads text files (a text
version of a DRM).
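For context, the LLR scoring mentioned here can be sketched in plain Scala. This follows Dunning's 2x2-contingency-count formulation (Mahout ships its own LogLikelihood class; the code below is only an illustrative sketch, and the names are mine):

```scala
object LlrSketch {
  // x * ln(x), defined as 0 at x = 0
  private def xLogX(x: Long): Double =
    if (x == 0) 0.0 else x * math.log(x)

  // entropy-style helper used by the LLR formula:
  // xLogX(sum of elements) minus the sum of xLogX over the elements
  private def entropy(elems: Long*): Double =
    xLogX(elems.sum) - elems.map(xLogX).sum

  // k11: A and B together, k12: A without B,
  // k21: B without A,      k22: neither
  def logLikelihoodRatio(k11: Long, k12: Long, k21: Long, k22: Long): Double = {
    val rowEntropy    = entropy(k11 + k12, k21 + k22)
    val columnEntropy = entropy(k11 + k21, k12 + k22)
    val matrixEntropy = entropy(k11, k12, k21, k22)
    2.0 * (rowEntropy + columnEntropy - matrixEntropy)
  }

  def main(args: Array[String]): Unit = {
    // perfectly correlated events score high, independent events score ~0
    println(logLikelihoodRatio(1, 0, 0, 1))
    println(logLikelihoodRatio(10, 10, 10, 10))
  }
}
```

Independent counts give a score near zero, which is why LLR works well as a cooccurrence "anomaly" test.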
If you want to wrap the Spark cooccurrence yourself, you can read in a DRM from
the sequence file produced by Mahout's Hadoop tf-idf code (it's a one-liner).
If you want to try any of this, let me know and I'll give some pointers.
I’ve been wondering about the threshold too. An absolute threshold seems very
hard to use; I was thinking that some percentage of values might be a better
measure. Imagine you want 100 similar items per item, i.e. 100 per row in the
similarity matrix: set 100 per row as your threshold in this future Spark-RSJ.
But you'd get 100 only on average; it wouldn't be guaranteed for any particular
item. Overall you'd get 100 * (number of rows), and they'd be the items with
the highest similarity scores of all those created. Do you think this would
help?
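The per-row idea could be sketched like this in plain Scala (a toy sketch with my own names, not any existing Mahout or Spark API): keep the N highest-scoring entries of each similarity-matrix row instead of cutting on an absolute score.

```scala
object TopNPerRow {
  // keep the n highest-scoring (itemId, score) pairs of one
  // similarity-matrix row; the data shapes here are hypothetical
  def topN(row: Seq[(Int, Double)], n: Int): Seq[(Int, Double)] =
    row.sortBy(-_._2).take(n)

  def main(args: Array[String]): Unit = {
    val row = Seq(1 -> 0.2, 2 -> 0.9, 3 -> 0.5, 4 -> 0.1)
    // the two most similar items of this row, regardless of absolute score
    println(topN(row, 2))
  }
}
```

Applied per row of the similarity matrix, this guarantees at most N entries per item while never dropping an item entirely, which an absolute threshold can do.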
On Aug 5, 2014, at 4:50 PM, Reinis Vicups <mah...@orbit-x.de> wrote:
Hi,
we have had good results with RowSimilarityJob in our use case, with some
quality loss due to pruning and a decline in performance when setting
thresholds too high or too low.
The current work on Mahout integration with Spark done by dlyubimov, pferrel
and others is just amazing (although I would love to see more inline comments,
especially for classes like SparkEngine.scala, or what the heck are those in
org.apache.mahout.sparkbindings.blas :D )!
Either I haven't looked hard enough, or RowSimilarityJob for Spark is not
implemented just yet (will it be at some point?). So my two colleagues (hi
Nadine, hi Wolfgang) and I attempted to replicate RowSimilarityJob (to some
extent) in Spark.
I am providing the very first and overly simplified version of the
implementation below and would greatly appreciate any feedback on whatever
aspect of our implementation you would like to comment on!
Some fun facts:
We have roughly 10k support tickets containing textual information (title,
description, comments of participating agents, solutions, and attachment texts
that we also extract). The raw data we import from a relational DB into HDFS is
roughly 1.5 GB. The resulting TF-IDF data is roughly 80 MB and contains 10k
vectors with a dimensionality of roughly 300k.
The job is executed on our test cluster of dual-core machines, with 32 GB RAM
on the single master node and 16 GB on each of the 4 worker nodes. As you will
see in the implementation, we currently do no pruning, no limiting of
observations, and no thresholds, so it runs on the whole corpus, and it
completes in 18 to 22 minutes. We have observed somewhat non-linear horizontal
scalability (we started with one node, then three, then four, and the execution
time decreased only slightly). Mahout's RowSimilarityJob with maxed-out
observations completes in 12 minutes on the same data set, run with these
options:
"--similarityClassname", "SIMILARITY_COSINE",
"-m", "100000",
"--maxObservationsPerRow", "900000",
"--maxObservationsPerColumn", "900000",
"-ess", "true"
Thanks, guys, for taking the time to read this; again, any feedback (especially
on how to improve the implementation ;) ) is greatly appreciated.
reinis
---
The code:
import com.google.common.io.ByteStreams
import our.hbase.HBaseConversions._
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.mahout.math.VectorWritable
import org.apache.spark.SparkContext.rddToPairRDDFunctions
import org.apache.spark.{SparkConf, SparkContext}
import scala.annotation.tailrec
import scala.collection.LinearSeq
object RowSimilaritySparkJob {

  def main(args: Array[String]) {
    val sConf = new SparkConf().setAppName("RowSimilaritySparkJob")
    val sc = new SparkContext(sConf)

    @transient val hConf = HBaseConfiguration.create()
    val fullTableName = "sparsevectortfidfvectors"
    hConf.set(TableInputFormat.INPUT_TABLE, fullTableName)
    val rdd = sc.newAPIHadoopRDD(hConf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])

    val documentRows = rdd.map { get =>
      // read the tf-idf vector from HBase
      val vectorWritable = new VectorWritable()
      val vectorByteArray = get._2.getValue("d", "vector")
      vectorWritable.readFields(ByteStreams.newDataInput(vectorByteArray))
      val vector = vectorWritable.get()
      // output tuple (documentId, tfidfVector)
      (bytesToInt(get._1.get()), vector)
    }

    /* Stage 1 : normalize & transpose */
    val termRows = documentRows
      // we do no normalization since we use cosine similarity and seq2sparse
      // already generates normalized tf-idf values.
      // record which documents a given termId appears in (term-document
      // co-occurrence) as tuples (termId, (documentId, tfidf of term in document))
      .flatMap { case (documentId, tfidfVector) =>
        import scala.collection.JavaConversions._
        for (element <- tfidfVector.nonZeroes()) yield {
          // output one tuple (termId, (documentId, tfidf)) per non-zero element
          element.index() -> (documentId -> element.get())
        }
      }
      // combine the term-document co-occurrence fragments by merging them into
      // the full vector (in our case a linear sequence) of documents the termId
      // appears in:
      // (termId -> Seq((documentId -> term tf-idf in document),
      //                (anotherDocumentId -> term tf-idf in that document), ...))
      .combineByKey[LinearSeq[(Int, Double)]](
        (x: (Int, Double)) => LinearSeq[(Int, Double)](x._1 -> x._2),
        (v: LinearSeq[(Int, Double)], t: (Int, Double)) => v :+ t,
        // concatenate the sequences built on different workers
        (combiner: LinearSeq[(Int, Double)], combinee: LinearSeq[(Int, Double)]) =>
          combiner ++ combinee,
        40) // number of partitions, depending on the available hardware (cpus)
      /* Stage 2 : co-occurrence triangular matrix */
      // emit the upper half of the document co-occurrence matrix (a triangular
      // matrix), one matrix per termId. We take the sequences generated in the
      // previous step and traverse their dimensions (tuples documentId -> tfidf),
      // building triangular matrix elements (rowId -> (colId -> row-col value)).
      // Here rowId and colId are documentIds from the term's vector, and the
      // row-col value is the product of the two documents' term tf-idfs (their
      // contribution to the cosine similarity).
      // The result is a sequence of tuples
      // (leftDocumentId -> LinearSeq((rightDocumentId -> leftTfidf * rightTfidf), ...))
      // representing rows of the upper triangular matrix, one per termId.
      .flatMap { x: (Int, LinearSeq[(Int, Double)]) =>
        triangularCoOccurrenceMatrix(LinearSeq[((Int, Int), Double)](), x._2)
      }
      // the tuples from the previous step are reduced by key (leftDocumentId,
      // rightDocumentId) over all termId matrices. Since most documents contain
      // multiple terms, a given (leftDocumentId, rightDocumentId) pair occurs in
      // multiple matrices; reducing by key sums the similarity contributions
      // over all common terms. The result is a sequence representing the
      // triangular matrix aggregated by (leftDocumentId, rightDocumentId):
      // ((leftDocumentId, rightDocumentId), sum of all term tf-idf products), ...
      .reduceByKey(_ + _, 40)
      /* Stage 3 : create similarities */
      // symmetrize the document co-occurrence matrix by generating the lower
      // triangular matrix from the upper one: swap left and right document ids;
      // the original ((leftDocId, rightDocId), tfidfSum) is of course retained
      // as well
      .flatMap { x: ((Int, Int), Double) =>
        LinearSeq(x, ((x._1._2, x._1._1), x._2))
      }
      // save to hadoop
      .saveAsTextFile("row-similarity-test")
  }
  /**
   * Produces a LinearSeq representing the triangular matrix of document co-occurrences
   * @param accumulator temporary accumulator used by the recursion (enables tail-recursion)
   * @param corpus shall contain at least two elements (2 document occurrences),
   *               otherwise an empty Seq will be returned
   * @return Seq of document co-occurrences formatted as
   *         ((leftDocumentId -> rightDocumentId) -> leftTfidf * rightTfidf)
   */
  @tailrec
  private def triangularCoOccurrenceMatrix(
      accumulator: LinearSeq[((Int, Int), Double)],
      corpus: LinearSeq[(Int, Double)]): LinearSeq[((Int, Int), Double)] =
    corpus match {
      case h +: Seq() => accumulator
      case h +: t =>
        triangularCoOccurrenceMatrix(
          accumulator ++ (for (e <- t) yield ((h._1, e._1), e._2 * h._2)), t)
      case _ => accumulator // guard against an empty corpus (matches neither case above)
    }
}
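For a quick sanity check, here is what the tail-recursive helper produces on a toy term vector. This is a standalone copy of the function so it runs without Spark, HBase, or Mahout on the classpath:

```scala
import scala.annotation.tailrec
import scala.collection.LinearSeq

object TriangularDemo {
  // standalone copy of triangularCoOccurrenceMatrix, for illustration only
  @tailrec
  def triangular(acc: LinearSeq[((Int, Int), Double)],
                 corpus: LinearSeq[(Int, Double)]): LinearSeq[((Int, Int), Double)] =
    corpus match {
      case h +: Seq() => acc
      case h +: t =>
        triangular(acc ++ (for (e <- t) yield ((h._1, e._1), e._2 * h._2)), t)
      case _ => acc // empty corpus
    }

  def main(args: Array[String]): Unit = {
    // one term occurring in documents 1, 2 and 3 with the given tf-idf weights
    val corpus = LinearSeq(1 -> 0.5, 2 -> 0.2, 3 -> 0.1)
    // expect the upper-triangular pairs (1,2), (1,3), (2,3) with the
    // pairwise products of the weights
    println(triangular(LinearSeq(), corpus))
  }
}
```

Each pair appears exactly once (upper triangle), which is why Stage 3 has to mirror the matrix before the results are usable per document.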