Repository: spark
Updated Branches:
  refs/heads/master a20e743fb -> d8cfd531c
[SPARK-5567] [MLLIB] Add predict method to LocalLDAModel

jkbradley hhbyyh

Adds `topicDistributions` to LocalLDAModel. Please review after #7757 is merged.

Author: Feynman Liang <fli...@databricks.com>

Closes #7760 from feynmanliang/SPARK-5567-predict-in-LDA and squashes the following commits:

0ad1134 [Feynman Liang] Remove println
27b3877 [Feynman Liang] Code review fixes
6bfb87c [Feynman Liang] Remove extra newline
476f788 [Feynman Liang] Fix checks and doc for variationalInference
061780c [Feynman Liang] Code review cleanup
3be2947 [Feynman Liang] Rename topicDistribution -> topicDistributions
2a821a6 [Feynman Liang] Add predict methods to LocalLDAModel

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d8cfd531
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d8cfd531
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d8cfd531

Branch: refs/heads/master
Commit: d8cfd531c7c50c9b00ab546be458f44f84c386ae
Parents: a20e743
Author: Feynman Liang <fli...@databricks.com>
Authored: Thu Jul 30 13:17:54 2015 -0700
Committer: Joseph K. Bradley <jos...@databricks.com>
Committed: Thu Jul 30 13:17:54 2015 -0700

----------------------------------------------------------------------
 .../spark/mllib/clustering/LDAModel.scala     | 42 +++++++++++--
 .../spark/mllib/clustering/LDAOptimizer.scala |  5 +-
 .../spark/mllib/clustering/LDASuite.scala     | 63 ++++++++++++++++++++
 3 files changed, 102 insertions(+), 8 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/d8cfd531/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
index ece2884..6cfad3f 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
@@ -186,7 +186,6 @@ abstract class LDAModel private[clustering] extends Saveable {
  * This model stores only the inferred topics.
  * It may be used for computing topics for new documents, but it may give less accurate answers
  * than the [[DistributedLDAModel]].
- *
  * @param topics Inferred topics (vocabSize x k matrix).
  */
 @Experimental
@@ -221,9 +220,6 @@ class LocalLDAModel private[clustering] (
   // TODO
   // override def logLikelihood(documents: RDD[(Long, Vector)]): Double = ???
 
-  // TODO:
-  // override def topicDistributions(documents: RDD[(Long, Vector)]): RDD[(Long, Vector)] = ???
-
   /**
    * Calculate the log variational bound on perplexity. See Equation (16) in original Online
    * LDA paper.
@@ -269,7 +265,7 @@ class LocalLDAModel private[clustering] (
     // by topic (columns of lambda)
     val Elogbeta = LDAUtils.dirichletExpectation(lambda.t).t
 
-    var score = documents.filter(_._2.numActives > 0).map { case (id: Long, termCounts: Vector) =>
+    var score = documents.filter(_._2.numNonzeros > 0).map { case (id: Long, termCounts: Vector) =>
       var docScore = 0.0D
       val (gammad: BDV[Double], _) = OnlineLDAOptimizer.variationalTopicInference(
         termCounts, exp(Elogbeta), brzAlpha, gammaShape, k)
@@ -277,7 +273,7 @@ class LocalLDAModel private[clustering] (
 
       // E[log p(doc | theta, beta)]
       termCounts.foreachActive { case (idx, count) =>
-        docScore += LDAUtils.logSumExp(Elogthetad + Elogbeta(idx, ::).t)
+        docScore += count * LDAUtils.logSumExp(Elogthetad + Elogbeta(idx, ::).t)
       }
       // E[log p(theta | alpha) - log q(theta | gamma)]; assumes alpha is a vector
       docScore += sum((brzAlpha - gammad) :* Elogthetad)
@@ -297,6 +293,40 @@ class LocalLDAModel private[clustering] (
     score
   }
 
+  /**
+   * Predicts the topic mixture distribution for each document (often called "theta" in the
+   * literature). Returns a vector of zeros for an empty document.
+   *
+   * This uses a variational approximation following Hoffman et al. (2010), where the approximate
+   * distribution is called "gamma." Technically, this method returns this approximation "gamma"
+   * for each document.
+   * @param documents documents to predict topic mixture distributions for
+   * @return An RDD of (document ID, topic mixture distribution for document)
+   */
+  // TODO: declare in LDAModel and override once implemented in DistributedLDAModel
+  def topicDistributions(documents: RDD[(Long, Vector)]): RDD[(Long, Vector)] = {
+    // Double transpose because dirichletExpectation normalizes by row and we need to normalize
+    // by topic (columns of lambda)
+    val expElogbeta = exp(LDAUtils.dirichletExpectation(topicsMatrix.toBreeze.toDenseMatrix.t).t)
+    val docConcentrationBrz = this.docConcentration.toBreeze
+    val gammaShape = this.gammaShape
+    val k = this.k
+
+    documents.map { case (id: Long, termCounts: Vector) =>
+      if (termCounts.numNonzeros == 0) {
+        (id, Vectors.zeros(k))
+      } else {
+        val (gamma, _) = OnlineLDAOptimizer.variationalTopicInference(
+          termCounts,
+          expElogbeta,
+          docConcentrationBrz,
+          gammaShape,
+          k)
+        (id, Vectors.dense(normalize(gamma, 1.0).toArray))
+      }
+    }
+  }
+
 }
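A note on the `count *` fix in the `logPerplexity` hunk above: the variational bound being
computed (Equation (16) of Hoffman et al., 2010) sums over word tokens, so each distinct
term's contribution must be weighted by its count in the document. Roughly, writing n_{dw}
for the count of term w in document d (this is a sketch of the per-document likelihood term
only, not the full bound):

    \mathbb{E}_q[\log p(w_d \mid \theta_d, \beta)]
      \approx \sum_{w} n_{dw}
        \log \sum_{k=1}^{K} \exp\big(\mathbb{E}_q[\log \theta_{dk}]
                                     + \mathbb{E}_q[\log \beta_{kw}]\big)

The inner log-sum-exp is what `LDAUtils.logSumExp(Elogthetad + Elogbeta(idx, ::).t)`
evaluates; without the `count` factor, a term occurring many times contributed no more to
the score than a term occurring once.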
http://git-wip-us.apache.org/repos/asf/spark/blob/d8cfd531/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
index 4b90fbd..9dbec41 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
@@ -394,7 +394,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
     val gammaShape = this.gammaShape
 
     val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
-      val nonEmptyDocs = docs.filter(_._2.numActives > 0)
+      val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
 
       val stat = BDM.zeros[Double](k, vocabSize)
       var gammaPart = List[BDV[Double]]()
@@ -461,7 +461,8 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 private[clustering] object OnlineLDAOptimizer {
   /**
    * Uses variational inference to infer the topic distribution `gammad` given the term counts
-   * for a document. `termCounts` must be non-empty, otherwise Breeze will throw a BLAS error.
+   * for a document. `termCounts` must contain at least one non-zero entry, otherwise Breeze will
+   * throw a BLAS error.
    *
    * An optimization (Lee, Seung: Algorithms for non-negative matrix factorization, NIPS 2001)
    * avoids explicit computation of variational parameter `phi`.
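A note on the `numActives` -> `numNonzeros` change, which appears both in `logPerplexity`
and in the optimizer above: for MLlib sparse vectors the two differ whenever a zero value is
explicitly stored. A minimal sketch of the distinction (expected results shown as comments):

    import org.apache.spark.mllib.linalg.Vectors

    // One *stored* entry at index 1, but its value happens to be 0.0.
    val v = Vectors.sparse(3, Array(1), Array(0.0))

    v.numActives   // 1: counts stored (active) entries, even explicit zeros
    v.numNonzeros  // 0: counts only entries whose value is actually non-zero

Filtering on `numNonzeros > 0` therefore also skips documents whose term-count vectors are
stored-but-all-zero; under `numActives` such documents would have reached
`variationalTopicInference` and hit the BLAS error described in the updated doc comment.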
http://git-wip-us.apache.org/repos/asf/spark/blob/d8cfd531/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
index 61d2edf..d74482d 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
@@ -242,6 +242,7 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext {
     val alpha = 0.01
     val eta = 0.01
     val gammaShape = 100
+    // obtained from LDA model trained in gensim, see below
     val topics = new DenseMatrix(numRows = vocabSize, numCols = k, values = Array(
       1.86738052, 1.94056535, 1.89981687, 0.0833265, 0.07405918, 0.07940597,
       0.15081551, 0.08637973, 0.12428538, 1.9474897, 1.94615165, 1.95204124))
@@ -281,6 +282,68 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext {
     assert(ldaModel.logPerplexity(docs) ~== -3.690D relTol 1E-3D)
   }
 
+  test("LocalLDAModel predict") {
+    val k = 2
+    val vocabSize = 6
+    val alpha = 0.01
+    val eta = 0.01
+    val gammaShape = 100
+    // obtained from LDA model trained in gensim, see below
+    val topics = new DenseMatrix(numRows = vocabSize, numCols = k, values = Array(
+      1.86738052, 1.94056535, 1.89981687, 0.0833265, 0.07405918, 0.07940597,
+      0.15081551, 0.08637973, 0.12428538, 1.9474897, 1.94615165, 1.95204124))
+
+    def toydata: Array[(Long, Vector)] = Array(
+      Vectors.sparse(6, Array(0, 1), Array(1, 1)),
+      Vectors.sparse(6, Array(1, 2), Array(1, 1)),
+      Vectors.sparse(6, Array(0, 2), Array(1, 1)),
+      Vectors.sparse(6, Array(3, 4), Array(1, 1)),
+      Vectors.sparse(6, Array(3, 5), Array(1, 1)),
+      Vectors.sparse(6, Array(4, 5), Array(1, 1))
+    ).zipWithIndex.map { case (wordCounts, docId) => (docId.toLong, wordCounts) }
+    val docs = sc.parallelize(toydata)
+
+    val ldaModel: LocalLDAModel = new LocalLDAModel(
+      topics, Vectors.dense(Array.fill(k)(alpha)), eta, gammaShape)
+
+    /* Verify results using gensim:
+       import numpy as np
+       from gensim import models
+       corpus = [
+         [(0, 1.0), (1, 1.0)],
+         [(1, 1.0), (2, 1.0)],
+         [(0, 1.0), (2, 1.0)],
+         [(3, 1.0), (4, 1.0)],
+         [(3, 1.0), (5, 1.0)],
+         [(4, 1.0), (5, 1.0)]]
+       np.random.seed(2345)
+       lda = models.ldamodel.LdaModel(
+         corpus=corpus, alpha=0.01, eta=0.01, num_topics=2, update_every=0, passes=100,
+         decay=0.51, offset=1024)
+       print(list(lda.get_document_topics(corpus)))
+       > [[(0, 0.99504950495049516)], [(0, 0.99504950495049516)],
+       >  [(0, 0.99504950495049516)], [(1, 0.99504950495049516)],
+       >  [(1, 0.99504950495049516)], [(1, 0.99504950495049516)]]
+     */
+
+    val expectedPredictions = List(
+      (0, 0.99504), (0, 0.99504),
+      (0, 0.99504), (1, 0.99504),
+      (1, 0.99504), (1, 0.99504))
+
+    val actualPredictions = ldaModel.topicDistributions(docs).map { case (id, topics) =>
+      // convert results to expectedPredictions format, which only has highest probability topic
+      val topicsBz = topics.toBreeze.toDenseVector
+      (id, (argmax(topicsBz), max(topicsBz)))
+    }.sortByKey()
+      .values
+      .collect()
+
+    assert(expectedPredictions.zip(actualPredictions).forall { case (expected, actual) =>
+      expected._1 === actual._1 && (expected._2 ~== actual._2 relTol 1E-3D)
+    })
+  }
+
   test("OnlineLDAOptimizer with asymmetric prior") {
     def toydata: Array[(Long, Vector)] = Array(
       Vectors.sparse(6, Array(0, 1), Array(1, 1)),
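For reference, a minimal end-to-end sketch of the new API. The corpus below is hypothetical
(two documents over a 6-term vocabulary, mirroring the test fixture), and an existing
SparkContext `sc` is assumed:

    import org.apache.spark.mllib.clustering.{LDA, LocalLDAModel}
    import org.apache.spark.mllib.linalg.{Vector, Vectors}
    import org.apache.spark.rdd.RDD

    // Hypothetical (docId, termCounts) pairs.
    val corpus: RDD[(Long, Vector)] = sc.parallelize(Seq(
      (0L, Vectors.sparse(6, Array(0, 1), Array(1.0, 1.0))),
      (1L, Vectors.sparse(6, Array(3, 5), Array(1.0, 1.0)))))

    // The online optimizer produces a LocalLDAModel.
    val model = new LDA()
      .setK(2)
      .setOptimizer("online")
      .run(corpus)
      .asInstanceOf[LocalLDAModel]

    // New in this patch: per-document topic mixtures ("theta").
    // Each vector sums to 1; empty documents map to a zero vector.
    val thetas: RDD[(Long, Vector)] = model.topicDistributions(corpus)
    thetas.collect().foreach { case (id, theta) => println(s"doc $id -> $theta") }

Note that `topicDistributions` lives only on `LocalLDAModel` for now; per the TODO in the
patch, it may later be declared on `LDAModel` and overridden in `DistributedLDAModel`.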