I used a small dataset and reproduced the problem. But I don't know whether my code is correct, because I am not familiar with Spark, so I will post my code here first. If it is correct, I will then post the data. One line of my data looks like this:
{ "time":"08-09-17","cmtUrl":"2094361" ,"rvId":"rev_10000020","webpageUrl":"http://www.dianping.com/shop/2094361","word_vec":[0,1,2,3,4,5,6,2,7,8,9 ,10,11,12,13,14,15,16,8,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,32,35,36,37,38,15,39,40,41,42,5,43,44,17,45,46,42,47,26,48,49]} it's a json file which contains webpageUrl and word_vec which is the encoded words. The first step is to prase the input rdd to a rdd of VectorUrl. BTW, if public VectorUrl call(String s) return null, is it ok? Then follow the example Index documents with unique IDs Then I create a rdd to map id to url so after lda training, I can find the url of the document. Then save this rdd to hdfs. Then create corpus rdd and train The exception stack is 15/12/30 20:45:42 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.IndexOutOfBoundsException: (454,0) not in [-58,58) x [-100,100) java.lang.IndexOutOfBoundsException: (454,0) not in [-58,58) x [-100,100) at breeze.linalg.DenseMatrix$mcD$sp.update$mcD$sp(DenseMatrix.scala:112) at org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:534) at org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:531) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix$lzycompute(LDAModel.scala:531) at org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix(LDAModel.scala:523) at com.mobvoi.knowledgegraph.textmining.lda.ReviewLDA.main(ReviewLDA.java:89) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:525) ==========here is my codes============== SparkConf conf = new SparkConf().setAppName(ReviewLDA.class.getName()); JavaSparkContext sc = new JavaSparkContext(conf); // Load and parse the data JavaRDD<String> data = sc.textFile(inputDir + "/*"); JavaRDD<VectorUrl> parsedData = data.map(new Function<String, VectorUrl>() { public VectorUrl call(String s) { JsonParser parser = new JsonParser(); JsonObject jo = parser.parse(s).getAsJsonObject(); if (!jo.has("word_vec") || !jo.has("webpageUrl")) { return null; } JsonArray word_vec = jo.get("word_vec").getAsJsonArray(); String url = jo.get("webpageUrl").getAsString(); double[] values = new double[word_vec.size()]; for (int i = 0; i < values.length; i++) values[i] = word_vec.get(i).getAsInt(); return new VectorUrl(Vectors.dense(values), url); } }); // Index documents with unique IDs JavaPairRDD<Long, VectorUrl> id2doc = JavaPairRDD.fromJavaRDD(parsedData.zipWithIndex().map( new Function<Tuple2<VectorUrl, Long>, Tuple2<Long, VectorUrl>>() { public Tuple2<Long, VectorUrl> call(Tuple2<VectorUrl, Long> doc_id) { return doc_id.swap(); } })); JavaPairRDD<Long, String> id2Url = JavaPairRDD.fromJavaRDD(id2doc .map(new Function<Tuple2<Long, VectorUrl>, Tuple2<Long, String>>() { @Override public Tuple2<Long, String> call(Tuple2<Long, VectorUrl> id2doc) throws Exception { return new Tuple2(id2doc._1, id2doc._2.url); } })); id2Url.saveAsTextFile(id2UrlPath); JavaPairRDD<Long, Vector> corpus = JavaPairRDD.fromJavaRDD(id2doc .map(new Function<Tuple2<Long, 
// Index documents with unique IDs
JavaPairRDD<Long, VectorUrl> id2doc = JavaPairRDD.fromJavaRDD(parsedData.zipWithIndex().map(
    new Function<Tuple2<VectorUrl, Long>, Tuple2<Long, VectorUrl>>() {
      public Tuple2<Long, VectorUrl> call(Tuple2<VectorUrl, Long> doc_id) {
        return doc_id.swap();
      }
    }));

// Map each document id to its url, so the url can be looked up after training,
// and save the mapping to HDFS
JavaPairRDD<Long, String> id2Url = JavaPairRDD.fromJavaRDD(id2doc
    .map(new Function<Tuple2<Long, VectorUrl>, Tuple2<Long, String>>() {
      @Override
      public Tuple2<Long, String> call(Tuple2<Long, VectorUrl> doc) throws Exception {
        return new Tuple2<>(doc._1, doc._2.url);
      }
    }));
id2Url.saveAsTextFile(id2UrlPath);

// Build the corpus of (id, word vector) pairs for LDA
JavaPairRDD<Long, Vector> corpus = JavaPairRDD.fromJavaRDD(id2doc
    .map(new Function<Tuple2<Long, VectorUrl>, Tuple2<Long, Vector>>() {
      @Override
      public Tuple2<Long, Vector> call(Tuple2<Long, VectorUrl> doc) throws Exception {
        return new Tuple2<>(doc._1, doc._2.vec);
      }
    }));
corpus.cache();

// Cluster the documents into topics using LDA
DistributedLDAModel ldaModel = (DistributedLDAModel) new LDA().setMaxIterations(iterNumber)
    .setK(topicNumber).run(corpus);
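One more thing I noticed while re-reading the error, in case it matters (just my guess, not verified): the bounds in the message look like a 58 x 100 matrix, i.e. vocabSize = 58 and k = 100, yet something tried to write term 454. My word_vec is a sequence of word ids, but the comment in the LDA example says the topics match "word count vectors", so maybe each document should be a fixed-length vector of term counts over a shared vocabulary, rather than the raw id sequence. A sketch of the conversion I have in mind, to replace the values loop inside call() above (vocabSize here is a hypothetical known constant, not a variable from my code):

// Hypothetical: build a term-count vector of length vocabSize instead of
// passing the raw word-id sequence (vocabSize is an assumed known constant).
double[] counts = new double[vocabSize];
for (int i = 0; i < word_vec.size(); i++) {
  counts[word_vec.get(i).getAsInt()] += 1.0; // count each occurrence of a word id
}
return new VectorUrl(Vectors.dense(counts), url);

And as a cheap sanity check on my current input (using cleanedData from the sketch above), counting the distinct document vector sizes should show whether the documents really share one vocabulary; if the sizes differ, a term id outside [0, 58) would not be surprising:

// Sketch: count how many documents there are of each vector size.
java.util.Map<Integer, Long> sizeCounts = cleanedData.map(
    new Function<VectorUrl, Integer>() {
      @Override
      public Integer call(VectorUrl v) {
        return v.vec.size(); // dimension of the document's vector
      }
    }).countByValue();
System.out.println("Distinct document vector sizes: " + sizeCounts);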
On Wed, Dec 30, 2015 at 3:34 PM, Li Li <fancye...@gmail.com> wrote:
> I will use a portion of the data and try. Will the HDFS blocks affect
> Spark? (If so, it will be hard to reproduce.)
>
> On Wed, Dec 30, 2015 at 3:22 AM, Joseph Bradley <jos...@databricks.com> wrote:
>> Hi Li,
>>
>> I'm wondering if you're running into the same bug reported here:
>> https://issues.apache.org/jira/browse/SPARK-12488
>>
>> I haven't figured out yet what is causing it. Do you have a small corpus
>> which reproduces this error, and which you can share on the JIRA? If so,
>> that would help a lot in debugging this failure.
>>
>> Thanks!
>> Joseph
>>
>> On Sun, Dec 27, 2015 at 7:26 PM, Li Li <fancye...@gmail.com> wrote:
>>>
>>> I ran my LDA example on a YARN 2.6.2 cluster with Spark 1.5.2.
>>> It throws an exception at the line: Matrix topics = ldaModel.topicsMatrix();
>>> But in the YARN job history UI, the job shows as successful. What's wrong with it?
>>> I submit the job with
>>>
>>> ./bin/spark-submit --class Myclass \
>>>   --master yarn-client \
>>>   --num-executors 2 \
>>>   --driver-memory 4g \
>>>   --executor-memory 4g \
>>>   --executor-cores 1 \
>>>
>>> My code:
>>>
>>> corpus.cache();
>>>
>>> // Cluster the documents into topics using LDA
>>> DistributedLDAModel ldaModel = (DistributedLDAModel) new LDA().setOptimizer("em")
>>>     .setMaxIterations(iterNumber).setK(topicNumber).run(corpus);
>>>
>>> // Output topics. Each is a distribution over words (matching word count vectors)
>>> System.out.println("Learned topics (as distributions over vocab of "
>>>     + ldaModel.vocabSize() + " words):");
>>>
>>> // Line 81, exception here:
>>> Matrix topics = ldaModel.topicsMatrix();
>>>
>>> for (int topic = 0; topic < topicNumber; topic++) {
>>>   System.out.print("Topic " + topic + ":");
>>>   for (int word = 0; word < ldaModel.vocabSize(); word++) {
>>>     System.out.print(" " + topics.apply(word, topic));
>>>   }
>>>   System.out.println();
>>> }
>>>
>>> ldaModel.save(sc.sc(), modelPath);
>>>
>>> Exception in thread "main" java.lang.IndexOutOfBoundsException:
>>> (1025,0) not in [-58,58) x [-100,100)
>>>         at breeze.linalg.DenseMatrix$mcD$sp.update$mcD$sp(DenseMatrix.scala:112)
>>>         at org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:534)
>>>         at org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:531)
>>>         at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>         at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>>         at org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix$lzycompute(LDAModel.scala:531)
>>>         at org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix(LDAModel.scala:523)
>>>         at com.mobvoi.knowledgegraph.textmining.lda.ReviewLDA.main(ReviewLDA.java:81)
>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>         at java.lang.reflect.Method.invoke(Method.java:606)
>>>         at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674)
>>>         at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
>>>         at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
>>>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
>>>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>>
>>> 15/12/23 00:01:16 INFO spark.SparkContext: Invoking stop() from shutdown hook