Hi Frank,

Two suggestions:

1. I would recommend caching the corpus prior to running LDA.
2. If you are using EM and it keeps failing at high iteration counts, I would switch to the online optimizer and tweak the sample taken per iteration via its setMiniBatchFraction parameter (setMiniBatchFraction is exposed by OnlineLDAOptimizer, not by the EM optimizer; see the sketch below).
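Untested sketch of what I mean; it assumes you switch from EM to the online optimizer and reuses parsedData from your snippet below, with 0.05 as a purely illustrative fraction:

import org.apache.spark.mllib.clustering.{LDA, OnlineLDAOptimizer}

// Suggestion 1: cache the corpus before training so each iteration
// does not recompute the parsing lineage.
val corpus = parsedData.zipWithIndex.map(_.swap).cache()

// Suggestion 2: the online optimizer subsamples the corpus on each
// iteration; setMiniBatchFraction controls the size of that sample.
val ldaModel = new LDA()
  .setK(3)
  .setMaxIterations(500)
  .setOptimizer(new OnlineLDAOptimizer().setMiniBatchFraction(0.05))
  .run(corpus)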
-Richard

On Tue, Sep 20, 2016 at 10:27 AM, Frank Zhang <dataminin...@yahoo.com.invalid> wrote:

> Hi Yuhao,
>
> Thank you so much for your great contributions to LDA and the other Spark modules!
>
> I use both Spark 1.6.2 and 2.0.0. The data I originally used is very large, with tens of millions of documents, but for test purposes the data set I mentioned earlier ("/data/mllib/sample_lda_data.txt") is good enough. Please change the path below to point to the data set under your Spark installation and run these lines:
>
> import org.apache.spark.mllib.clustering.LDA
> import org.apache.spark.mllib.linalg.Vectors
>
> // please change the path for the data set below:
> val data = sc.textFile("/data/mllib/sample_lda_data.txt")
> val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble)))
> val corpus = parsedData.zipWithIndex.map(_.swap).cache()
> val ldaModel = new LDA().setK(3).run(corpus)
>
> It should work. After that, please run:
>
> val ldaModel = new LDA().setK(3).setMaxIterations(500).run(corpus)
>
> When I ran it, the iteration at job #90 took far longer than the previous ones and then stopped with an exception:
>
> Active Jobs (1)
> Job Id  Description                     Submitted            Duration  Stages: Succeeded/Total  Tasks (for all stages): Succeeded/Total
> 90      fold at LDAOptimizer.scala:226  2016/09/20 10:18:30  22 s      0/269                    0/538
>
> Completed Jobs (90)
> Job Id  Description                     Submitted            Duration  Stages: Succeeded/Total  Tasks (for all stages): Succeeded/Total
> 89      fold at LDAOptimizer.scala:226  2016/09/20 10:18:30  43 ms     4/4 (262 skipped)        8/8 (524 skipped)
> 88      fold at LDAOptimizer.scala:226  2016/09/20 10:18:30  40 ms     4/4 (259 skipped)        8/8 (518 skipped)
> 87      fold at LDAOptimizer.scala:226  2016/09/20 10:18:29  80 ms     4/4 (256 skipped)        8/8 (512 skipped)
> 86      fold at LDAOptimizer.scala:226  2016/09/20 10:18:29  41 ms     4/4 (253 skipped)        8/8 (506 skipped)
>
> Part of the error message:
>
> Driver stacktrace:
>   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
>   at scala.Option.foreach(Option.scala:257)
>   at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>   at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1934)
>   at org.apache.spark.rdd.RDD$$anonfun$fold$1.apply(RDD.scala:1046)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
>   at org.apache.spark.rdd.RDD.fold(RDD.scala:1040)
>   at org.apache.spark.mllib.clustering.EMLDAOptimizer.computeGlobalTopicTotals(LDAOptimizer.scala:226)
>   at org.apache.spark.mllib.clustering.EMLDAOptimizer.next(LDAOptimizer.scala:213)
>   at org.apache.spark.mllib.clustering.EMLDAOptimizer.next(LDAOptimizer.scala:79)
>   at org.apache.spark.mllib.clustering.LDA.run(LDA.scala:334)
>   ... 48 elided
> Caused by: java.lang.StackOverflowError
>   at java.lang.reflect.InvocationTargetException.<init>(InvocationTargetException.java:72)
>   at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>
> Thank you so much!
>
> Frank
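One more note on the trace above: the StackOverflowError fires during task deserialization (ObjectStreamClass.invokeReadObject), which with EM at hundreds of iterations usually means the RDD/graph lineage has grown too deep. If that is what you are hitting, checkpointing should truncate the lineage. A rough, untested sketch, reusing corpus from your snippet; the checkpoint directory is a placeholder (use a shared filesystem such as HDFS on a cluster):

// Checkpointing truncates the lineage that grows with each EM iteration.
sc.setCheckpointDir("/tmp/spark-checkpoints") // placeholder path

val ldaModel = new LDA()
  .setK(3)
  .setMaxIterations(500)
  .setCheckpointInterval(10) // checkpoint the internal graph every 10 iterations
  .run(corpus)

Note that setCheckpointInterval only takes effect once a checkpoint directory has been set on the SparkContext.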
> ------------------------------
> From: "Yang, Yuhao" <yuhao.y...@intel.com>
> To: Frank Zhang <dataminin...@yahoo.com>; "user@spark.apache.org" <user@spark.apache.org>
> Sent: Tuesday, September 20, 2016 9:49 AM
> Subject: RE: LDA and Maximum Iterations
>
> Hi Frank,
>
> Which version of Spark are you using? Also, can you share more information about the exception?
>
> If it's not confidential, you can send the data sample to me (yuhao.y...@intel.com) and I can try to investigate.
>
> Regards,
> Yuhao
>
> From: Frank Zhang [mailto:dataminin...@yahoo.com.INVALID]
> Sent: Monday, September 19, 2016 9:20 PM
> To: user@spark.apache.org
> Subject: LDA and Maximum Iterations
>
> Hi all,
>
> I have a question about parameter setting for the LDA model. When I set a large number, such as 500, for setMaxIterations, the program always fails. There is a very straightforward LDA tutorial using an example data set in the mllib package: http://stackoverflow.com/questions/36631991/latent-dirichlet-allocation-lda-algorithm-not-printing-results-in-spark-scala. The code is here:
>
> import org.apache.spark.mllib.clustering.LDA
> import org.apache.spark.mllib.linalg.Vectors
>
> // Load and parse the data
> val data = sc.textFile("/data/mllib/sample_lda_data.txt") // you might need to change the path for the data set
> val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble)))
> // Index documents with unique IDs
> val corpus = parsedData.zipWithIndex.map(_.swap).cache()
> // Cluster the documents into three topics using LDA
> val ldaModel = new LDA().setK(3).run(corpus)
>
> But if I change the last line to
>
> val ldaModel = new LDA().setK(3).setMaxIterations(500).run(corpus)
>
> the program fails.
>
> I greatly appreciate your help!
>
> Best,
>
> Frank

--
Richard L Garris
Solution Architect
Databricks, Inc.
rich...@databricks.com
Mobile: 650.200.0840
databricks.com