GitHub user feynmanliang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8551#discussion_r39460837
  
    --- Diff: examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala ---
    @@ -186,121 +186,52 @@ object LDAExample {
        * Load documents, tokenize them, create vocabulary, and prepare documents as term count vectors.
        * @return (corpus, vocabulary as array, total token count in corpus)
        */
    -  private def preprocess(
    +  private def preProcess(
           sc: SparkContext,
           paths: Seq[String],
           vocabSize: Int,
    -      stopwordFile: String): (RDD[(Long, Vector)], Array[String], Long) = {
    +      stopWordFile: String): (RDD[(Long, Vector)], Array[String], Long) = {
     
         // Get dataset of document texts
         // One document per line in each text file. If the input consists of many small files,
         // this can result in a large number of small partitions, which can degrade performance.
         // In this case, consider using coalesce() to create fewer, larger partitions.
         val textRDD: RDD[String] = sc.textFile(paths.mkString(","))
    -
    -    // Split text into words
    -    val tokenizer = new SimpleTokenizer(sc, stopwordFile)
    -    val tokenized: RDD[(Long, IndexedSeq[String])] = textRDD.zipWithIndex().map { case (text, id) =>
    -      id -> tokenizer.getWords(text)
    -    }
    -    tokenized.cache()
    -
    -    // Counts words: RDD[(word, wordCount)]
    -    val wordCounts: RDD[(String, Long)] = tokenized
    -      .flatMap { case (_, tokens) => tokens.map(_ -> 1L) }
    -      .reduceByKey(_ + _)
    -    wordCounts.cache()
    -    val fullVocabSize = wordCounts.count()
    -    // Select vocab
    -    //  (vocab: Map[word -> id], total tokens after selecting vocab)
    -    val (vocab: Map[String, Int], selectedTokenCount: Long) = {
    -      val tmpSortedWC: Array[(String, Long)] = if (vocabSize == -1 || fullVocabSize <= vocabSize) {
    -        // Use all terms
    -        wordCounts.collect().sortBy(-_._2)
    -      } else {
    -        // Sort terms to select vocab
    -        wordCounts.sortBy(_._2, ascending = false).take(vocabSize)
    -      }
    -      (tmpSortedWC.map(_._1).zipWithIndex.toMap, tmpSortedWC.map(_._2).sum)
    +    val sqlContext = new SQLContext(sc)
    +    import sqlContext.implicits._
    --- End diff ---
    
    nit: I would put this sqlContext instantiation and implicits import right at the start of the method (on L194)
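
    For illustration, a minimal sketch of what that ordering might look like. The imports, the enclosing object, and the ??? placeholder standing in for the rest of the body are assumptions added here to make the example self-contained, not part of this PR:

        import org.apache.spark.SparkContext
        import org.apache.spark.mllib.linalg.Vector
        import org.apache.spark.rdd.RDD
        import org.apache.spark.sql.SQLContext

        object LDAExample {

          private def preProcess(
              sc: SparkContext,
              paths: Seq[String],
              vocabSize: Int,
              stopWordFile: String): (RDD[(Long, Vector)], Array[String], Long) = {
            // Create the SQLContext and bring its implicits into scope before
            // anything else in the method, so all code below can rely on them.
            val sqlContext = new SQLContext(sc)
            import sqlContext.implicits._

            // Get dataset of document texts (one document per line).
            val textRDD: RDD[String] = sc.textFile(paths.mkString(","))

            // Remainder of preprocessing (tokenization, vocab selection) elided.
            ???
          }
        }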


