Repository: spark Updated Branches: refs/heads/master 1821fc165 -> 6718c1eb6
[SPARK-5562] [MLLIB] LDA should handle empty document. See the jira https://issues.apache.org/jira/browse/SPARK-5562 Author: Alok Singh <singhal@Aloks-MacBook-Pro.local> Author: Alok Singh <sing...@aloks-mbp.usca.ibm.com> Author: Alok Singh <sing...@us.ibm.com> Closes #7064 from aloknsingh/aloknsingh_SPARK-5562 and squashes the following commits: 259a0a7 [Alok Singh] change as per the comments by @jkbradley be48491 [Alok Singh] [SPARK-5562][MLlib] re-order import in alphabetical order c01311b [Alok Singh] [SPARK-5562][MLlib] fix the newline typo b271c8a [Alok Singh] [SPARK-5562][Mllib] As per github discussion with jkbradley. We would like to simplify things. 7c06251 [Alok Singh] [SPARK-5562][MLlib] modified the JavaLDASuite for test passing c710cb6 [Alok Singh] fix the scala code style to have space after : 2572a08 [Alok Singh] [SPARK-5562][MLlib] change the import xyz._ to the import xyz.{c1, c2} .. ab55fbf [Alok Singh] [SPARK-5562][MLlib] Change as per Sean Owen's comments https://github.com/apache/spark/pull/7064/files#diff-9236d23975e6f5a5608ffc81dfd79146 9f4f9ea [Alok Singh] [SPARK-5562][MLlib] LDA should handle empty document. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6718c1eb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6718c1eb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6718c1eb Branch: refs/heads/master Commit: 6718c1eb671faaf5c1d865ad5d01dbf78dae9cd2 Parents: 1821fc1 Author: Alok Singh <singhal@Aloks-MacBook-Pro.local> Authored: Mon Jul 6 21:53:55 2015 -0700 Committer: Joseph K. 
Bradley <jos...@databricks.com> Committed: Mon Jul 6 21:53:55 2015 -0700 ---------------------------------------------------------------------- docs/mllib-clustering.md | 2 +- .../apache/spark/mllib/clustering/JavaLDASuite.java | 13 +++++++++++-- .../org/apache/spark/mllib/clustering/LDASuite.scala | 13 +++++++++++-- 3 files changed, 23 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/6718c1eb/docs/mllib-clustering.md ---------------------------------------------------------------------- diff --git a/docs/mllib-clustering.md b/docs/mllib-clustering.md index 3aad414..d72dc20 100644 --- a/docs/mllib-clustering.md +++ b/docs/mllib-clustering.md @@ -447,7 +447,7 @@ It supports different inference algorithms via `setOptimizer` function. EMLDAOpt on the likelihood function and yields comprehensive results, while OnlineLDAOptimizer uses iterative mini-batch sampling for [online variational inference](https://www.cs.princeton.edu/~blei/papers/HoffmanBleiBach2010b.pdf) and is generally memory friendly. After fitting on the documents, LDA provides: * Topics: Inferred topics, each of which is a probability distribution over terms (words). -* Topic distributions for documents: For each document in the training set, LDA gives a probability distribution over topics. (EM only) +* Topic distributions for documents: For each non empty document in the training set, LDA gives a probability distribution over topics. (EM only). Note that for empty documents, we don't create the topic distributions. 
(EM only) LDA takes the following parameters: http://git-wip-us.apache.org/repos/asf/spark/blob/6718c1eb/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java ---------------------------------------------------------------------- diff --git a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java index 581c033..b48f190 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java @@ -28,12 +28,13 @@ import static org.junit.Assert.assertArrayEquals; import org.junit.Before; import org.junit.Test; +import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.mllib.linalg.Matrix; import org.apache.spark.mllib.linalg.Vector; - +import org.apache.spark.mllib.linalg.Vectors; public class JavaLDASuite implements Serializable { private transient JavaSparkContext sc; @@ -110,7 +111,15 @@ public class JavaLDASuite implements Serializable { // Check: topic distributions JavaPairRDD<Long, Vector> topicDistributions = model.javaTopicDistributions(); - assertEquals(topicDistributions.count(), corpus.count()); + // SPARK-5562. since the topicDistribution returns the distribution of the non empty docs + // over topics. 
Compare it against nonEmptyCorpus instead of corpus + JavaPairRDD<Long, Vector> nonEmptyCorpus = corpus.filter( + new Function<Tuple2<Long, Vector>, Boolean>() { + public Boolean call(Tuple2<Long, Vector> tuple2) { + return Vectors.norm(tuple2._2(), 1.0) != 0.0; + } + }); + assertEquals(topicDistributions.count(), nonEmptyCorpus.count()); } @Test http://git-wip-us.apache.org/repos/asf/spark/blob/6718c1eb/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala index 406affa..03a8a25 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala @@ -99,9 +99,13 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext { // Check: per-doc topic distributions val topicDistributions = model.topicDistributions.collect() + // Ensure all documents are covered. - assert(topicDistributions.length === tinyCorpus.length) - assert(tinyCorpus.map(_._1).toSet === topicDistributions.map(_._1).toSet) + // SPARK-5562. since the topicDistribution returns the distribution of the non empty docs + // over topics. 
Compare it against nonEmptyTinyCorpus instead of tinyCorpus + val nonEmptyTinyCorpus = getNonEmptyDoc(tinyCorpus) + assert(topicDistributions.length === nonEmptyTinyCorpus.length) + assert(nonEmptyTinyCorpus.map(_._1).toSet === topicDistributions.map(_._1).toSet) // Ensure we have proper distributions topicDistributions.foreach { case (docId, topicDistribution) => assert(topicDistribution.size === tinyK) @@ -232,12 +236,17 @@ private[clustering] object LDASuite { } def tinyCorpus: Array[(Long, Vector)] = Array( + Vectors.dense(0, 0, 0, 0, 0), // empty doc Vectors.dense(1, 3, 0, 2, 8), Vectors.dense(0, 2, 1, 0, 4), Vectors.dense(2, 3, 12, 3, 1), + Vectors.dense(0, 0, 0, 0, 0), // empty doc Vectors.dense(0, 3, 1, 9, 8), Vectors.dense(1, 1, 4, 2, 6) ).zipWithIndex.map { case (wordCounts, docId) => (docId.toLong, wordCounts) } assert(tinyCorpus.forall(_._2.size == tinyVocabSize)) // sanity check for test data + def getNonEmptyDoc(corpus: Array[(Long, Vector)]): Array[(Long, Vector)] = corpus.filter { + case (_, wc: Vector) => Vectors.norm(wc, p = 1.0) != 0.0 + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org