Ok, I did a number of re-factorings and one among them, that blew my mind.

As you may or may not know this:

.reduceByKey(_ + _, 40) // do it with 40 partitions

cause spark to partition data into 40 parts with default, as I understand, hash-based partitioner. Then during execution these partitions get processed by separate tasks that in turn are distributed and spawned (in parallel) by available workers (nodes).

During my tests I observed that there were always 2-3-4 long running tasks that determined the critical path of the whole spark job (as in, there was one task running for whole 18 minutes). Also I observed that only through increasing number of partitions those long running tasks got shorter! So I was increasing gradually number of partitions and at 400 partitions it finally rocked.

At the moment the very same job that previously ran for 18-22 minutes now is complete in under 3!

I don't know much about spark scheduling and what overhead there is for spawning and despawning tasks but what I observe in my tests look as if
- tasks are queued on all available worker nodes by simple round robin;
- queued tasks get executed up to 100% cpu load, others wait until load drops and then get spawned (I am not sure but it looks as if each task requires more or less equal portion of cpu power); - spawn/despawn overhead is really small (I read someplace that its about 200ms per task); - default partitioner is not that smart but even with brute force (as in, just increase partitions to some large number) it manages to make small enough partitions.

So the combo of large number of partitions and, if possible, smart partitioner implementation is doing the trick in the case of RSJ.

I will be testing the job with much larger data set (328k "fat" documents instead of 10k) and will report on findings.

kind regards
reinis

P.S. Pat, thank you so much for the link to docs, no idea how the heck I oversaw them!

On 06.08.2014 19:13, Pat Ferrel wrote:
I ran itemsimilarity on the epinions data on a small 3 machine cluster and 
found that it ran in 20 some minutes using the old hadoop version, while the 
spark-itemsimilarity ran in 2+ minutes so it looks like 10x the performance. 
This is using the same “cheats” in both cases.

100x is not likely to be seen except in special cases that fit hadoop’s 
mapreduce poorly and Spark’s much better. For example if a pipeline is long, 
requiring lots of serialization and IO on hadoop and none using Spark’s 
in-memory RDDs. I doubt we’ll see that with RSJ, which is fairly simple.

Other comments inline

On Aug 5, 2014, at 9:38 PM, Reinis Vicups <mah...@orbit-x.de> wrote:

Yes, that would make useage of threshold and the expected(?) quality of the 
result better.
It may also have some of the same side effects as the current threshold, like 
having no similar items for some. But it does produce an average “quality” 
better than other sub-sampling techniques.

The configuration tho was not the main point for us to try to implement 
RowSimilarityJob with spark, but rather
- promise of spark to work 100x times faster than hadoops map-reduce,
- existence of "cheats" in current RowSimilarityJob (observations, threshold, max 
similarities per row), we are willing to have a super performant "loss less" solution.

With our initial implementation we observe that spark implementation is 
performing worse than original RSJ and I was wondering if community could hint 
us on why so.

I am making couple of guesses myself:
- we're not using any checkpointing, caching, sharing, broadcasting of 
intermediate results and I am kinda unsure if that is applicable to RSJ;
It does apply, Mahout uses caching to keep some data in memory and there is an 
optimizer (BLAS type) build in to the DRM math calculations. Scala supports 
lazy evaluation, which allows the optimizer to work invisibly in the 
background. This means a “slim” A’A to seed the item similarity calc.

- we're not using kryoserializer, possibly that could have some reasonable 
impact on performance;
We are using kryo. Can’t speak to the speed comparison but maybe others can.

- we're using only standard scala collections. I did use breeze.linalg 
SparseVector at some point but I couldn't observe any performance increase. Do 
you guys have any experience on using specialized linalg collections versus 
LinearSeq of scala versus IndexedSeq of scala?
Mahout uses Scala extensions and conversions to make Scala style use of the 
Mahout Java optimized SparseVector types and iterators. The DRM has been 
completely implemented for Spark and has some optimizations. One recent 
optimization was in the way non-zero elements of a sparse vector are iterated.

If you’d like to use any of this I suggest reading the docs on the Mahout Scala 
DSL here: http://mahout.apache.org/users/sparkbindings/home.html and the full 
PDF here: http://mahout.apache.org/users/sparkbindings/ScalaSparkBindings.pdf

You can even fire up the Spark/Scala shell and play with your code 
interactively.

thanks
reinis

On 06.08.2014 03:04, Pat Ferrel wrote:
Cooccurrence is implemented on Spark and a ticket for doing the 
RowSimilarityJob was entered today. Should be fairly easy since ItemSimilarity 
is implemented. Will use only LLR for now. The driver reads text files (a text 
version of DRM).

If you want to wrap Spark cooccurrence yourself you can read in a DRM from the 
Mahout hadoop tf-idf code’s sequence file (it’s a one liner) If you want to try 
any of this let me know and I’ll give some pointers.

I’ve been wondering about the threshold too. Seems like an absolute threshold 
is very hard to use. Was thinking that some % of values might be a better 
measure. Imagine you want 100 similar items per item, so 100 per row in the 
similarity matrix—set 100 per row as your threshold in this future Spark-RSJ. 
But you’d get 100 on average, it wouldn’t be guaranteed for any particular 
item. Over all it would be 100*number of rows that you’d get and they’d be the 
items with highest similarity scores of the total created. Do you think this 
would help?

On Aug 5, 2014, at 4:50 PM, Reinis Vicups <mah...@orbit-x.de> wrote:

Hi,

we have had good results with RowSimilarityJob in our Use Case with some 
quality loss due to pruning and decline in performance if setting thresholds 
too high/low.

The current work on mahout integration with spark done by dlyubimov, pferrel 
and others is just amazing (although I would love to see more inline comments 
especially for classes like SparkEngine.scala or what the heck are those in 
org.apache.mahout.sparkbindings.blas :D )!

Either I haven't looked hard enough, or the RowSimilarityJob for spark is not 
implemented just yet (will it be at some point)? So my two colleagues (hi 
Nadine, hi Wolfgang) and me attempted to replicate RowSimilarityJob (to some 
extent) in spark.

I am providing the very first and overly simplified version of implementation 
below and would greatly appreciate any feedback on whatever aspect of out 
implementation you would like to comment on!

Some fun facts:
We have roughly 10k support tickets containing textual information (title, 
description, comments of participating agents, solutions, attachment texts that 
we are also extracting). The raw data we import from relational DB into HDFS is 
roughly 1.5 GB. The resulting TFIDF data is roughly 80 MB and contains 10k 
vectors with the dimensionality of roughly 300k.

The job is executed on our test cluster consisting of dual core machines with 
32GB ram for 1 master node and 16GB for 4 worker nodes. As you will see in 
implementation, currently we do no pruning, no limitations of observations, no 
thresholds - so it runs on whole corpus. And completes in 18 to 22 minutes. We 
have observed some non-linear horizontal scalability (started with one node, 
then three, then four and the execution time reduced slightly). The mahout's 
RowSimilarityJob with maxed out observations completes in 12 minutes for the 
same data set.

"--similarityClassname", "SIMILARITY_COSINE",
"-m", "100000",
"--maxObservationsPerRow", "900000",
"--maxObservationsPerColumn", "900000",
"-ess", "true"

Thanks, guys, for your time reading this and, again, any feedback (especially 
on how to improve the implementation ;) ) is greatly appreciated
reinis

---
The code:

import com.google.common.io.ByteStreams
import our.hbase.HBaseConversions._
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.mahout.math.VectorWritable
import org.apache.spark.SparkContext.rddToPairRDDFunctions
import org.apache.spark.{SparkConf, SparkContext}

import scala.annotation.tailrec
import scala.collection.LinearSeq

object RowSimilaritySparkJob {
def main(args: Array[String]) {
    val sConf = new SparkConf().setAppName("RowSimilaritySparkJob ")
    val sc = new SparkContext(sConf)

    @transient val hConf = HBaseConfiguration.create()
    val fullTableName = "sparsevectortfidfvectors"
    hConf.set(TableInputFormat.INPUT_TABLE, fullTableName)

    val rdd = sc.newAPIHadoopRDD(hConf, classOf[TableInputFormat], 
classOf[ImmutableBytesWritable], classOf[Result])

    val documentRows = rdd.map { get =>
      // read-in tfidf from HBase
      val vectorWritable = new VectorWritable()
      val vectorByteArray = get._2.getValue("d", "vector")
vectorWritable.readFields(ByteStreams.newDataInput(vectorByteArray))
      val vector = vectorWritable.get()

      // output tuple (documentId, tfidfVector)
      (bytesToInt(get._1.get()), vector)
    }

    /* Stage 1 : normalize & transpose */
    val termRows = documentRows

       // we do no normalization since we use cosine similarity and seq2sparse 
already generates normalized tfidf values
      // write in what documents the given termid appears in (term-document 
co-occurrence) as a tuple (termid, (documentid, tfidf of term in document))
      .flatMap { case(documentId, tfidfVector) =>
        import scala.collection.JavaConversions._
        for( element <- tfidfVector.nonZeroes() ) yield {
          // output multiple tuples (termid, (documentid, tfidf of term in 
document))
          element.index() -> (documentId -> element.get())
        }
      }

      // combine term-document co-occurrence fragment by merging the vectors 
and thus creating
      // full vector (or in our case linear sequence) of documents the termid 
appears in
      // (termid -> Seq((documentId -> term-tfidf in document), (anotherDocumentId 
-> term-tfidf in that document), ...))
      .combineByKey[LinearSeq[(Int, Double)]](
        (x: (Int, Double)) => { LinearSeq[(Int, Double)](x._1 -> x._2) },
        (v: LinearSeq[(Int, Double)], t: (Int, Double)) => { v :+ t },
        (combiner: LinearSeq[(Int, Double)], combinee: LinearSeq[(Int, Double)]) 
=> { combiner ++ combinee // concatenating Seq's from different workers?},
        40 // this is just number of partitions depending on available hardware 
(cpus))

      /* Stage 2 : co-occurrence triangular matrix */
      // write upper half of the document co-occurrence matrix (triangular 
matrix) one matrix per termid.
      // we take vectors (sequences) generated in the previous step and traverse 
vector dimensions (containing tuple documentid -> tfidf)
      // building triangular matrix elements (rowid -> (colid -> row-col 
value)). Here the rowid and colid are documentids from the vector
      // of the term and row-col value is term-tfidf of both documents 
multiplied with each other (in our case cosine similarity).
      // result is sequence of tuples (leftDocumentId -> LinearSeq((rightDocumentId 
-> left-tfidf * right-tfidf), (anotherRightDocumentId -> left-tfidf * right-tfidf))
      // representing rows of upper triangular matrix, one per termid
      .flatMap { x: (Int, LinearSeq[(Int, Double)]) => 
triangularCoOccurrenceMatrix(LinearSeq[((Int, Int), Double)](), x._2) }

      // tuples from the previous step are reduced by key (leftDocumentId, 
rightDocumentId) over all termid matrixes.
      // since most of the documents contain multiple terms, there will be rows 
for given leftdocumentid, rightdocumentid in multiple matrixes
      // reduce all leftDocumentId, rightDocuemtnId co-occurrences over all 
common terms (it is ensured through reducing by key) summing the similarities up
      // this results into sequence representing triangular matrix aggregated 
by (leftDocumentId, rightDocumentId):
      // ((leftDocumentId, rightDocumentId), sum of all term-tfidfs), 
((leftDocumentId, anotherRightDocId), sum of all term-tfidfs), ...)
      .reduceByKey(_ + _, 40)

      /* Stage 3 : create similarities */
      // symetrify document co-occurrence matrix by generating lower triangular 
matrix from the upper triangular matrix through
      // swapping left document id and right document id (original (leftdocid, 
rightdocid), tfidfsum) is retained aswell ofcourse
      .flatMap { x: ((Int, Int), Double) =>
          LinearSeq(x, ((x._1._2, x._1._1), x._2))
      }

      // save in hadoop
      .saveAsTextFile("row-similarity-test")
}

/**
* Produces LinearSeq representing triangular matrix of document co-occurrences
* @param accumulator is a temporary variable used by recursion (enables 
tail-recursion)
* @param corpus shall contain at least two elements (2 document occurrences), 
otherwise an empty Seq will be returned
* @return Seq of document co-occurrences formated as ((leftDocumentId -> 
rightDocumentId) -> leftTFIDF * rightTFIDF)
*/
@tailrec private def triangularCoOccurrenceMatrix(accumulator: LinearSeq[((Int, 
Int), Double)], corpus: LinearSeq[(Int, Double)]): LinearSeq[((Int, Int), 
Double)] = corpus match {
  case h +: Seq() => accumulator
  case (h +: t) => triangularCoOccurrenceMatrix(accumulator ++ (for (e <- t) 
yield ((h._1, e._1), e._2 * h._2)), t)
}
}


Reply via email to