[jira] [Comment Edited] (SPARK-17975) EMLDAOptimizer fails with ClassCastException on YARN
[ https://issues.apache.org/jira/browse/SPARK-17975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15799156#comment-15799156 ]

Jeff Stein edited comment on SPARK-17975 at 1/4/17 7:47 PM:
--------------------------------------------------------------

Attaching vertical-bar-delimited documents (one per line). With my quick fix, I'm seeing many more persisted RDDs on the "Storage" tab; either I'm not cleaning something up, or there's another issue related to that.

was (Author: jvstein):
Attaching vertical bar delimited documents (one per line).

> EMLDAOptimizer fails with ClassCastException on YARN
> ----------------------------------------------------
>
>                 Key: SPARK-17975
>                 URL: https://issues.apache.org/jira/browse/SPARK-17975
>             Project: Spark
>          Issue Type: Bug
>          Components: MLlib
>    Affects Versions: 2.0.1
>         Environment: CentOS 6, CDH 5.7, Java 1.7u80
>            Reporter: Jeff Stein
>         Attachments: docs.txt
>
> I'm able to reproduce the error consistently with a 2000-record text file
> (each record having 1-5 terms) and checkpointing enabled. It looks like
> the problem was introduced with the resolution for SPARK-13355.
> The EdgeRDD class seems to be lying about its type in a way that makes the
> RDD.mapPartitionsWithIndex method unusable when it's referenced as an
> RDD of Edge elements.
> {code}
> val spark = SparkSession.builder.appName("lda").getOrCreate()
> spark.sparkContext.setCheckpointDir("hdfs:///tmp/checkpoints")
> val data: RDD[(Long, Vector)] = // snip
> data.setName("data").cache()
>
> val lda = new LDA
> val optimizer = new EMLDAOptimizer
> lda.setOptimizer(optimizer)
>   .setK(10)
>   .setMaxIterations(400)
>   .setAlpha(-1)
>   .setBeta(-1)
>   .setCheckpointInterval(7)
> val ldaModel = lda.run(data)
> {code}
> {noformat}
> 16/10/16 23:53:54 WARN TaskSetManager: Lost task 3.0 in stage 348.0 (TID 1225, server2.domain): java.lang.ClassCastException: scala.Tuple2 cannot be cast to org.apache.spark.graphx.Edge
>     at org.apache.spark.graphx.EdgeRDD$$anonfun$1$$anonfun$apply$1.apply(EdgeRDD.scala:107)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>     at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>     at org.apache.spark.graphx.EdgeRDD$$anonfun$1.apply(EdgeRDD.scala:107)
>     at org.apache.spark.graphx.EdgeRDD$$anonfun$1.apply(EdgeRDD.scala:105)
>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:820)
>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:820)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>     at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:332)
>     at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:330)
>     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:935)
>     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
>     at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
>     at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
>     at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670)
>     at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
>     at org.apache.spark.graphx.EdgeRDD.compute(EdgeRDD.scala:50)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>     at org.apache.spark.scheduler.Task.run(Task.scala:86)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:722)
> {noformat}
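[Editor's sketch, not part of the ticket] The "more persisted RDDs on the Storage tab" observation above can also be checked programmatically: SparkContext.getPersistentRDDs returns the RDDs currently marked persisted, so leftover cached RDDs from a patched EMLDAOptimizer run can be listed and released. This assumes a running SparkSession; the filter on the name "data" refers to the cached input in the repro above.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("lda-storage-check").getOrCreate()
val sc = spark.sparkContext

// ... run the LDA repro here ...

// List whatever is still cached after the model is built.
sc.getPersistentRDDs.foreach { case (id, rdd) =>
  println(s"still persisted: id=$id name=${rdd.name} storage=${rdd.getStorageLevel}")
}

// Release everything except the RDD we cached deliberately.
sc.getPersistentRDDs.values
  .filterNot(_.name == "data")     // keep our own cached input
  .foreach(_.unpersist(blocking = false))
```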
[jira] [Updated] (SPARK-17975) EMLDAOptimizer fails with ClassCastException on YARN
[ https://issues.apache.org/jira/browse/SPARK-17975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jeff Stein updated SPARK-17975:
-------------------------------
    Attachment: docs.txt

Attaching vertical bar delimited documents (one per line).

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
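[Editor's sketch] The ClassCastException reported in this issue is the classic type-erasure failure mode: a collection whose declared element type disagrees with what it physically holds compiles and even "casts" fine, and only blows up when an element is actually used at the declared type. A minimal plain-Scala illustration (the Edge case class here is a stand-in, not GraphX's):

```scala
// Stand-in for org.apache.spark.graphx.Edge, purely for illustration.
case class Edge(src: Long, dst: Long)

val raw: Seq[Any] = Seq((1L, 2L), (3L, 4L))

// Erasure: this cast "succeeds" because element types are unchecked at runtime.
val edges = raw.asInstanceOf[Seq[Edge]]

// The ClassCastException (scala.Tuple2 cannot be cast to Edge) only fires
// here, when an element is first consumed as an Edge -- mirroring how
// EdgeRDD's declared element type can disagree with its partition contents.
edges.foreach(e => println(e.src))
```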
[jira] [Updated] (SPARK-17975) EMLDAOptimizer fails with ClassCastException on YARN
[ https://issues.apache.org/jira/browse/SPARK-17975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jeff Stein updated SPARK-17975:
-------------------------------
    Description: added the failure stack trace; otherwise identical to the description quoted above.
[jira] [Commented] (SPARK-17975) EMLDAOptimizer fails with ClassCastException on YARN
[ https://issues.apache.org/jira/browse/SPARK-17975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15583284#comment-15583284 ]

Jeff Stein commented on SPARK-17975:
------------------------------------

Another issue that seems to be related to EdgeRDD partition problems.
[jira] [Comment Edited] (SPARK-17975) EMLDAOptimizer fails with ClassCastException on YARN
[ https://issues.apache.org/jira/browse/SPARK-17975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15583284#comment-15583284 ]

Jeff Stein edited comment on SPARK-17975 at 10/17/16 8:04 PM:
--------------------------------------------------------------

Adding a link to another issue that seems to be related to EdgeRDD partition problems.

was (Author: jvstein):
Another issue that seems to be related to EdgeRDD partition problems.
[jira] [Created] (SPARK-17975) EMLDAOptimizer fails with ClassCastException on YARN
Jeff Stein created SPARK-17975:
-------------------------------

             Summary: EMLDAOptimizer fails with ClassCastException on YARN
                 Key: SPARK-17975
                 URL: https://issues.apache.org/jira/browse/SPARK-17975
             Project: Spark
          Issue Type: Bug
          Components: MLlib
    Affects Versions: 2.0.1
         Environment: CentOS 6, CDH 5.7, Java 1.7u80
            Reporter: Jeff Stein
[jira] [Commented] (SPARK-13048) EMLDAOptimizer deletes dependent checkpoint of DistributedLDAModel
[ https://issues.apache.org/jira/browse/SPARK-13048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226687#comment-15226687 ]

Jeff Stein commented on SPARK-13048:
------------------------------------

Sure, I'll take a look. It'll be a few days before I can get around to it.

> EMLDAOptimizer deletes dependent checkpoint of DistributedLDAModel
> ------------------------------------------------------------------
>
>                 Key: SPARK-13048
>                 URL: https://issues.apache.org/jira/browse/SPARK-13048
>             Project: Spark
>          Issue Type: Bug
>          Components: MLlib
>    Affects Versions: 1.5.2
>         Environment: Standalone Spark cluster
>            Reporter: Jeff Stein
>            Assignee: Joseph K. Bradley
>
> In EMLDAOptimizer, all checkpoints are deleted before returning the DistributedLDAModel.
> The most recent checkpoint is still necessary for operations on the DistributedLDAModel under a couple of scenarios:
> - The graph doesn't fit in memory on the worker nodes (e.g. a very large data set).
> - Late worker failures that require reading the now-dependent checkpoint.
> I ran into this problem running a 10M-record LDA model in a memory-starved environment. The model consistently failed in either the {{collect at LDAModel.scala:528}} stage (when converting to a LocalLDAModel) or in the {{reduce at LDAModel.scala:563}} stage (when calling "describeTopics" on the model). In both cases, a FileNotFoundException is thrown when attempting to access a checkpoint file.
> I'm not sure what the correct fix is here; it might involve a class signature change. An alternative simple fix is to leave the last checkpoint around and expect the user to clean the checkpoint directory themselves.
> {noformat}
> java.io.FileNotFoundException: File does not exist: /hdfs/path/to/checkpoints/c8bd2b4e-27dd-47b3-84ec-3ff0bac04587/rdd-635/part-00071
> {noformat}
> Relevant code is included below.
> LDAOptimizer.scala:
> {noformat}
>   override private[clustering] def getLDAModel(iterationTimes: Array[Double]): LDAModel = {
>     require(graph != null, "graph is null, EMLDAOptimizer not initialized.")
>     this.graphCheckpointer.deleteAllCheckpoints()
>     // The constructor's default arguments assume gammaShape = 100 to ensure equivalence in
>     // LDAModel.toLocal conversion
>     new DistributedLDAModel(this.graph, this.globalTopicTotals, this.k, this.vocabSize,
>       Vectors.dense(Array.fill(this.k)(this.docConcentration)), this.topicConcentration,
>       iterationTimes)
>   }
> {noformat}
> PeriodicCheckpointer.scala:
> {noformat}
>   /**
>    * Call this at the end to delete any remaining checkpoint files.
>    */
>   def deleteAllCheckpoints(): Unit = {
>     while (checkpointQueue.nonEmpty) {
>       removeCheckpointFile()
>     }
>   }
>
>   /**
>    * Dequeue the oldest checkpointed Dataset, and remove its checkpoint files.
>    * This prints a warning but does not fail if the files cannot be removed.
>    */
>   private def removeCheckpointFile(): Unit = {
>     val old = checkpointQueue.dequeue()
>     // Since the old checkpoint is not deleted by Spark, we manually delete it.
>     val fs = FileSystem.get(sc.hadoopConfiguration)
>     getCheckpointFiles(old).foreach { checkpointFile =>
>       try {
>         fs.delete(new Path(checkpointFile), true)
>       } catch {
>         case e: Exception =>
>           logWarning("PeriodicCheckpointer could not remove old checkpoint file: " +
>             checkpointFile)
>       }
>     }
>   }
> {noformat}
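[Editor's sketch] The "leave the last checkpoint around" alternative suggested in the description could look roughly like the following, reusing the checkpointQueue and removeCheckpointFile members from the PeriodicCheckpointer snippet quoted above. The method name is illustrative; this is a sketch of the idea, not the merged fix.

```scala
  /**
   * Delete all checkpoints except the most recent one, so the returned
   * DistributedLDAModel can still recompute lost partitions from it.
   * The user is then expected to clean the checkpoint directory themselves.
   */
  def deleteAllCheckpointsButLast(): Unit = {
    // checkpointQueue is ordered oldest-first, so repeatedly removing the
    // head leaves exactly the newest checkpoint on disk.
    while (checkpointQueue.size > 1) {
      removeCheckpointFile()
    }
  }
```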
[jira] [Commented] (SPARK-13048) EMLDAOptimizer deletes dependent checkpoint of DistributedLDAModel
[ https://issues.apache.org/jira/browse/SPARK-13048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15155212#comment-15155212 ]

Jeff Stein commented on SPARK-13048:
------------------------------------

As an aside, the code in the clustering namespace violates the [open/closed principle](https://en.wikipedia.org/wiki/Open/closed_principle):
- LDAOptimizer is unnecessarily a sealed trait (I understand it's a developer API, but I'm a developer...)
- EMLDAOptimizer is final
- Lots of private[clustering]

All of this meant that writing a decent workaround for the bug took a lot more code than I would have hoped.
[jira] [Created] (SPARK-13048) EMLDAOptimizer deletes dependent checkpoint of DistributedLDAModel
Jeff Stein created SPARK-13048:
-------------------------------

             Summary: EMLDAOptimizer deletes dependent checkpoint of DistributedLDAModel
                 Key: SPARK-13048
                 URL: https://issues.apache.org/jira/browse/SPARK-13048
             Project: Spark
          Issue Type: Bug
          Components: MLlib
    Affects Versions: 1.5.2
         Environment: Standalone Spark cluster
            Reporter: Jeff Stein
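[Editor's sketch] If the fix keeps the final checkpoint alive, the user-side cleanup the description asks for is small: once the model is no longer needed, delete the checkpoint directory that was passed to setCheckpointDir. The path here is illustrative, and a live SparkSession is assumed; the FileSystem/Path API is the same one the quoted PeriodicCheckpointer code uses.

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// Illustrative path: whatever was passed to sparkContext.setCheckpointDir.
val checkpointDir = new Path("hdfs:///tmp/checkpoints")

val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
if (fs.exists(checkpointDir)) {
  // Recursive delete removes the per-run UUID subdirectories and rdd-* files.
  fs.delete(checkpointDir, true)
}
```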
[jira] [Updated] (SPARK-13048) EMLDAOptimizer deletes dependent checkpoint of DistributedLDAModel
[ https://issues.apache.org/jira/browse/SPARK-13048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jeff Stein updated SPARK-13048:
-------------------------------
    Description: minor wording fixes; otherwise identical to the description quoted above.