[ https://issues.apache.org/jira/browse/SPARK-29817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean R. Owen resolved SPARK-29817.
----------------------------------
    Resolution: Duplicate

We should not be making 10+ JIRAs here. The changes in question are small and 
closely related and probably need to be considered together, or else we may 
resolve them differently by different people at different times. I'm going to 
close them in favor of the parent. We can consider breaking out some of them 
later.

> Missing persist on docs in mllib.clustering.LDAOptimizer.initialize
> -------------------------------------------------------------------
>
>                 Key: SPARK-29817
>                 URL: https://issues.apache.org/jira/browse/SPARK-29817
>             Project: Spark
>          Issue Type: Sub-task
>          Components: MLlib
>    Affects Versions: 2.4.3
>            Reporter: Dong Wang
>            Priority: Major
>
> The RDD docs in mllib.clustering.LDAOptimizer.initialize is consumed twice: first through the edges that feed verticesTMP.reduceByKey, and again by docs.take(1). Without a persist, the second use recomputes the whole lineage, so docs should be cached before its first use.
> {code:scala}
>   override private[clustering] def initialize(
>       docs: RDD[(Long, Vector)],
>       lda: LDA): EMLDAOptimizer = {
>       ...
>     val edges: RDD[Edge[TokenCount]] = docs.flatMap { case (docID: Long, termCounts: Vector) =>
>       // Add edges for terms with non-zero counts.
>       termCounts.asBreeze.activeIterator.filter(_._2 != 0.0).map { case (term, cnt) =>
>         Edge(docID, term2index(term), cnt)
>       }
>     }
>     // Create vertices.
>     // Initially, we use random soft assignments of tokens to topics (random gamma).
>     val docTermVertices: RDD[(VertexId, TopicCounts)] = {
>       val verticesTMP: RDD[(VertexId, TopicCounts)] =
>         edges.mapPartitionsWithIndex { case (partIndex, partEdges) =>
>           val random = new Random(partIndex + randomSeed)
>           partEdges.flatMap { edge =>
>             val gamma = normalize(BDV.fill[Double](k)(random.nextDouble()), 1.0)
>             val sum = gamma * edge.attr
>             Seq((edge.srcId, sum), (edge.dstId, sum))
>           }
>         }
>       verticesTMP.reduceByKey(_ + _) // lineage: verticesTMP -> edges -> docs; first use of docs
>     }
>     // Partition such that edges are grouped by document
>     this.graph = Graph(docTermVertices, edges).partitionBy(PartitionStrategy.EdgePartition1D)
>     this.k = k
>     this.vocabSize = docs.take(1).head._2.size // second use of docs
> {code}
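>
> For reference, a minimal self-contained sketch of the suggested pattern: persist docs before its first use, then unpersist once both consumers have run. The toy data, object name, and storage level are illustrative assumptions, not the actual patch.
> {code:scala}
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.storage.StorageLevel
>
> // Illustrative sketch only, not the real EMLDAOptimizer code.
> object PersistDocsSketch {
>   def main(args: Array[String]): Unit = {
>     val sc = new SparkContext(
>       new SparkConf().setMaster("local[*]").setAppName("persist-sketch"))
>
>     // Stand-in for docs: RDD[(Long, Vector)]; plain arrays keep it self-contained.
>     val docs = sc.parallelize(Seq((0L, Array(1.0, 0.0, 3.0)), (1L, Array(0.0, 2.0, 0.0))))
>
>     // Cache before the first use so the second use does not recompute the lineage.
>     docs.persist(StorageLevel.MEMORY_AND_DISK)
>
>     // First use: an aggregation derived from docs (mirrors verticesTMP.reduceByKey).
>     val termTotals = docs
>       .flatMap { case (_, termCounts) =>
>         termCounts.zipWithIndex.collect { case (cnt, term) if cnt != 0.0 => (term, cnt) }
>       }
>       .reduceByKey(_ + _)
>       .collect()
>
>     // Second use: mirrors this.vocabSize = docs.take(1).head._2.size.
>     val vocabSize = docs.take(1).head._2.length
>
>     // Release the cache only after both consumers have been evaluated.
>     docs.unpersist()
>
>     println(s"termTotals = ${termTotals.toSeq}, vocabSize = $vocabSize")
>     sc.stop()
>   }
> }
> {code}
> In the real initialize(), the unpersist would additionally have to wait until the graph built from edges is materialized, since that lineage also reaches back to docs.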
> This issue was reported by our tool CacheCheck, which dynamically detects persist()/unpersist() API misuses.


