I will try Spark 1.6.0 to see whether it is a bug in 1.5.2.

On Wed, Jan 13, 2016 at 3:58 PM, Li Li <fancye...@gmail.com> wrote:
> I have set up a standalone Spark cluster and used the same code. It
> still failed with the same exception.
> I also preprocessed the data into lines of integers and used the Scala
> code from the LDA example. It still failed.
> The code:
>
> import org.apache.spark.mllib.clustering.{LDA, DistributedLDAModel}
> import org.apache.spark.mllib.linalg.Vectors
> import org.apache.spark.SparkContext
> import org.apache.spark.SparkContext._
> import org.apache.spark.SparkConf
>
> object TestLDA {
>   def main(args: Array[String]) {
>     if (args.length != 4) {
>       println("need 4 args inDir iternum topic outDir")
>       System.exit(-1)
>     }
>     val conf = new SparkConf().setAppName("TestLDA")
>     val sc = new SparkContext(conf)
>     // Load and parse the data
>     val data = sc.textFile(args(0))
>     val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble)))
>     // Index documents with unique IDs
>     val corpus = parsedData.zipWithIndex.map(_.swap).cache()
>     val iterNum = Integer.valueOf(args(1))
>     val topicNum = Integer.valueOf(args(2))
>     // Cluster the documents into topicNum topics using LDA
>     val ldaModel = new LDA().setK(topicNum).setMaxIterations(iterNum).run(corpus)
>     // Output topics. Each is a distribution over words (matching word count vectors)
>     println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize + " words):")
>     val topics = ldaModel.topicsMatrix
>     for (topic <- Range(0, topicNum)) {
>       print("Topic " + topic + ":")
>       for (word <- Range(0, ldaModel.vocabSize)) { print(" " + topics(word, topic)) }
>       println()
>     }
>     // Save the model
>     ldaModel.save(sc, args(3))
>   }
> }
>
> The submit script:
>
> ~/spark-1.5.2-bin-hadoop2.6/bin/spark-submit --class com.mobvoi.knowledgegraph.scala_test.TestLDA \
>   --master spark://master:7077 \
>   --num-executors 10 \
>   --executor-memory 4g \
>   --executor-cores 3 \
>   scala_test-1.0-jar-with-dependencies.jar \
>   /test.txt \
>   100 \
>   5 \
>   /lda_model
>
> test.txt is attached.
>
> On Sat, Jan 9, 2016 at 6:21 AM, Bryan Cutler <cutl...@gmail.com> wrote:
>> Hi Li,
>>
>> I tried out your code and sample data in both local mode and Spark
>> Standalone, and it ran correctly with output that looks good. Sorry, I
>> don't have a YARN cluster set up right now, so maybe the error you are
>> seeing is specific to that. Btw, I am running the latest Spark code
>> from the master branch. Hope that helps some!
>>
>> Bryan
>>
>> On Mon, Jan 4, 2016 at 8:42 PM, Li Li <fancye...@gmail.com> wrote:
>>>
>>> Could anyone help? The problem is very easy to reproduce. What's wrong?
>>>
>>> On Wed, Dec 30, 2015 at 8:59 PM, Li Li <fancye...@gmail.com> wrote:
>>> > I used a small data set and reproduced the problem.
>>> > But I don't know whether my code is correct, because I am not
>>> > familiar with Spark, so I will first post my code here. If it's
>>> > correct, then I will post the data.
>>> > One line of my data looks like:
>>> >
>>> > { "time":"08-09-17","cmtUrl":"2094361"
>>> > ,"rvId":"rev_10000020","webpageUrl":"http://www.dianping.com/shop/2094361","word_vec":[0,1,2,3,4,5,6,2,7,8,9
>>> > ,10,11,12,13,14,15,16,8,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,32,35,36,37,38,15,39,40,41,42,5,43,44,17,45,46,42,47,26,48,49]}
>>> >
>>> > It is a JSON file in which each line contains a webpageUrl and a
>>> > word_vec holding the encoded words.
>>> > The first step is to parse the input RDD into an RDD of VectorUrl.
>>> > BTW, if public VectorUrl call(String s) returns null, is that OK?
>>> > Then I follow the example "Index documents with unique IDs".
>>> > Then I create an RDD mapping ID to URL, so that after LDA training I
>>> > can find the URL of each document, and save this RDD to HDFS.
>>> > Then I create the corpus RDD and train.
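>>> > About the null question above: if returning null is not OK, I guess
>>> > I need to drop the nulls before zipWithIndex, since a null element
>>> > would throw a NullPointerException once a later map reads
>>> > id2doc._2.url. A minimal sketch against the parsedData RDD in the
>>> > code below (nonNullData is my own name, not in my program):
>>> >
>>> > JavaRDD<VectorUrl> nonNullData = parsedData.filter(
>>> >     new Function<VectorUrl, Boolean>() {
>>> >       public Boolean call(VectorUrl v) {
>>> >         // keep only lines that parsed into a document
>>> >         return v != null;
>>> >       }
>>> >     });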
>>> >
>>> > The exception stack is:
>>> >
>>> > 15/12/30 20:45:42 ERROR yarn.ApplicationMaster: User class threw
>>> > exception: java.lang.IndexOutOfBoundsException: (454,0) not in
>>> > [-58,58) x [-100,100)
>>> > java.lang.IndexOutOfBoundsException: (454,0) not in [-58,58) x [-100,100)
>>> > at breeze.linalg.DenseMatrix$mcD$sp.update$mcD$sp(DenseMatrix.scala:112)
>>> > at org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:534)
>>> > at org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:531)
>>> > at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>> > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>> > at org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix$lzycompute(LDAModel.scala:531)
>>> > at org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix(LDAModel.scala:523)
>>> > at com.mobvoi.knowledgegraph.textmining.lda.ReviewLDA.main(ReviewLDA.java:89)
>>> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> > at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>> > at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> > at java.lang.reflect.Method.invoke(Method.java:606)
>>> > at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:525)
>>> >
>>> > ========== here is my code ==========
>>> >
>>> > SparkConf conf = new SparkConf().setAppName(ReviewLDA.class.getName());
>>> > JavaSparkContext sc = new JavaSparkContext(conf);
>>> >
>>> > // Load and parse the data
>>> > JavaRDD<String> data = sc.textFile(inputDir + "/*");
>>> > JavaRDD<VectorUrl> parsedData = data.map(new Function<String, VectorUrl>() {
>>> >   public VectorUrl call(String s) {
>>> >     JsonParser parser = new JsonParser();
>>> >     JsonObject jo = parser.parse(s).getAsJsonObject();
>>> >     if (!jo.has("word_vec") || !jo.has("webpageUrl")) {
>>> >       return null;
>>> >     }
>>> >     JsonArray word_vec = jo.get("word_vec").getAsJsonArray();
>>> >     String url = jo.get("webpageUrl").getAsString();
>>> >     double[] values = new double[word_vec.size()];
>>> >     for (int i = 0; i < values.length; i++)
>>> >       values[i] = word_vec.get(i).getAsInt();
>>> >     return new VectorUrl(Vectors.dense(values), url);
>>> >   }
>>> > });
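>>> > If I read the MLlib docs right, LDA expects each document to be a
>>> > term-count vector of length vocabSize, not the raw sequence of word
>>> > IDs. If that is the problem, the loop above could instead aggregate
>>> > counts, something like this sketch (vocabSize would be a new
>>> > parameter giving the dictionary size; untested):
>>> >
>>> > // Sketch: count how often each word ID occurs in this document
>>> > // instead of storing the raw IDs as vector entries.
>>> > double[] counts = new double[vocabSize];
>>> > for (int i = 0; i < word_vec.size(); i++) {
>>> >   int id = word_vec.get(i).getAsInt();
>>> >   if (id >= 0 && id < vocabSize) {
>>> >     counts[id] += 1.0;
>>> >   }
>>> > }
>>> > return new VectorUrl(Vectors.dense(counts), url);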
>>> >
>>> > // Index documents with unique IDs
>>> > JavaPairRDD<Long, VectorUrl> id2doc = JavaPairRDD.fromJavaRDD(parsedData.zipWithIndex().map(
>>> >     new Function<Tuple2<VectorUrl, Long>, Tuple2<Long, VectorUrl>>() {
>>> >       public Tuple2<Long, VectorUrl> call(Tuple2<VectorUrl, Long> doc_id) {
>>> >         return doc_id.swap();
>>> >       }
>>> >     }));
>>> >
>>> > JavaPairRDD<Long, String> id2Url = JavaPairRDD.fromJavaRDD(id2doc
>>> >     .map(new Function<Tuple2<Long, VectorUrl>, Tuple2<Long, String>>() {
>>> >       @Override
>>> >       public Tuple2<Long, String> call(Tuple2<Long, VectorUrl> id2doc) throws Exception {
>>> >         return new Tuple2<Long, String>(id2doc._1, id2doc._2.url);
>>> >       }
>>> >     }));
>>> > id2Url.saveAsTextFile(id2UrlPath);
>>> >
>>> > JavaPairRDD<Long, Vector> corpus = JavaPairRDD.fromJavaRDD(id2doc
>>> >     .map(new Function<Tuple2<Long, VectorUrl>, Tuple2<Long, Vector>>() {
>>> >       @Override
>>> >       public Tuple2<Long, Vector> call(Tuple2<Long, VectorUrl> id2doc) throws Exception {
>>> >         return new Tuple2<Long, Vector>(id2doc._1, id2doc._2.vec);
>>> >       }
>>> >     }));
>>> > corpus.cache();
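>>> > Before training, I could also sanity-check the corpus here, since as
>>> > far as I understand LDA takes the vocabulary size from the size of
>>> > the document vectors, so they should all be equal. A sketch that
>>> > prints the distinct vector sizes:
>>> >
>>> > // Sketch: all document vectors should have the same size.
>>> > List<Integer> sizes = corpus.values().map(
>>> >     new Function<Vector, Integer>() {
>>> >       public Integer call(Vector v) {
>>> >         return v.size();
>>> >       }
>>> >     }).distinct().collect();
>>> > System.out.println("distinct vector sizes in corpus: " + sizes);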
>>> >
>>> > // Cluster the documents into topicNumber topics using LDA
>>> > DistributedLDAModel ldaModel = (DistributedLDAModel) new LDA()
>>> >     .setMaxIterations(iterNumber).setK(topicNumber).run(corpus);
>>> >
>>> > On Wed, Dec 30, 2015 at 3:34 PM, Li Li <fancye...@gmail.com> wrote:
>>> >> I will use a portion of the data and try. Will the HDFS blocks
>>> >> affect Spark? (If so, it's hard to reproduce.)
>>> >>
>>> >> On Wed, Dec 30, 2015 at 3:22 AM, Joseph Bradley <jos...@databricks.com> wrote:
>>> >>> Hi Li,
>>> >>>
>>> >>> I'm wondering if you're running into the same bug reported here:
>>> >>> https://issues.apache.org/jira/browse/SPARK-12488
>>> >>>
>>> >>> I haven't figured out yet what is causing it. Do you have a small
>>> >>> corpus which reproduces this error, and which you can share on the
>>> >>> JIRA? If so, that would help a lot in debugging this failure.
>>> >>>
>>> >>> Thanks!
>>> >>> Joseph
>>> >>>
>>> >>> On Sun, Dec 27, 2015 at 7:26 PM, Li Li <fancye...@gmail.com> wrote:
>>> >>>>
>>> >>>> I ran my LDA example on a YARN 2.6.2 cluster with Spark 1.5.2.
>>> >>>> It throws an exception at the line Matrix topics = ldaModel.topicsMatrix();
>>> >>>> but in the YARN job history UI it shows as successful. What's wrong?
>>> >>>> I submit the job with:
>>> >>>>
>>> >>>> ./bin/spark-submit --class Myclass \
>>> >>>>   --master yarn-client \
>>> >>>>   --num-executors 2 \
>>> >>>>   --driver-memory 4g \
>>> >>>>   --executor-memory 4g \
>>> >>>>   --executor-cores 1 \
>>> >>>>
>>> >>>> My code:
>>> >>>>
>>> >>>> corpus.cache();
>>> >>>>
>>> >>>> // Cluster the documents into topicNumber topics using LDA
>>> >>>> DistributedLDAModel ldaModel = (DistributedLDAModel) new LDA()
>>> >>>>     .setOptimizer("em").setMaxIterations(iterNumber).setK(topicNumber).run(corpus);
>>> >>>>
>>> >>>> // Output topics. Each is a distribution over words (matching word count vectors)
>>> >>>> System.out.println("Learned topics (as distributions over vocab of "
>>> >>>>     + ldaModel.vocabSize() + " words):");
>>> >>>> // Line 81, exception here:
>>> >>>> Matrix topics = ldaModel.topicsMatrix();
>>> >>>> for (int topic = 0; topic < topicNumber; topic++) {
>>> >>>>   System.out.print("Topic " + topic + ":");
>>> >>>>   for (int word = 0; word < ldaModel.vocabSize(); word++) {
>>> >>>>     System.out.print(" " + topics.apply(word, topic));
>>> >>>>   }
>>> >>>>   System.out.println();
>>> >>>> }
>>> >>>>
>>> >>>> ldaModel.save(sc.sc(), modelPath);
>>> >>>>
>>> >>>> Exception in thread "main" java.lang.IndexOutOfBoundsException: (1025,0) not in [-58,58) x [-100,100)
>>> >>>> at breeze.linalg.DenseMatrix$mcD$sp.update$mcD$sp(DenseMatrix.scala:112)
>>> >>>> at org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:534)
>>> >>>> at org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:531)
>>> >>>> at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>> >>>> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>> >>>> at org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix$lzycompute(LDAModel.scala:531)
>>> >>>> at org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix(LDAModel.scala:523)
>>> >>>> at com.mobvoi.knowledgegraph.textmining.lda.ReviewLDA.main(ReviewLDA.java:81)
>>> >>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> >>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>> >>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> >>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>> >>>> at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674)
>>> >>>> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
>>> >>>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
>>> >>>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
>>> >>>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>> >>>>
>>> >>>> 15/12/23 00:01:16 INFO spark.SparkContext: Invoking stop() from shutdown hook
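>>> >>>>
>>> >>>> If only topicsMatrix fails, maybe I can read the topics with
>>> >>>> describeTopics as a workaround; an untested sketch:
>>> >>>>
>>> >>>> // Sketch: print the top 10 terms per topic without building the
>>> >>>> // full vocabSize x topicNumber matrix that topicsMatrix creates.
>>> >>>> Tuple2<int[], double[]>[] described = ldaModel.describeTopics(10);
>>> >>>> for (int topic = 0; topic < described.length; topic++) {
>>> >>>>   System.out.print("Topic " + topic + ":");
>>> >>>>   int[] terms = described[topic]._1();
>>> >>>>   double[] weights = described[topic]._2();
>>> >>>>   for (int i = 0; i < terms.length; i++) {
>>> >>>>     System.out.print(" " + terms[i] + "=" + weights[i]);
>>> >>>>   }
>>> >>>>   System.out.println();
>>> >>>> }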
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org