[ 
https://issues.apache.org/jira/browse/SPARK-13048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15180241#comment-15180241
 ] 

Joseph K. Bradley commented on SPARK-13048:
-------------------------------------------

I'd say the best fix would be to add an option to LDA to not delete the last 
checkpoint.  I'd prefer to expose this as a Param in the spark.ml API, but it 
could be added to the spark.mllib API as well if necessary.

[~holdenk]  I agree we need to figure out how to handle/control caching and 
checkpointing within Pipelines, but that will have to wait for after 2.0.

[~jvstein]  We try to minimize the public API.  Although I agree with you about 
opening up APIs in principal, it have proven dangerous in practice.  Even when 
we mark things DeveloperApi, many users still use those APIs, making it 
difficult to change them in the future.

> EMLDAOptimizer deletes dependent checkpoint of DistributedLDAModel
> ------------------------------------------------------------------
>
>                 Key: SPARK-13048
>                 URL: https://issues.apache.org/jira/browse/SPARK-13048
>             Project: Spark
>          Issue Type: Bug
>          Components: MLlib
>    Affects Versions: 1.5.2
>         Environment: Standalone Spark cluster
>            Reporter: Jeff Stein
>
> In EMLDAOptimizer, all checkpoints are deleted before returning the 
> DistributedLDAModel.
> The most recent checkpoint is still necessary for operations on the 
> DistributedLDAModel under a couple scenarios:
> - The graph doesn't fit in memory on the worker nodes (e.g. very large data 
> set).
> - Late worker failures that require reading the now-dependent checkpoint.
> I ran into this problem running a 10M record LDA model in a memory starved 
> environment. The model consistently failed in either the {{collect at 
> LDAModel.scala:528}} stage (when converting to a LocalLDAModel) or in the 
> {{reduce at LDAModel.scala:563}} stage (when calling "describeTopics" on the 
> model). In both cases, a FileNotFoundException is thrown attempting to access 
> a checkpoint file.
> I'm not sure what the correct fix is here; it might involve a class signature 
> change. An alternative simple fix is to leave the last checkpoint around and 
> expect the user to clean the checkpoint directory themselves.
> {noformat}
> java.io.FileNotFoundException: File does not exist: 
> /hdfs/path/to/checkpoints/c8bd2b4e-27dd-47b3-84ec-3ff0bac04587/rdd-635/part-00071
> {noformat}
> Relevant code is included below.
> LDAOptimizer.scala:
> {noformat}
>   override private[clustering] def getLDAModel(iterationTimes: 
> Array[Double]): LDAModel = {
>     require(graph != null, "graph is null, EMLDAOptimizer not initialized.")
>     this.graphCheckpointer.deleteAllCheckpoints()
>     // The constructor's default arguments assume gammaShape = 100 to ensure 
> equivalence in
>     // LDAModel.toLocal conversion
>     new DistributedLDAModel(this.graph, this.globalTopicTotals, this.k, 
> this.vocabSize,
>       Vectors.dense(Array.fill(this.k)(this.docConcentration)), 
> this.topicConcentration,
>       iterationTimes)
>   }
> {noformat}
> PeriodicCheckpointer.scala
> {noformat}
>   /**
>    * Call this at the end to delete any remaining checkpoint files.
>    */
>   def deleteAllCheckpoints(): Unit = {
>     while (checkpointQueue.nonEmpty) {
>       removeCheckpointFile()
>     }
>   }
>   /**
>    * Dequeue the oldest checkpointed Dataset, and remove its checkpoint files.
>    * This prints a warning but does not fail if the files cannot be removed.
>    */
>   private def removeCheckpointFile(): Unit = {
>     val old = checkpointQueue.dequeue()
>     // Since the old checkpoint is not deleted by Spark, we manually delete 
> it.
>     val fs = FileSystem.get(sc.hadoopConfiguration)
>     getCheckpointFiles(old).foreach { checkpointFile =>
>       try {
>         fs.delete(new Path(checkpointFile), true)
>       } catch {
>         case e: Exception =>
>           logWarning("PeriodicCheckpointer could not remove old checkpoint 
> file: " +
>             checkpointFile)
>       }
>     }
>   }
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to