I use a small data and reproduce the problem.
But I don't know my codes are correct or not because I am not familiar
with spark.
So I first post my codes here. If it's correct, then I will post the data.
one line of my data like:

{ "time":"08-09-17","cmtUrl":"2094361"
,"rvId":"rev_10000020","webpageUrl":"http://www.dianping.com/shop/2094361","word_vec":[0,1,2,3,4,5,6,2,7,8,9
    
,10,11,12,13,14,15,16,8,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,32,35,36,37,38,15,39,40,41,42,5,43,44,17,45,46,42,47,26,48,49]}

it's a json file which contains webpageUrl and word_vec which is the
encoded words.
The first step is to prase the input rdd to a rdd of VectorUrl.
BTW, if public VectorUrl call(String s) return null, is it ok?
Then follow the example Index documents with unique IDs
Then I create a rdd to map id to url so after lda training, I can find
the url of the document. Then save this rdd to hdfs.
Then create corpus rdd and train

The exception stack is

15/12/30 20:45:42 ERROR yarn.ApplicationMaster: User class threw
exception: java.lang.IndexOutOfBoundsException: (454,0) not in
[-58,58) x [-100,100)
java.lang.IndexOutOfBoundsException: (454,0) not in [-58,58) x [-100,100)
at breeze.linalg.DenseMatrix$mcD$sp.update$mcD$sp(DenseMatrix.scala:112)
at 
org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:534)
at 
org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:531)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at 
org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix$lzycompute(LDAModel.scala:531)
at 
org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix(LDAModel.scala:523)
at com.mobvoi.knowledgegraph.textmining.lda.ReviewLDA.main(ReviewLDA.java:89)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:525)


==========here is my codes==============

SparkConf conf = new SparkConf().setAppName(ReviewLDA.class.getName());

    JavaSparkContext sc = new JavaSparkContext(conf);


    // Load and parse the data

    JavaRDD<String> data = sc.textFile(inputDir + "/*");

    JavaRDD<VectorUrl> parsedData = data.map(new Function<String, VectorUrl>() {

      public VectorUrl call(String s) {

        JsonParser parser = new JsonParser();

        JsonObject jo = parser.parse(s).getAsJsonObject();

        if (!jo.has("word_vec") || !jo.has("webpageUrl")) {

          return null;

        }

        JsonArray word_vec = jo.get("word_vec").getAsJsonArray();

        String url = jo.get("webpageUrl").getAsString();

        double[] values = new double[word_vec.size()];

        for (int i = 0; i < values.length; i++)

          values[i] = word_vec.get(i).getAsInt();

        return new VectorUrl(Vectors.dense(values), url);

      }

    });



    // Index documents with unique IDs

    JavaPairRDD<Long, VectorUrl> id2doc =
JavaPairRDD.fromJavaRDD(parsedData.zipWithIndex().map(

        new Function<Tuple2<VectorUrl, Long>, Tuple2<Long, VectorUrl>>() {

          public Tuple2<Long, VectorUrl> call(Tuple2<VectorUrl, Long> doc_id) {

            return doc_id.swap();

          }

        }));

    JavaPairRDD<Long, String> id2Url = JavaPairRDD.fromJavaRDD(id2doc

        .map(new Function<Tuple2<Long, VectorUrl>, Tuple2<Long, String>>() {

          @Override

          public Tuple2<Long, String> call(Tuple2<Long, VectorUrl>
id2doc) throws Exception {

            return new Tuple2(id2doc._1, id2doc._2.url);

          }

        }));

    id2Url.saveAsTextFile(id2UrlPath);

    JavaPairRDD<Long, Vector> corpus = JavaPairRDD.fromJavaRDD(id2doc

        .map(new Function<Tuple2<Long, VectorUrl>, Tuple2<Long, Vector>>() {

          @Override

          public Tuple2<Long, Vector> call(Tuple2<Long, VectorUrl>
id2doc) throws Exception {

            return new Tuple2(id2doc._1, id2doc._2.vec);

          }

        }));

    corpus.cache();


    // Cluster the documents into three topics using LDA

    DistributedLDAModel ldaModel = (DistributedLDAModel) new
LDA().setMaxIterations(iterNumber)

        .setK(topicNumber).run(corpus);

On Wed, Dec 30, 2015 at 3:34 PM, Li Li <fancye...@gmail.com> wrote:
> I will use a portion of data and try. will the hdfs block affect
> spark?(if so, it's hard to reproduce)
>
> On Wed, Dec 30, 2015 at 3:22 AM, Joseph Bradley <jos...@databricks.com> wrote:
>> Hi Li,
>>
>> I'm wondering if you're running into the same bug reported here:
>> https://issues.apache.org/jira/browse/SPARK-12488
>>
>> I haven't figured out yet what is causing it.  Do you have a small corpus
>> which reproduces this error, and which you can share on the JIRA?  If so,
>> that would help a lot in debugging this failure.
>>
>> Thanks!
>> Joseph
>>
>> On Sun, Dec 27, 2015 at 7:26 PM, Li Li <fancye...@gmail.com> wrote:
>>>
>>> I ran my lda example in a yarn 2.6.2 cluster with spark 1.5.2.
>>> it throws exception in line:   Matrix topics = ldaModel.topicsMatrix();
>>> But in yarn job history ui, it's successful. What's wrong with it?
>>> I submit job with
>>> .bin/spark-submit --class Myclass \
>>>     --master yarn-client \
>>>     --num-executors 2 \
>>>     --driver-memory 4g \
>>>     --executor-memory 4g \
>>>     --executor-cores 1 \
>>>
>>>
>>> My codes:
>>>
>>>    corpus.cache();
>>>
>>>
>>>     // Cluster the documents into three topics using LDA
>>>
>>>     DistributedLDAModel ldaModel = (DistributedLDAModel) new
>>>
>>> LDA().setOptimizer("em").setMaxIterations(iterNumber).setK(topicNumber).run(corpus);
>>>
>>>
>>>     // Output topics. Each is a distribution over words (matching word
>>> count vectors)
>>>
>>>     System.out.println("Learned topics (as distributions over vocab of
>>> " + ldaModel.vocabSize()
>>>
>>>         + " words):");
>>>
>>>    //Line81, exception here:    Matrix topics = ldaModel.topicsMatrix();
>>>
>>>     for (int topic = 0; topic < topicNumber; topic++) {
>>>
>>>       System.out.print("Topic " + topic + ":");
>>>
>>>       for (int word = 0; word < ldaModel.vocabSize(); word++) {
>>>
>>>         System.out.print(" " + topics.apply(word, topic));
>>>
>>>       }
>>>
>>>       System.out.println();
>>>
>>>     }
>>>
>>>
>>>     ldaModel.save(sc.sc(), modelPath);
>>>
>>>
>>> Exception in thread "main" java.lang.IndexOutOfBoundsException:
>>> (1025,0) not in [-58,58) x [-100,100)
>>>
>>>         at
>>> breeze.linalg.DenseMatrix$mcD$sp.update$mcD$sp(DenseMatrix.scala:112)
>>>
>>>         at
>>> org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:534)
>>>
>>>         at
>>> org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:531)
>>>
>>>         at
>>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>
>>>         at
>>> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>>
>>>         at
>>> org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix$lzycompute(LDAModel.scala:531)
>>>
>>>         at
>>> org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix(LDAModel.scala:523)
>>>
>>>         at
>>> com.mobvoi.knowledgegraph.textmining.lda.ReviewLDA.main(ReviewLDA.java:81)
>>>
>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>
>>>         at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>
>>>         at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>
>>>         at java.lang.reflect.Method.invoke(Method.java:606)
>>>
>>>         at
>>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674)
>>>
>>>         at
>>> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
>>>
>>>         at
>>> org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
>>>
>>>         at
>>> org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
>>>
>>>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>>
>>> 15/12/23 00:01:16 INFO spark.SparkContext: Invoking stop() from shutdown
>>> hook
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: dev-h...@spark.apache.org
>>>
>>

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org

Reply via email to