[GitHub] spark issue #19565: [SPARK-22111][MLLIB] OnlineLDAOptimizer should filter ou...
Github user akopich commented on the issue: https://github.com/apache/spark/pull/19565

@hhbyyh who shall we ping?
[GitHub] spark issue #19565: [SPARK-22111][MLLIB] OnlineLDAOptimizer should filter ou...
Github user akopich commented on the issue: https://github.com/apache/spark/pull/19565

@hhbyyh So, I guess, I should just roll the refactoring back, right?
[GitHub] spark issue #19565: [SPARK-22111][MLLIB] OnlineLDAOptimizer should filter ou...
Github user akopich commented on the issue: https://github.com/apache/spark/pull/19565

@hhbyyh, is there a cluster I can use for this?
[GitHub] spark issue #19565: [SPARK-22111][MLLIB] OnlineLDAOptimizer should filter ou...
Github user akopich commented on the issue: https://github.com/apache/spark/pull/19565

@WeichenXu123 @jkbradley said pings on GitHub don't work for him...
[GitHub] spark issue #19565: [SPARK-22111][MLLIB] OnlineLDAOptimizer should filter ou...
Github user akopich commented on the issue: https://github.com/apache/spark/pull/19565

ping @WeichenXu123, @srowen, @hhbyyh. Further comments?
[GitHub] spark pull request #19565: [SPARK-22111][MLLIB] OnlineLDAOptimizer should fi...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/19565#discussion_r148517729

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -446,14 +445,14 @@ final class OnlineLDAOptimizer extends LDAOptimizer with Logging {
   override private[clustering] def next(): OnlineLDAOptimizer = {
     val batch = docs.sample(withReplacement = sampleWithReplacement,
       miniBatchFraction, randomGenerator.nextLong())
-    if (batch.isEmpty()) return this
--- End diff --

I've added the test. Thank you.
[GitHub] spark pull request #19565: [SPARK-22111][MLLIB] OnlineLDAOptimizer should fi...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/19565#discussion_r148507781

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -497,40 +481,46 @@ final class OnlineLDAOptimizer extends LDAOptimizer with Logging {
       (u._1, u._2, u._3 + v._3)
     }
-    val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN: Long) = stats
-      .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))(
-        elementWiseSum, elementWiseSum
-      )
+    val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], batchSize: Long) =
+      batch.treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))({
+        case (acc, (_, termCounts)) =>
+          val stat = BDM.zeros[Double](k, vocabSize)
--- End diff --

Actually, we can fix this without falling back to `mapPartitions`.
[GitHub] spark pull request #19565: [SPARK-22111][MLLIB] OnlineLDAOptimizer should fi...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/19565#discussion_r148506477

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -497,40 +481,46 @@ final class OnlineLDAOptimizer extends LDAOptimizer with Logging {
       (u._1, u._2, u._3 + v._3)
     }
-    val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN: Long) = stats
-      .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))(
-        elementWiseSum, elementWiseSum
-      )
+    val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], batchSize: Long) =
+      batch.treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))({
+        case (acc, (_, termCounts)) =>
+          val stat = BDM.zeros[Double](k, vocabSize)
--- End diff --

Thanks for a good point. Feels like it will definitely lead to a higher load on the GC. @WeichenXu123, what do you think?
[GitHub] spark issue #19565: [SPARK-22111][MLLIB] OnlineLDAOptimizer should filter ou...
Github user akopich commented on the issue: https://github.com/apache/spark/pull/19565

Okay... any idea why the tests failed? It says

```
ERROR: Step ‘Publish JUnit test result report’ failed: No test report files were found. Configuration error?
```
[GitHub] spark issue #19565: [SPARK-22111][MLLIB] OnlineLDAOptimizer should filter ou...
Github user akopich commented on the issue: https://github.com/apache/spark/pull/19565

@WeichenXu123, in the case of a large dataset this "adjustment" would have an infinitesimal effect. (IMO, no adjustment is needed -- the expected number of non-empty docs is the same, does not depend on the order of filter and sample, and equals `docs.size * miniBatchFraction * fractionOfNonEmptyDocs`.)

So I believe we all agree that sampling should go before filtering. I'll send a commit shortly.
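A minimal Scala sketch of the sample-before-filter pipeline discussed above; `docs`, `miniBatchFraction`, and `numNonzeros` come from the PR, while `seed` stands in for `randomGenerator.nextLong()`:

```scala
// Sketch only: sample first, then drop empty documents.
val batch = docs
  .sample(withReplacement = false, miniBatchFraction, seed) // ~ fraction of all docs
  .filter { case (_, termCounts) => termCounts.numNonzeros > 0 }

// Either ordering keeps, in expectation,
//   docs.count() * miniBatchFraction * fractionOfNonEmptyDocs
// non-empty documents.
```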
[GitHub] spark issue #19565: [SPARK-22111][MLLIB] OnlineLDAOptimizer should filter ou...
Github user akopich commented on the issue: https://github.com/apache/spark/pull/19565

@hhbyyh OK, but it returns almost the same number of elements. Anyway, the variance is going to be much smaller than in the case with sample before filter.
[GitHub] spark issue #19565: [SPARK-22111][MLLIB] OnlineLDAOptimizer should filter ou...
Github user akopich commented on the issue: https://github.com/apache/spark/pull/19565

@hhbyyh Yes, this way we don't change the semantics of `miniBatchFraction`. But is the way it is defined now actually correct? As I mentioned above, in `upstream/master` the number of non-empty documents in the mini-batch is asymptotically normally distributed. Hence, the size of the RDD fed to `treeAggregate` differs from one batch to another, while in this PR (filtering before sampling) all the batches have the same length. But then again, for large datasets this should make no difference. If nobody thinks this disparity of batch sizes is an issue, I won't object to sampling before filtering.

@WeichenXu123, I believe you were advocating for filter before sample. Do you still have that preference?
[GitHub] spark issue #19565: [SPARK-22111][MLLIB] OnlineLDAOptimizer should filter ou...
Github user akopich commented on the issue: https://github.com/apache/spark/pull/19565

Ping @hhbyyh, @WeichenXu123, @srowen. Seems like the discussion is stuck. Does anybody think that the general approach implemented in this PR should be changed? Currently it is filtering before sampling with no caching.
[GitHub] spark pull request #19565: [SPARK-22111][MLLIB] OnlineLDAOptimizer should fi...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/19565#discussion_r147230366

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -446,14 +445,14 @@ final class OnlineLDAOptimizer extends LDAOptimizer with Logging {
   override private[clustering] def next(): OnlineLDAOptimizer = {
     val batch = docs.sample(withReplacement = sampleWithReplacement,
       miniBatchFraction, randomGenerator.nextLong())
-    if (batch.isEmpty()) return this
--- End diff --

But performance-wise... Currently we have a single pass. But if we move the check to `next`, `isEmpty`, being an action, will trigger the materialization of `batch`, and then `batch` will be materialized once again when `treeAggregate` is called in `submitMiniBatch`. Right?
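An annotated sketch of the two-jobs concern, reusing the method from this diff; the comments mark where each Spark job would run:

```scala
override private[clustering] def next(): OnlineLDAOptimizer = {
  val batch = docs.sample(withReplacement = sampleWithReplacement,
    miniBatchFraction, randomGenerator.nextLong())
  if (batch.isEmpty()) return this // job 1: isEmpty is an action and materializes batch
  submitMiniBatch(batch)           // job 2: treeAggregate recomputes the same lineage
}
```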
[GitHub] spark pull request #19565: [SPARK-22111][MLLIB] OnlineLDAOptimizer should fi...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/19565#discussion_r147229232

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -446,14 +445,14 @@ final class OnlineLDAOptimizer extends LDAOptimizer with Logging {
   override private[clustering] def next(): OnlineLDAOptimizer = {
     val batch = docs.sample(withReplacement = sampleWithReplacement,
       miniBatchFraction, randomGenerator.nextLong())
-    if (batch.isEmpty()) return this
--- End diff --

So technically, you suggest moving the check

```
if (batchSize == 0) {
  logWarning("No non-empty documents were submitted in the batch.")
  // Therefore, there is no need to update any of the model parameters
  return this
}
```

to `next()`? Sounds reasonable.
[GitHub] spark issue #19565: [SPARK-22111][MLLIB] OnlineLDAOptimizer should filter ou...
Github user akopich commented on the issue: https://github.com/apache/spark/pull/19565

@hhbyyh, in the case of "filter before sample", the overhead is negligible in a local test. Regarding "sample before filter", you are right: strictly speaking, there should be an adjustment of `miniBatchFraction`, which is why I prefer "filter before sample". Also note that the "sample before filter" version is logically equivalent to the current upstream/master.
[GitHub] spark pull request #19565: [SPARK-22111][MLLIB] OnlineLDAOptimizer should fi...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/19565#discussion_r147208156

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -446,14 +445,14 @@ final class OnlineLDAOptimizer extends LDAOptimizer with Logging {
   override private[clustering] def next(): OnlineLDAOptimizer = {
     val batch = docs.sample(withReplacement = sampleWithReplacement,
       miniBatchFraction, randomGenerator.nextLong())
-    if (batch.isEmpty()) return this
     submitMiniBatch(batch)
   }

   /**
    * Submit a subset (like 1%, decide by the miniBatchFraction) of the corpus to the Online LDA
    * model, and it will update the topic distribution adaptively for the terms appearing in the
    * subset.
+   * The methods assumes no empty documents are submitted.
--- End diff --

Thank you. I'll add it.
[GitHub] spark pull request #19565: [SPARK-22111][MLLIB] OnlineLDAOptimizer should fi...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/19565#discussion_r147208062

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -446,14 +445,14 @@ final class OnlineLDAOptimizer extends LDAOptimizer with Logging {
   override private[clustering] def next(): OnlineLDAOptimizer = {
     val batch = docs.sample(withReplacement = sampleWithReplacement,
       miniBatchFraction, randomGenerator.nextLong())
-    if (batch.isEmpty()) return this
--- End diff --

The method assumes no empty documents are submitted. But it's perfectly fine to submit an empty RDD. And I need to fix a typo.
[GitHub] spark issue #19565: [SPARK-22111][MLLIB] OnlineLDAOptimizer should filter ou...
Github user akopich commented on the issue: https://github.com/apache/spark/pull/19565

And the empty docs were not explicitly filtered out. They've just been ignored in `submitMiniBatch`.
[GitHub] spark issue #19565: [SPARK-22111][MLLIB] OnlineLDAOptimizer should filter ou...
Github user akopich commented on the issue: https://github.com/apache/spark/pull/19565

I'm saying they are not the same, but for larger datasets this should not matter. There is a change in logic: the hack with `val batchSize = (miniBatchFraction * corpusSize).ceil.toInt` is not used anymore, and the function `updateLambda` now uses the real number of non-empty docs.
[GitHub] spark issue #19565: [SPARK-22111][MLLIB] OnlineLDAOptimizer should filter ou...
Github user akopich commented on the issue: https://github.com/apache/spark/pull/19565

Consider the following scenario. Let `docs` be an RDD containing 1000 empty documents and 1000 non-empty documents, and let `miniBatchFraction = 0.05`. Assume we use `filter(...).sample(...)`. Then the resulting RDD will have around `50` elements. If we use `sample(...).filter(...)` instead, `sample` returns around `100` elements, and the number of elements in the RDD returned by `filter` is (asymptotically) normally distributed, though the expectation is `50` again. Am I missing something? However, for larger samples this shouldn't make any difference.

On the purpose of the issue: there were two different variables, `batchSize` and `nonEmptyDocsN`, which could not be used interchangeably. The purpose is to submit a batch containing no empty docs, which makes the two variables refer to the same value.
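A hypothetical stand-alone simulation (not part of the PR) of this 1000/1000 scenario, confirming that both orderings keep about 50 non-empty documents on average:

```scala
import scala.util.Random

object SampleOrderSimulation {
  def main(args: Array[String]): Unit = {
    val rng = new Random(13)
    // true = non-empty document, false = empty document
    val isNonEmpty = Seq.fill(1000)(true) ++ Seq.fill(1000)(false)
    val fraction = 0.05
    val trials = 10000

    // filter(...).sample(...): keep non-empty docs, then Bernoulli-sample them
    def filterThenSample(): Int =
      isNonEmpty.filter(identity).count(_ => rng.nextDouble() < fraction)

    // sample(...).filter(...): Bernoulli-sample all docs, then drop empty ones
    def sampleThenFilter(): Int =
      isNonEmpty.filter(_ => rng.nextDouble() < fraction).count(identity)

    val meanA = (1 to trials).map(_ => filterThenSample()).sum.toDouble / trials
    val meanB = (1 to trials).map(_ => sampleThenFilter()).sum.toDouble / trials
    println(f"filter-then-sample mean: $meanA%.2f") // ~ 50
    println(f"sample-then-filter mean: $meanB%.2f") // ~ 50
  }
}
```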
[GitHub] spark issue #19565: [SPARK-22111][MLLIB] OnlineLDAOptimizer should filter ou...
Github user akopich commented on the issue: https://github.com/apache/spark/pull/19565

@WeichenXu123, yes, there indeed is a difference in logic. Eventually it boils down to the semantics of `miniBatchFraction`: if it is the fraction of non-empty documents being sampled, the version with `filter` going first is correct; if it is the fraction of all documents (empty and non-empty) being sampled, then the version with `sample` going first is correct. To me the first version seems more reasonable (who cares about empty docs anyway). @srowen, if I get it right, you would prefer the second option. Why?

@WeichenXu123, I agree with you: filtering introduces a minimal overhead. @srowen, regarding performance... I don't actually think it makes any difference unless the complexity of `sample` depends on the length of the parent RDD. In all the subsequent computations, empty documents can be handled efficiently.
[GitHub] spark issue #19565: [SPARK-22111][MLLIB] OnlineLDAOptimizer should filter ou...
Github user akopich commented on the issue: https://github.com/apache/spark/pull/19565

I am sure that caching can be avoided here; hence, it should not be used. @srowen, maybe I don't get something, but I'm afraid that currently the lineage for a single mini-batch submission looks like this: `docs.filter(nonEmpty).sample(minibatchFraction).treeAggregate(...)`, and I'm afraid that for each mini-batch the `filter` will be performed all over again. But if we have something like `docs.sample(minibatchFraction).filter(nonEmpty).treeAggregate(...)`, this will be avoided, and no caching is needed.
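A sketch of the two lineages side by side; `nonEmpty` is an assumed helper predicate and `seed` a placeholder, the rest follows the PR:

```scala
val nonEmpty = (doc: (Long, Vector)) => doc._2.numNonzeros > 0

// Filter-first: the predicate runs over the entire corpus on every
// mini-batch, because the filter sits at the root of each batch's lineage.
val batchA = docs.filter(nonEmpty).sample(false, miniBatchFraction, seed)

// Sample-first: the predicate only runs on the ~miniBatchFraction of
// documents that survive the sample, so nothing needs to be cached.
val batchB = docs.sample(false, miniBatchFraction, seed).filter(nonEmpty)
```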
[GitHub] spark pull request #19565: [SPARK-22111][MLLIB] OnlineLDAOptimizer should fi...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/19565#discussion_r147021726

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -446,14 +445,14 @@ final class OnlineLDAOptimizer extends LDAOptimizer with Logging {
   override private[clustering] def next(): OnlineLDAOptimizer = {
     val batch = docs.sample(withReplacement = sampleWithReplacement,
       miniBatchFraction, randomGenerator.nextLong())
-    if (batch.isEmpty()) return this
--- End diff --

I believe it's redundant now. Anyway, `submitMiniBatch` counts the documents.
[GitHub] spark pull request #19565: [SPARK-22111][MLLIB] OnlineLDAOptimizer should fi...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/19565#discussion_r146882424

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -497,40 +495,38 @@ final class OnlineLDAOptimizer extends LDAOptimizer with Logging {
       (u._1, u._2, u._3 + v._3)
     }
-    val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN: Long) = stats
+    val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], batchSize: Long) = stats
       .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))(
         elementWiseSum, elementWiseSum
       )
     expElogbetaBc.destroy(false)
-    if (nonEmptyDocsN == 0) {
+    if (batchSize == 0) {
       logWarning("No non-empty documents were submitted in the batch.")
       // Therefore, there is no need to update any of the model parameters
       return this
     }
     val batchResult = statsSum *:* expElogbeta.t
-    // Note that this is an optimization to avoid batch.count
-    val batchSize = (miniBatchFraction * corpusSize).ceil.toInt
     updateLambda(batchResult, batchSize)
-    logphatOption.foreach(_ /= nonEmptyDocsN.toDouble)
-    logphatOption.foreach(updateAlpha(_, nonEmptyDocsN))
+    logphatOption.foreach(_ /= batchSize.toDouble)
+    logphatOption.foreach(updateAlpha(_, batchSize))
     this
   }

   /**
    * Update lambda based on the batch submitted. batchSize can be different for each iteration.
    */
-  private def updateLambda(stat: BDM[Double], batchSize: Int): Unit = {
+  private def updateLambda(stat: BDM[Double], batchSize: Double): Unit = {
--- End diff --

I guess it isn't. If we want this kind of consistency, `batchSize` should rather be `Double` all the way. So I've changed the type to `Long`. Thanks for the comment.
[GitHub] spark issue #19565: [SPARK-22111][MLLIB] OnlineLDAOptimizer should filter ou...
Github user akopich commented on the issue: https://github.com/apache/spark/pull/19565

Or (and I think it would be the most efficient approach) we can just stick the emptiness check for each document into the `seqOp` of `treeAggregate`. However, it doesn't look like "filtering out beforehand". So, would this be OK?
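A sketch of that variant, assuming the surrounding definitions from this PR (`k`, `vocabSize`, `expElogbetaBc`, `alpha`, `gammaShape`, `logphatPartOptionBase`, `elementWiseSum`); empty documents simply leave the accumulator untouched:

```scala
val (statsSum, logphatOption, nonEmptyDocsN) =
  batch.treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))(
    seqOp = { case (acc @ (stat, logphatOpt, n), (_, termCounts)) =>
      if (termCounts.numNonzeros == 0) {
        acc // empty document: skip it inside the same pass
      } else {
        val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
          termCounts, expElogbetaBc.value, alpha, gammaShape, k)
        stat(::, ids) := stat(::, ids) + sstats
        logphatOpt.foreach(_ += LDAUtils.dirichletExpectation(gammad))
        (stat, logphatOpt, n + 1L)
      }
    },
    combOp = elementWiseSum
  )
```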
[GitHub] spark issue #19565: [SPARK-22111][MLLIB] OnlineLDAOptimizer should filter ou...
Github user akopich commented on the issue: https://github.com/apache/spark/pull/19565

Now I feel that filtering empty docs out in `initialize` is not a good idea, because it will be performed as many times as `sample` in `next` gets called. Right? Alternatively, we can cache `this.docs`, but that's a waste of space...
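A sketch of the trade-off, assuming the `initialize` body from this PR; the `persist` line is a hypothetical option, not something the PR does:

```scala
// Filtering in initialize() roots every mini-batch's lineage at the filter,
// so each next() re-evaluates the predicate over the full corpus...
this.docs = docs.filter(_._2.numNonzeros > 0)
this.corpusSize = this.docs.count()

// ...unless the filtered corpus is cached, trading cluster memory for time:
this.docs.persist(StorageLevel.MEMORY_AND_DISK)
```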
[GitHub] spark pull request #19565: [SPARK-22111][MLLIB] OnlineLDAOptimizer should fi...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/19565#discussion_r146820166

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -497,40 +495,38 @@ final class OnlineLDAOptimizer extends LDAOptimizer with Logging {
       (u._1, u._2, u._3 + v._3)
     }
-    val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN: Long) = stats
+    val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], batchSize: Long) = stats
       .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))(
         elementWiseSum, elementWiseSum
       )
--- End diff --

It looks better now. Thank you for the suggestion.
[GitHub] spark pull request #19565: [SPARK-22111][MLLIB] OnlineLDAOptimizer should fi...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/19565#discussion_r146812501

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -497,40 +495,38 @@ final class OnlineLDAOptimizer extends LDAOptimizer with Logging {
       (u._1, u._2, u._3 + v._3)
     }
-    val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN: Long) = stats
+    val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], batchSize: Long) = stats
       .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))(
         elementWiseSum, elementWiseSum
       )
--- End diff --

Actually, the block in `mapPartitions` will be simplified, since it will no longer need to process collections of documents.
[GitHub] spark pull request #19565: [SPARK-22111][MLLIB] OnlineLDAOptimizer should fi...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/19565#discussion_r146804206

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -497,40 +495,38 @@ final class OnlineLDAOptimizer extends LDAOptimizer with Logging {
       (u._1, u._2, u._3 + v._3)
     }
-    val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN: Long) = stats
+    val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], batchSize: Long) = stats
       .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))(
         elementWiseSum, elementWiseSum
       )
--- End diff --

Yes, that's right. But on the other hand, we've currently got a rather simple `treeAggregate`, so it barely contributes to the complexity of the code. Anyway, is it OK to fix this in this issue?
[GitHub] spark pull request #19565: [SPARK-22111][MLLIB] OnlineLDAOptimizer should fi...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/19565#discussion_r146572407

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -497,40 +495,38 @@ final class OnlineLDAOptimizer extends LDAOptimizer with Logging {
       (u._1, u._2, u._3 + v._3)
     }
-    val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN: Long) = stats
+    val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], batchSize: Long) = stats
       .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))(
         elementWiseSum, elementWiseSum
       )
     expElogbetaBc.destroy(false)
-    if (nonEmptyDocsN == 0) {
+    if (batchSize == 0) {
       logWarning("No non-empty documents were submitted in the batch.")
       // Therefore, there is no need to update any of the model parameters
       return this
     }
     val batchResult = statsSum *:* expElogbeta.t
-    // Note that this is an optimization to avoid batch.count
-    val batchSize = (miniBatchFraction * corpusSize).ceil.toInt
     updateLambda(batchResult, batchSize)
-    logphatOption.foreach(_ /= nonEmptyDocsN.toDouble)
-    logphatOption.foreach(updateAlpha(_, nonEmptyDocsN))
+    logphatOption.foreach(_ /= batchSize.toDouble)
+    logphatOption.foreach(updateAlpha(_, batchSize))
     this
   }

   /**
    * Update lambda based on the batch submitted. batchSize can be different for each iteration.
    */
-  private def updateLambda(stat: BDM[Double], batchSize: Int): Unit = {
+  private def updateLambda(stat: BDM[Double], batchSize: Double): Unit = {
--- End diff --

I've done this just in order to achieve consistency with `updateAlpha`. Is it a bad idea?
[GitHub] spark pull request #19565: [SPARK-22111][MLLIB] OnlineLDAOptimizer should fi...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/19565#discussion_r146571987

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -415,7 +415,8 @@ final class OnlineLDAOptimizer extends LDAOptimizer with Logging {
       docs: RDD[(Long, Vector)],
       lda: LDA): OnlineLDAOptimizer = {
     this.k = lda.getK
-    this.corpusSize = docs.count()
+    this.docs = docs.filter(_._2.numNonzeros > 0) // filter out empty documents
+    this.corpusSize = this.docs.count()
     this.vocabSize = docs.first()._2.size
--- End diff --

`docs` is assumed to be non-empty, while `this.docs` may be empty. In this case `first()` fails.
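A small sketch of the corner case, assuming the `initialize` snippet above:

```scala
val filtered = docs.filter(_._2.numNonzeros > 0)
// If every document is empty, `filtered` has no elements, and calling
// filtered.first() would throw UnsupportedOperationException("empty collection").
this.vocabSize = docs.first()._2.size // safe: the input `docs` is assumed non-empty
```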
[GitHub] spark issue #19565: [SPARK-22111][MLLIB] OnlineLDAOptimizer should filter ou...
Github user akopich commented on the issue: https://github.com/apache/spark/pull/19565

@WeichenXu123, @hhbyyh, looking forward to your opinion.
[GitHub] spark pull request #19565: [SPARK-22111][MLLIB] OnlineLDAOptimizer should fi...
GitHub user akopich opened a pull request: https://github.com/apache/spark/pull/19565

[SPARK-22111][MLLIB] OnlineLDAOptimizer should filter out empty documents beforehand

## What changes were proposed in this pull request?

The empty documents are filtered out in the `initialize` method and are never included in mini-batches. `batchSize` and `nonEmptyDocsN` are now the same thing.

## How was this patch tested?

Existing unit tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/akopich/spark master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19565.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #19565

commit 721f235934f26e75172d39f0398365606616267f
Author: Valeriy Avanesov
Date: 2017-10-24T14:08:39Z

    [SPARK-22111][MLLIB] OnlineLDAOptimizer should filter out empty documents beforehand
[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...
Github user akopich commented on the issue: https://github.com/apache/spark/pull/18924

@jkbradley, no problem. @jkbradley, @WeichenXu123, @hhbyyh, thank you all guys!
[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...
Github user akopich commented on the issue: https://github.com/apache/spark/pull/18924

ping @jkbradley. Anyway, the tests pass now.
[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...
Github user akopich commented on the issue: https://github.com/apache/spark/pull/18924

retest this please
[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...
Github user akopich commented on the issue: https://github.com/apache/spark/pull/18924

@jkbradley, no problem. The test build seems to have been aborted. What's wrong?
[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...
Github user akopich commented on the issue: https://github.com/apache/spark/pull/18924

@WeichenXu123, no problem! Thank you.
[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...
Github user akopich commented on the issue: https://github.com/apache/spark/pull/18924

@WeichenXu123, yes sure. But can this wait until this PR is merged?
[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...
Github user akopich commented on the issue: https://github.com/apache/spark/pull/18924

@WeichenXu123, could you please notify @jkbradley once again?
[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...
Github user akopich commented on the issue: https://github.com/apache/spark/pull/18924

So, shall we ping @jkbradley, then?
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r143159334

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
     val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
     val alpha = this.alpha.asBreeze
     val gammaShape = this.gammaShape
-
-    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
+    val optimizeDocConcentration = this.optimizeDocConcentration
+    // If and only if optimizeDocConcentration is set true,
+    // we calculate logphat in the same pass as other statistics.
+    // No calculation of loghat happens otherwise.
+    val logphatPartOptionBase = () => if (optimizeDocConcentration) {
+      Some(BDV.zeros[Double](k))
+    } else {
+      None
+    }
+
+    val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
       val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
       val stat = BDM.zeros[Double](k, vocabSize)
-      var gammaPart = List[BDV[Double]]()
+      val logphatPartOption = logphatPartOptionBase()
+      var nonEmptyDocCount : Long = 0L
       nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+        nonEmptyDocCount += 1
        val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
          termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-        stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-        gammaPart = gammad :: gammaPart
+        stat(::, ids) := stat(::, ids) + sstats
+        logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
       }
-      Iterator((stat, gammaPart))
-    }.persist(StorageLevel.MEMORY_AND_DISK)
-    val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-      _ += _, _ += _)
-    val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-      stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
-    stats.unpersist()
+      Iterator((stat, logphatPartOption, nonEmptyDocCount))
+    }
+
+    val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+        v : (BDM[Double], Option[BDV[Double]], Long)) => {
--- End diff --

I see now. Thank you. But it seems the style guide suggests moving both of the parameters to a new line.
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r143084875

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -503,21 +533,22 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
   }

   /**
-   * Update alpha based on `gammat`, the inferred topic distributions for documents in the
-   * current mini-batch. Uses Newton-Rhapson method.
+   * Update alpha based on `logphat`.
+   * Uses Newton-Rhapson method.
    * @see Section 3.3, Huang: Maximum Likelihood Estimation of Dirichlet Distribution Parameters
    *      (http://jonathan-huang.org/research/dirichlet/dirichlet.pdf)
+   * @param logphat Expectation of estimated log-posterior distribution of
+   *                topics in a document averaged over the batch.
+   * @param nonEmptyDocsN number of non-empty documents
    */
-  private def updateAlpha(gammat: BDM[Double]): Unit = {
+  private def updateAlpha(logphat: BDV[Double], nonEmptyDocsN : Double): Unit = {
--- End diff --

The method will have to cast `nonEmptyDocsN: Int` to `Double`. This way we have the conversion implicitly, but the method is private, so I don't think it's going to hurt.
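A tiny stand-alone sketch (illustrative names, not from the PR) of the implicit numeric widening relied on here:

```scala
// Scala widens an integer argument to Double at the call site, so the
// private method can take a Double without explicit casts by its callers.
def updateAlphaSketch(logphatNorm: Double, nonEmptyDocsN: Double): Double =
  logphatNorm / nonEmptyDocsN

val nonEmptyDocsN: Long = 42L
updateAlphaSketch(1.0, nonEmptyDocsN) // Long -> Double happens implicitly
```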
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r143084656

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
     val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
     val alpha = this.alpha.asBreeze
     val gammaShape = this.gammaShape
-
-    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
+    val optimizeDocConcentration = this.optimizeDocConcentration
+    // If and only if optimizeDocConcentration is set true,
+    // we calculate logphat in the same pass as other statistics.
+    // No calculation of loghat happens otherwise.
+    val logphatPartOptionBase = () => if (optimizeDocConcentration) {
+      Some(BDV.zeros[Double](k))
+    } else {
+      None
+    }
+
+    val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
       val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
       val stat = BDM.zeros[Double](k, vocabSize)
-      var gammaPart = List[BDV[Double]]()
+      val logphatPartOption = logphatPartOptionBase()
+      var nonEmptyDocCount : Long = 0L
       nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+        nonEmptyDocCount += 1
        val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
          termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-        stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-        gammaPart = gammad :: gammaPart
+        stat(::, ids) := stat(::, ids) + sstats
+        logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
       }
-      Iterator((stat, gammaPart))
-    }.persist(StorageLevel.MEMORY_AND_DISK)
-    val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-      _ += _, _ += _)
-    val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-      stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
-    stats.unpersist()
+      Iterator((stat, logphatPartOption, nonEmptyDocCount))
+    }
+
+    val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+        v : (BDM[Double], Option[BDV[Double]], Long)) => {
--- End diff --

Do you mean the extra spaces after `u` and `v`?
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r143069049

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
     val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
     val alpha = this.alpha.asBreeze
     val gammaShape = this.gammaShape
-
-    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
+    val optimizeDocConcentration = this.optimizeDocConcentration
+    // If and only if optimizeDocConcentration is set true,
+    // we calculate logphat in the same pass as other statistics.
+    // No calculation of loghat happens otherwise.
--- End diff --

About `logphatPartOptionBase`: I tried that initially and failed. This was discussed above with @WeichenXu123. The problem is caused by the in-place modifications.
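A sketch of the in-place-update problem, assuming the definitions from this diff: because the accumulators are mutated with `+=`, the zero value of `treeAggregate` and each partition need their own fresh vector, which is what the factory provides:

```scala
// Each call produces a fresh accumulator.
val logphatPartOptionBase = () =>
  if (optimizeDocConcentration) Some(BDV.zeros[Double](k)) else None

val zero = logphatPartOptionBase()         // driver-side zero value
val partitionAcc = logphatPartOptionBase() // a new one inside each partition
// A single shared BDV here would be mutated by every partition's
// in-place `+=` updates and by the zero value itself.
```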
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r143066229

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
     val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
     val alpha = this.alpha.asBreeze
     val gammaShape = this.gammaShape
-
-    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
+    val optimizeDocConcentration = this.optimizeDocConcentration
+    // If and only if optimizeDocConcentration is set true,
+    // we calculate logphat in the same pass as other statistics.
+    // No calculation of loghat happens otherwise.
+    val logphatPartOptionBase = () => if (optimizeDocConcentration) {
+      Some(BDV.zeros[Double](k))
+    } else {
+      None
+    }
+
+    val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
       val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
       val stat = BDM.zeros[Double](k, vocabSize)
-      var gammaPart = List[BDV[Double]]()
+      val logphatPartOption = logphatPartOptionBase()
+      var nonEmptyDocCount : Long = 0L
       nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+        nonEmptyDocCount += 1
        val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
          termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-        stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-        gammaPart = gammad :: gammaPart
+        stat(::, ids) := stat(::, ids) + sstats
+        logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
       }
-      Iterator((stat, gammaPart))
-    }.persist(StorageLevel.MEMORY_AND_DISK)
-    val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-      _ += _, _ += _)
-    val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-      stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
-    stats.unpersist()
-    expElogbetaBc.destroy(false)
-    val batchResult = statsSum *:* expElogbeta.t
+      Iterator((stat, logphatPartOption, nonEmptyDocCount))
+    }
+    val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+        v : (BDM[Double], Option[BDV[Double]], Long)) => {
+      u._1 += v._1
+      u._2.foreach(_ += v._2.get)
+      (u._1, u._2, u._3 + v._3)
+    }
+
+    val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN: Long) = stats
+      .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))(
+        elementWiseSum, elementWiseSum
+      )
+
+    if (nonEmptyDocsN == 0) {
--- End diff --

But the Spark Scala style guide says: "... `return` is preferred: Use `return` as a guard to simplify control flow without adding a level of indentation".
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r143064794

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
     val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
     val alpha = this.alpha.asBreeze
     val gammaShape = this.gammaShape
-
-    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
+    val optimizeDocConcentration = this.optimizeDocConcentration
+    // If and only if optimizeDocConcentration is set true,
+    // we calculate logphat in the same pass as other statistics.
+    // No calculation of loghat happens otherwise.
+    val logphatPartOptionBase = () => if (optimizeDocConcentration) {
+      Some(BDV.zeros[Double](k))
+    } else {
+      None
+    }
+
+    val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
       val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
       val stat = BDM.zeros[Double](k, vocabSize)
-      var gammaPart = List[BDV[Double]]()
+      val logphatPartOption = logphatPartOptionBase()
+      var nonEmptyDocCount : Long = 0L
       nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+        nonEmptyDocCount += 1
        val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
          termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-        stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-        gammaPart = gammad :: gammaPart
+        stat(::, ids) := stat(::, ids) + sstats
+        logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
       }
-      Iterator((stat, gammaPart))
-    }.persist(StorageLevel.MEMORY_AND_DISK)
-    val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-      _ += _, _ += _)
-    val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-      stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
-    stats.unpersist()
-    expElogbetaBc.destroy(false)
-    val batchResult = statsSum *:* expElogbeta.t
+      Iterator((stat, logphatPartOption, nonEmptyDocCount))
+    }
+    val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+        v : (BDM[Double], Option[BDV[Double]], Long)) => {
+      u._1 += v._1
+      u._2.foreach(_ += v._2.get)
+      (u._1, u._2, u._3 + v._3)
+    }
+
+    val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN: Long) = stats
+      .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))(
+        elementWiseSum, elementWiseSum
+      )
+
+    if (nonEmptyDocsN == 0) {
+      logWarning("No non-empty documents were submitted in the batch.")
+      // Therefore, there is no need to update any of the model parameters
+      return this
+    }
+
+    val batchResult = statsSum *:* expElogbeta.t
     // Note that this is an optimization to avoid batch.count
-    updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)
-    if (optimizeDocConcentration) updateAlpha(gammat)
+    val batchSize = (miniBatchFraction * corpusSize).ceil.toInt
+    updateLambda(batchResult, batchSize)
+
+    logphatOption.foreach(_ /= nonEmptyDocsN.toDouble)
+    logphatOption.foreach(updateAlpha(_, nonEmptyDocsN))
+
+    expElogbetaBc.destroy(false)
--- End diff --

Great point. Thank you. Moreover, it should be destroyed as soon as there is no need for it anymore.
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r143060674

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
     val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
     val alpha = this.alpha.asBreeze
     val gammaShape = this.gammaShape
-
-    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
+    val optimizeDocConcentration = this.optimizeDocConcentration
+    // If and only if optimizeDocConcentration is set true,
+    // we calculate logphat in the same pass as other statistics.
+    // No calculation of loghat happens otherwise.
+    val logphatPartOptionBase = () => if (optimizeDocConcentration) {
+      Some(BDV.zeros[Double](k))
--- End diff --

Thanks. Fixed.
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r143060537

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
     val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
     val alpha = this.alpha.asBreeze
     val gammaShape = this.gammaShape
-
-    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
+    val optimizeDocConcentration = this.optimizeDocConcentration
+    // If and only if optimizeDocConcentration is set true,
+    // we calculate logphat in the same pass as other statistics.
+    // No calculation of loghat happens otherwise.
--- End diff --

If I don't assign `logphatPartOptionBase` to a local variable, a `NotSerializableException` is thrown. Regarding the comments: isn't it necessary to emphasize that the computation happens in the same pass?
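An illustrative stand-alone sketch of that serialization failure (hypothetical class and names, not from the PR): calling a method of the enclosing, non-serializable class inside a closure captures `this`, while a local val captures only the function:

```scala
import breeze.linalg.{DenseVector => BDV}
import org.apache.spark.rdd.RDD

class OptimizerLike { // assume this class is not Serializable
  private val k = 10
  private def makeAcc(): BDV[Double] = BDV.zeros[Double](k)

  def fails(rdd: RDD[Int]): RDD[BDV[Double]] =
    rdd.mapPartitions(_ => Iterator(makeAcc())) // drags `this` into the closure

  def works(rdd: RDD[Int]): RDD[BDV[Double]] = {
    val makeAccLocal = () => BDV.zeros[Double](10) // local copy, serializable
    rdd.mapPartitions(_ => Iterator(makeAccLocal()))
  }
}
```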
[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...
Github user akopich commented on the issue: https://github.com/apache/spark/pull/18924

I have conducted some performance testing with random data. The new implementation turns out to be notably faster.

```
OLD with hyper-parameter optimization : 237 sec
OLD w/o hyper-parameter optimization  : 226 sec
NEW with hyper-parameter optimization : 178 sec
NEW w/o hyper-parameter optimization  : 171 sec
```
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r143003890

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,31 +462,54 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
     val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
     val alpha = this.alpha.asBreeze
     val gammaShape = this.gammaShape
-
-    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
+    val optimizeDocConcentration = this.optimizeDocConcentration
+    // If and only if optimizeDocConcentration is set true,
+    // we calculate logphat in the same pass as other statistics.
+    // No calculation of loghat happens otherwise.
+    val logphatPartOptionBase = () => if (optimizeDocConcentration) {
+      Some(BDV.zeros[Double](k))
+    } else {
+      None
+    }
+
+    val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
       val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
       val stat = BDM.zeros[Double](k, vocabSize)
-      var gammaPart = List[BDV[Double]]()
+      val logphatPartOption = logphatPartOptionBase()
+      var nonEmptyDocCount : Long = 0L
       nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+        nonEmptyDocCount += 1
        val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
          termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-        stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-        gammaPart = gammad :: gammaPart
+        stat(::, ids) := stat(::, ids) + sstats
+        logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
       }
-      Iterator((stat, gammaPart))
-    }.persist(StorageLevel.MEMORY_AND_DISK)
-    val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-      _ += _, _ += _)
-    val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-      stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
-    stats.unpersist()
-    expElogbetaBc.destroy(false)
-    val batchResult = statsSum *:* expElogbeta.t
+      Iterator((stat, logphatPartOption, nonEmptyDocCount))
+    }
+
+    val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+        v : (BDM[Double], Option[BDV[Double]], Long)) => {
+      u._1 += v._1
+      u._2.foreach(_ += v._2.get)
+      (u._1, u._2, u._3 + v._3)
+    }
+
+    val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN: Long) = stats
+      .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))(
+        elementWiseSum, elementWiseSum
+      )
+    val batchResult = statsSum *:* expElogbeta.t
     // Note that this is an optimization to avoid batch.count
-    updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)
-    if (optimizeDocConcentration) updateAlpha(gammat)
+    val batchSize = (miniBatchFraction * corpusSize).ceil.toInt
+    updateLambda(batchResult, batchSize)
+
+    logphatOption.foreach(_ /= nonEmptyDocsN.toDouble)
--- End diff --

Thanks for the comments, @jkbradley and @hhbyyh. The check is added. I have also added generation of a warning message in case of an "empty" batch. I believe a user should know that a thing like that happened.
[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...
Github user akopich commented on the issue: https://github.com/apache/spark/pull/18924

Thank you, @hhbyyh. I have augmented the example a bit: explicitly set the random seed and chose the online optimizer: `val lda = new LDA().setK(10).setMaxIter(10).setOptimizer("online").setSeed(13)`. But for some reason, if I run it twice, the results are not the same. Is that expected? branch-2.2 was used.
[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...
Github user akopich commented on the issue: https://github.com/apache/spark/pull/18924

@jkbradley, thank you!

- Correctness: in order to test the equivalence of the two versions of `submitMiniBatch`, I have to bring both of them into scope... One solution would be to derive a class `OldOnlineLDAOptimizer` from `OnlineLDAOptimizer` and override `submitMiniBatch`, but the class is final. What's the preferred approach?
- Sure, sounds good. Should I add a test case reporting the CPU time, or should I rather define an `App`? Should I add the code to the PR or just report the results here?
[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...
Github user akopich commented on the issue: https://github.com/apache/spark/pull/18924

BTW, it seems like the `updateLambda` method relies (in the older version as well) on `batchSize` only because this is "an optimization to avoid batch.count". Shouldn't we rather use `nonEmptyDocsN` instead, since we compute it efficiently now? But that is going to change the logic...
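A before/after sketch of that suggestion, using the names from this thread (and assuming `updateLambda` accepts the wider type):

```scala
// Current: estimate the batch size to avoid an extra batch.count() job.
val estimatedBatchSize = (miniBatchFraction * corpusSize).ceil.toInt
updateLambda(batchResult, estimatedBatchSize)

// Suggested: nonEmptyDocsN is already computed inside the treeAggregate
// pass, so the exact count comes for free.
updateLambda(batchResult, nonEmptyDocsN)
```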
[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...
Github user akopich commented on the issue: https://github.com/apache/spark/pull/18924

@hhbyyh, this change does not target performance but scalability, and I am afraid the change is beneficial only for huge datasets, so the tests would require massive computational resources.
[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...
Github user akopich commented on the issue: https://github.com/apache/spark/pull/18924

@WeichenXu123, thank you.
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r142632240

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
     val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
     val alpha = this.alpha.asBreeze
     val gammaShape = this.gammaShape
+    val optimizeDocConcentration = this.optimizeDocConcentration
+    // We calculate logphat in the same pass as other statistics, but we only need
+    // it if we are optimizing docConcentration
+    val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k))
+      else None
-    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
+    val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
       val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
       val stat = BDM.zeros[Double](k, vocabSize)
-      var gammaPart = List[BDV[Double]]()
+      val logphatPartOption = logphatPartOptionBase()
+      var nonEmptyDocCount : Long = 0L
       nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+        nonEmptyDocCount += 1
        val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
          termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-        stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-        gammaPart = gammad :: gammaPart
+        stat(::, ids) := stat(::, ids) + sstats
+        logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
       }
-      Iterator((stat, gammaPart))
-    }.persist(StorageLevel.MEMORY_AND_DISK)
-    val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-      _ += _, _ += _)
-    val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-      stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
-    stats.unpersist()
-    expElogbetaBc.destroy(false)
-    val batchResult = statsSum *:* expElogbeta.t
+      Iterator((stat, logphatPartOption, nonEmptyDocCount))
+    }
+
+    val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+        v : (BDM[Double], Option[BDV[Double]], Long)) => {
+      u._1 += v._1
+      u._2.foreach(_ += v._2.get)
+      (u._1, u._2, u._3 + v._3)
+    }
+
+    val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN : Long) = stats
+      .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))(
+        elementWiseSum, elementWiseSum
+      )
+    val batchResult = statsSum *:* expElogbeta.t
     // Note that this is an optimization to avoid batch.count
-    updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)
-    if (optimizeDocConcentration) updateAlpha(gammat)
+    val batchSize = (miniBatchFraction * corpusSize).ceil.toInt
+    updateLambda(batchResult, batchSize)
+
+    logphatOption.foreach(_ /= batchSize.toDouble)
--- End diff --

Thanks for the good point. Do I understand correctly that if a batch without any non-empty docs is submitted, the `submitMiniBatch` method shouldn't change the state of the `LDAOptimizer`? cc @WeichenXu123
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r142625490

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
     val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
     val alpha = this.alpha.asBreeze
     val gammaShape = this.gammaShape
+    val optimizeDocConcentration = this.optimizeDocConcentration
+    // We calculate logphat in the same pass as other statistics, but we only need
+    // it if we are optimizing docConcentration
+    val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k))
+      else None
-    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
+    val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
       val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
       val stat = BDM.zeros[Double](k, vocabSize)
-      var gammaPart = List[BDV[Double]]()
+      val logphatPartOption = logphatPartOptionBase()
+      var nonEmptyDocCount : Long = 0L
       nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+        nonEmptyDocCount += 1
        val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
          termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-        stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-        gammaPart = gammad :: gammaPart
+        stat(::, ids) := stat(::, ids) + sstats
+        logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
       }
-      Iterator((stat, gammaPart))
-    }.persist(StorageLevel.MEMORY_AND_DISK)
-    val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-      _ += _, _ += _)
-    val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-      stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
-    stats.unpersist()
-    expElogbetaBc.destroy(false)
-    val batchResult = statsSum *:* expElogbeta.t
+      Iterator((stat, logphatPartOption, nonEmptyDocCount))
+    }
+
+    val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+        v : (BDM[Double], Option[BDV[Double]], Long)) => {
+      u._1 += v._1
+      u._2.foreach(_ += v._2.get)
+      (u._1, u._2, u._3 + v._3)
+    }
+
+    val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN : Long) = stats
+      .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))(
+        elementWiseSum, elementWiseSum
+      )
+    val batchResult = statsSum *:* expElogbeta.t
     // Note that this is an optimization to avoid batch.count
-    updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)
-    if (optimizeDocConcentration) updateAlpha(gammat)
+    val batchSize = (miniBatchFraction * corpusSize).ceil.toInt
+    updateLambda(batchResult, batchSize)
+
+    logphatOption.foreach(_ /= batchSize.toDouble)
+    logphatOption.foreach(updateAlpha(_, nonEmptyDocsN))
+
+    expElogbetaBc.destroy(false)
+
     this
   }

   /**
-   * Update lambda based on the batch submitted. batchSize can be different for each iteration.
+   * Update lambda based on the batch submitted. nonEmptyDocsN can be different for each iteration.
--- End diff --

Thanks. Comment reverted.
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r142624984 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,36 +462,55 @@ [same hunk as quoted in full above; this comment is on the line: `+val batchSize = (miniBatchFraction * corpusSize).ceil.toInt`] --- End diff -- I believe this will be settled in SPARK-22111. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r142624246 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,36 +462,55 @@ [same hunk as quoted in full above; this comment is on the lines: `+val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long), + v : (BDM[Double], Option[BDV[Double]], Long)) => {`] --- End diff -- Thanks. Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r142624340 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,36 +462,55 @@ [same hunk as quoted in full above; this comment is on the line: `+val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN : Long) = stats`] --- End diff -- Thanks. Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r142624093 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,36 +462,55 @@ [same hunk as quoted in full above; this comment is on the lines: `+val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k)) + else None`] --- End diff -- Thanks. Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r142622117 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,36 +462,55 @@ [same hunk as quoted in full above; this comment is on the lines: `+// We calculate logphat in the same pass as other statistics, but we only need +// it if we are optimizing docConcentration`] --- End diff -- Thanks. Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r142620788 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,36 +462,55 @@ [same hunk as quoted in full above; this comment is on the line: `+val optimizeDocConcentration = this.optimizeDocConcentration`] --- End diff -- This line is necessary to avoid serialization of `LDASuite`, which is not serializable. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
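For context, a compact sketch of the closure-capture problem such a local `val` avoids; the class and method names below are made up for illustration.

```
import org.apache.spark.SparkContext

// Referencing a field inside an RDD closure drags the whole enclosing instance
// into the closure, which must then be serializable. Copying the field into a
// local val first keeps `this` out of the closure entirely.
class OuterWithState {                       // stand-in for a non-serializable enclosing object
  val optimizeDocConcentration: Boolean = true

  def countBad(sc: SparkContext): Long =     // fails at runtime: Task not serializable
    sc.parallelize(1 to 10).filter(_ => optimizeDocConcentration).count()

  def countGood(sc: SparkContext): Long = {
    val optimize = optimizeDocConcentration  // plain Boolean, serializes trivially
    sc.parallelize(1 to 10).filter(_ => optimize).count()
  }
}
```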
[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...
Github user akopich commented on the issue: https://github.com/apache/spark/pull/18924 @WeichenXu123, the PR seems to have received no attention for 10 days now... What should I do? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...
Github user akopich commented on the issue: https://github.com/apache/spark/pull/18924 @WeichenXu123, @jkbradley, speaking of merging: is there anything else I should improve in this PR in order for it to be mergeable? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...
Github user akopich commented on the issue: https://github.com/apache/spark/pull/18924 @WeichenXu123, thanks for creating the JIRA. Yes, sure, I will work on it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...
Github user akopich commented on the issue: https://github.com/apache/spark/pull/18924 @jkbradley, thanks for the comments. Who is supposed to create the follow-up JIRA? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r140630215 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,36 +462,55 @@ [same hunk as quoted in full above, from an earlier revision; this comment is on the line: `+val stats: RDD[(BDM[Double], Option[BDV[Double]], Int)] = batch.mapPartitions { docs =>`] --- End diff -- Done. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r140199136 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -503,17 +518,15 @@ final class OnlineLDAOptimizer extends LDAOptimizer { } /** - * Update alpha based on `gammat`, the inferred topic distributions for documents in the - * current mini-batch. Uses Newton-Rhapson method. + * Update alpha based on `logphat`. --- End diff -- Please check out the updated PR. I have added `val stats: RDD[(BDM[Double], Option[BDV[Double]], Int)] = batch.mapPartitions {...}`. Unfortunately, we cannot perform the aggregation in a purely in-place manner now, since `Int` is immutable. This shouldn't be a big deal, since the matrices and vectors are still updated in place. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r140193380 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -503,17 +518,15 @@ [same hunk as quoted above; comment on: `+ * Update alpha based on logphat.`] --- End diff -- But should we have the `val stats: RDD[(BDM[Double], Option[BDV[Double]], Int)] = batch.mapPartitions {...}` approach? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r140183412 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -503,17 +518,15 @@ [same hunk as quoted above; comment on: `+ * Update alpha based on logphat.`] --- End diff -- Or another suggestion: let's have something like `val stats: RDD[(BDM[Double], Option[BDV[Double]], Int)] = batch.mapPartitions {...}`, where the `Int` stands for the number of non-empty elements in a partition. What do you think? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r140180799 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -503,17 +518,15 @@ [same hunk as quoted above; comment on: `+ * Update alpha based on logphat.`] --- End diff -- @WeichenXu123, you are right. So should we add `stats.count()` or should we rather embed the counting in the aggregation phase so that we avoid the second pass? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
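A sketch of the second option — carrying the count through `treeAggregate` so the extra `stats.count()` pass is avoided. The `sumAndCount` helper and its types are illustrative, not the PR's code.

```
import org.apache.spark.rdd.RDD

// One pass over the data yields both the sum and the element count:
// the Long counter rides along in the accumulator tuple.
def sumAndCount(data: RDD[Double]): (Double, Long) =
  data.treeAggregate((0.0, 0L))(
    seqOp = { case ((sum, n), x) => (sum + x, n + 1L) },
    combOp = { case ((s1, n1), (s2, n2)) => (s1 + s2, n1 + n2) }
  )
```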
[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...
Github user akopich commented on the issue: https://github.com/apache/spark/pull/18924 @jkbradley, thank you for your comments! Please check out the commit adding the necessary docs. Regarding tests: I believe `OnlineLDAOptimizer alpha hyperparameter optimization` from `mllib/clustering/LDASuite.scala` covers the piece of code being rewritten. Or should there be tests of a different kind? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r140032198 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -503,17 +518,15 @@ [same hunk as quoted above; comment on: `+ * Update alpha based on logphat.`] --- End diff -- I also renamed `N` to `batchSize`, which is what it is. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r140031900 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,31 +462,46 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape +val optimizeDocConcentration = this.optimizeDocConcentration +val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k)) --- End diff -- OK. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...
Github user akopich commented on the issue: https://github.com/apache/spark/pull/18924 @WeichenXu123, thank you for your prompt reply! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r139514402 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,31 +462,44 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape +val logphatPartOptionBase = if (optimizeDocConcentration) Some(BDV.zeros[Double](k)) else None -val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs => +val stats: RDD[(BDM[Double], Option[BDV[Double]])] = batch.mapPartitions { docs => val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0) val stat = BDM.zeros[Double](k, vocabSize) - var gammaPart = List[BDV[Double]]() + val logphatPartOption = logphatPartOptionBase nonEmptyDocs.foreach { case (_, termCounts: Vector) => val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference( termCounts, expElogbetaBc.value, alpha, gammaShape, k) -stat(::, ids) := stat(::, ids).toDenseMatrix + sstats -gammaPart = gammad :: gammaPart +stat(::, ids) := stat(::, ids) + sstats +logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad)) } - Iterator((stat, gammaPart)) -}.persist(StorageLevel.MEMORY_AND_DISK) -val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))( - _ += _, _ += _) -val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat( - stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*) -stats.unpersist() -expElogbetaBc.destroy(false) -val batchResult = statsSum *:* expElogbeta.t + Iterator((stat, logphatPartOption)) +} + +val elementWiseSumInPlace = (u : (BDM[Double], Option[BDV[Double]]), + v : (BDM[Double], Option[BDV[Double]])) => { + u._1 += v._1 + u._2.foreach(_ += v._2.get) + u +} + +val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]]) = stats + .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase))( +elementWiseSumInPlace, elementWiseSumInPlace + ) +val batchResult = statsSum *:* expElogbeta.t // Note that this is an optimization to avoid batch.count -updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt) -if (optimizeDocConcentration) updateAlpha(gammat) +val batchSize = (miniBatchFraction * corpusSize).ceil.toInt +updateLambda(batchResult, batchSize) + +logphatOption.foreach(_ /= batchSize.toDouble) +logphatOption.foreach(updateAlpha(_, batchSize)) + +expElogbetaBc.destroy(false) +stats.unpersist() --- End diff -- Do you mean `stats.unpersist()`? Sure, I got rid of it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
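For background on why the `persist`/`unpersist` pair became unnecessary — a sketch under assumed names, not the PR's code: caching only pays off when an RDD is traversed more than once, and the rewritten code consumes `stats` in a single `treeAggregate` pass.

```
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Old shape: two actions over `stats`, so caching between them made sense.
def twoPasses(stats: RDD[Double]): (Double, Long) = {
  stats.persist(StorageLevel.MEMORY_AND_DISK)
  val sum = stats.sum()   // first pass materializes the cache
  val n   = stats.count() // second pass is served from the cache
  stats.unpersist()
  (sum, n)
}

// New shape: a single aggregation pass, nothing to cache or unpersist.
def onePass(stats: RDD[Double]): (Double, Long) =
  stats.treeAggregate((0.0, 0L))(
    { case ((s, n), x) => (s + x, n + 1L) },
    { case ((a, m), (b, k)) => (a + b, m + k) })
```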
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r139514301 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,31 +462,44 @@ [same hunk as quoted in full above; this comment is on the line: `+ val logphatPartOption = logphatPartOptionBase`] --- End diff -- Great point. Thank you. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
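The "great point" acknowledged here concerns reusing one pre-built mutable accumulator: in the final code `logphatPartOptionBase` is a `() => ...` factory, so each use starts from a fresh zero vector instead of one shared Breeze object. A minimal sketch of the difference (the object name and `k` are illustrative):

```
import breeze.linalg.{DenseVector => BDV}

object FreshZeros {
  val k = 3
  // A shared val is one mutable object; a thunk hands out a new one per call.
  val sharedZero: BDV[Double] = BDV.zeros[Double](k)
  val makeZero: () => BDV[Double] = () => BDV.zeros[Double](k)

  def main(args: Array[String]): Unit = {
    val a = makeZero()
    a += BDV(1.0, 1.0, 1.0)          // in-place mutation, as in the aggregation code
    println(makeZero())              // DenseVector(0.0, 0.0, 0.0) -- unaffected
    sharedZero += BDV(1.0, 1.0, 1.0)
    println(sharedZero)              // DenseVector(1.0, 1.0, 1.0) -- the mutation sticks
  }
}
```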
[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...
Github user akopich commented on the issue: https://github.com/apache/spark/pull/18924 Ping @jkbradley. Thank you @WeichenXu123 once again for the comment! Please have a look. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...
Github user akopich commented on the issue: https://github.com/apache/spark/pull/18924 Yes, sure. Thank you for the valuable comment. Hopefully, I'll update the code this week. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...
Github user akopich commented on the issue: https://github.com/apache/spark/pull/18924 @feynmanliang , @hhbyyh, @WeichenXu123, could you please review the PR? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
GitHub user akopich opened a pull request: https://github.com/apache/spark/pull/18924 [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not collect stats for each doc in mini-batch to driver Hi. As proposed by Joseph K. Bradley, gammat is not collected to the driver anymore. You can merge this pull request into a Git repository by running: $ git pull https://github.com/akopich/spark master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/18924.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #18924 commit f81f1cdcf6de1dafdc79c1801cc2e2f1f803f4cc Author: Valeriy Avanesov Date: 2017-08-11T16:28:38Z [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not collect stats for each doc in mini-batch to driver gammat are not collected to a local matrix but rather represented as RDD[BDV[Double]] and are aggregated in a distributed manner --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
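A sketch of the PR's central idea, with illustrative names: rather than `collect()`-ing every per-document gamma row to the driver, reduce the rows down to the single k-dimensional vector the driver actually needs.

```
import breeze.linalg.{DenseVector => BDV}
import org.apache.spark.rdd.RDD

// Distributed alternative to collecting gammat: sum the per-document
// contributions on the executors and ship only one vector to the driver.
def aggregateRows(gammat: RDD[BDV[Double]], k: Int): BDV[Double] =
  gammat.treeAggregate(BDV.zeros[Double](k))(
    (acc, row) => acc += row, // in-place partition-local sum
    (a, b) => a += b          // in-place merge of partial sums
  )
```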
[GitHub] spark pull request: [SPARK-2199] [mllib] topic modeling
Github user akopich closed the pull request at: https://github.com/apache/spark/pull/1269 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-2199] [mllib] topic modeling
Github user akopich commented on the pull request: https://github.com/apache/spark/pull/1269#issuecomment-78184948 @renchengchang What do you mean by "topic vector"? A vector of p(t|d) \forall t? If so, you can find these vectors in the `RDD[DocumentParameters]` which is returned by the `infer(documents: RDD[Document], ...)` method. `DocumentParameters` stores a document and a vector of p(t|d) \forall t, which is referred to as `theta`. BTW, the order of documents remains the same. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
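In the notation above, the `theta` stored per document is just the topic-probability vector. Spelled out (a standard definition, with T the number of topics):

```
\theta_d = \bigl(p(t_1 \mid d), \dots, p(t_T \mid d)\bigr),
\qquad \sum_{t=1}^{T} p(t \mid d) = 1 .
```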
[GitHub] spark pull request: [SPARK-2199] [mllib] topic modeling
Github user akopich commented on the pull request: https://github.com/apache/spark/pull/1269#issuecomment-78050367 @renchengchang 1. Hi. 2. Don't use code from this PR. Use either LDA (which is merged into mllib) or https://github.com/akopich/dplsa, which is a further development of this PR. 3. I do not employ the concept of a document id. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-1405] [mllib] Latent Dirichlet Allocati...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/4047#discussion_r23501548 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala --- @@ -0,0 +1,472 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import java.util.Random + +import breeze.linalg.{DenseVector => BDV, sum => brzSum, normalize, axpy => brzAxpy} + +import org.apache.spark.Logging +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.graphx._ +import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.rdd.RDD +import org.apache.spark.util.Utils + + +/** + * :: DeveloperApi :: + * + * Latent Dirichlet Allocation (LDA), a topic model designed for text documents. + * + * Terminology: + * - "word" = "term": an element of the vocabulary + * - "token": instance of a term appearing in a document + * - "topic": multinomial distribution over words representing some concept + * + * Currently, the underlying implementation uses Expectation-Maximization (EM), implemented + * according to the Asuncion et al. (2009) paper referenced below. + * + * References: + * - Original LDA paper (journal version): + *Blei, Ng, and Jordan. "Latent Dirichlet Allocation." JMLR, 2003. + * - This class implements their "smoothed" LDA model. + * - Paper which clearly explains several algorithms, including EM: + *Asuncion, Welling, Smyth, and Teh. + *"On Smoothing and Inference for Topic Models." UAI, 2009. + * + * NOTE: This is currently marked DeveloperApi since it is under active development and may undergo + * API changes. + */ +@DeveloperApi +class LDA private ( +private var k: Int, +private var maxIterations: Int, --- End diff -- http://link.springer.com/chapter/10.1007%2F978-3-319-12580-0_3 Theorem 1. It's formulated for an arbitrary regularization. Dirichlet prior is just a special case. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-1405] [mllib] Latent Dirichlet Allocati...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/4047#discussion_r23501440 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala --- @@ -0,0 +1,472 @@ [same new-file excerpt of LDA.scala as quoted above; comment on: `private var maxIterations: Int,`] --- End diff -- @jkbradley But why do you call it smoothing? If 0 < alpha < 1, the Dirichlet prior does not smooth but sparsifies. By the way, the correctness of the EM algorithm for 0 < alpha < 1 was proven by Vorontsov (2014) -- it just involves a positive cut before normalizing the counters. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
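For the reader's benefit, one standard way to write the M-step being referenced — a sketch of the "positive cut", not a quote from the paper — with (x)_+ = max(x, 0) and n_{wt}, n_{td} the expected counters from the E-step:

```
\phi_{wt} \propto \bigl(n_{wt} + \beta - 1\bigr)_{+}, \qquad
\theta_{td} \propto \bigl(n_{td} + \alpha - 1\bigr)_{+} .
```

For 0 < alpha < 1 the additive term is negative, so small counters are zeroed out: the prior sparsifies rather than smooths, which is exactly the point of the comment above.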
[GitHub] spark pull request: [SPARK-2199] [mllib] topic modeling
Github user akopich commented on the pull request: https://github.com/apache/spark/pull/1269#issuecomment-69560296 @jkbradley, @mengxr, please include @IlyaKozlov as an author too. He's helped a lot with the implementation. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-2199] [mllib] topic modeling
Github user akopich commented on the pull request: https://github.com/apache/spark/pull/1269#issuecomment-67664969 By the way, maybe it's off-topic, but this is related to initial approximation generation. Suppose one has `indxs : RDD[Int]` and is about to create an RDD of random ints `rnd : RDD[Int]` s.t. `rnd.count == indxs.count`. The obvious way leads to something like the following ``` val random = new java.util.Random() val rnd = indxs.map(random.nextInt) ``` But the `random` object is going to be serialized, and the workers will receive `random` with the same seed; thus this solution produces sequences of the same ints. What's a proper way? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
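One common answer to this question — a sketch with illustrative names, assuming every bound in `indxs` is positive: derive a distinct seed per partition instead of shipping a single `Random` instance inside the closure.

```
import org.apache.spark.rdd.RDD

// Each partition builds its own RNG from a base seed plus the partition index,
// so workers no longer start from identical seeds.
def randomInts(indxs: RDD[Int], baseSeed: Long): RDD[Int] =
  indxs.mapPartitionsWithIndex { (partId, it) =>
    val rng = new java.util.Random(baseSeed + partId)
    it.map(bound => rng.nextInt(bound)) // assumes every bound > 0
  }
```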
[GitHub] spark pull request: [SPARK-2199] [mllib] topic modeling
Github user akopich commented on the pull request: https://github.com/apache/spark/pull/1269#issuecomment-67661902 I've fixed the perplexity for robust PLSA and updated the perplexity value in the comment above. Now they are almost the same. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-2199] [mllib] topic modeling
Github user akopich commented on the pull request: https://github.com/apache/spark/pull/1269#issuecomment-67656496 And the tests fail again in an obscure manner... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-2199] [mllib] topic modeling
Github user akopich commented on the pull request: https://github.com/apache/spark/pull/1269#issuecomment-67643630 I've performed a sanity check on the dataset I've described above. PLSA: the tm project obtains a perplexity of `2358` and this implementation ends up with `2311`. Robust PLSA: `1871` and `1466`, respectively. Seems to be sane. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
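For reference, the perplexity these numbers refer to is the usual corpus perplexity (a standard definition, stated here for clarity), where n_{dw} is the count of word w in document d and p(w | d) = \sum_t \phi_{wt} \theta_{td}:

```
\mathrm{Perplexity}
= \exp\!\left( - \frac{\sum_{d}\sum_{w \in d} n_{dw} \ln p(w \mid d)}
                      {\sum_{d}\sum_{w \in d} n_{dw}} \right).
```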
[GitHub] spark pull request: [SPARK-2199] [mllib] topic modeling
Github user akopich commented on the pull request: https://github.com/apache/spark/pull/1269#issuecomment-67493934 How do you compare accuracy? Perplexity means nothing but perplexity -- topic models may be reliably compared only via an application task (e.g. classification, recommendation...). Should I add the dataset for the "perplexity sanity check" to the repo? I am about to use 1000 arXiv papers. This dataset is about 20 MB (5.5 MB zipped). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-2199] [mllib] topic modeling
Github user akopich closed the pull request at: https://github.com/apache/spark/pull/1269 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-2199] [mllib] topic modeling
GitHub user akopich reopened a pull request: https://github.com/apache/spark/pull/1269 [SPARK-2199] [mllib] topic modeling I have implemented Probabilistic Latent Semantic Analysis (PLSA) and Robust PLSA with support for additive regularization (which actually means that I've implemented Latent Dirichlet Allocation too). You can merge this pull request into a Git repository by running: $ git pull https://github.com/akopich/spark master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/1269.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1269 commit 4ac42d1dad85593b1f05c02b2a2b48080abaaa05 Author: valerij Date: 2014-12-11T11:44:39Z [SPARK-2199][mllib] recommit all the changes due to wierd rebase commit 7e779929ad206c871e830c015f934c4b8824d9d5 Author: valerij Date: 2014-12-11T12:04:58Z [SPARK-2199][mllib] rareTokenThreshold is set via setter commit e5f4a7b54d0cf7e73c0f567084439216a34fe9bd Author: valerij Date: 2014-12-11T12:16:27Z [SPARK-2199][mllib] Dirichlet pdf computation moved to mllib/stats commit 8e953e7d378fe012b2c3364b9cc570cd1af57f0e Author: valerij Date: 2014-12-11T12:23:01Z [SPARK-2199][mllib] long line wrapped commit c54afc96bb493143d9ce0484118a452ad8c7514d Author: valerij Date: 2014-12-11T13:34:37Z [SPARK-2199][mllib] setter and tests are fixed commit 0764aaa9e8737c824ad0a71ec6ecb197476e2419 Author: valerij Date: 2014-12-11T14:50:15Z [SPARK-2199][mllib] DirichletDistribution is serializable now commit b6f852e2124b5be0bede251050981284221b51f3 Author: valerij Date: 2014-12-18T00:29:37Z [SPARK-2199][mllib] colt dependency removed --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-2199] [mllib] topic modeling
Github user akopich commented on the pull request: https://github.com/apache/spark/pull/1269#issuecomment-67415235 What do you mean by scaling tests? Tests measuring the dependence of computation time on the number of machines? Are there scaling tests for the GraphX LDA implementations? Or should I conduct scaling tests for these implementations as well as my implementation on the same cluster? And what should we do with these test failures? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-2199] [mllib] topic modeling
Github user akopich commented on the pull request: https://github.com/apache/spark/pull/1269#issuecomment-67410274 ``` - filter pushdown - boolean *** FAILED *** (249 milliseconds)``` I have no idea why this could happen. Should I rebase again? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-2199] [mllib] topic modeling
Github user akopich commented on the pull request: https://github.com/apache/spark/pull/1269#issuecomment-67399691 @jkbradley Thank you for the explanation about setters. The tm implementation was tested (it was successfully used in one of my projects), but it was tested with Scala 2.11, not Scala 2.10. Is it necessary to implement this test as a part of the Spark project? I suggest the following: I just write an example running PLSA training (a distributed one) on a particular dataset and add it to this patch. Also I write an example running PLSA training relying on the tm project and add it to the tm project. One can run both of these examples and ensure that the perplexity values look similar. What do you think? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org