[jira] [Comment Edited] (SPARK-17975) EMLDAOptimizer fails with ClassCastException on YARN

2017-01-04 Thread Jeff Stein (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15799156#comment-15799156
 ] 

Jeff Stein edited comment on SPARK-17975 at 1/4/17 7:47 PM:


Attaching vertical-bar-delimited documents (one per line).

With my quick fix, I'm seeing a lot more persisted RDDs on the "Storage" tab. 
I'm either not cleaning something up or there's another issue related to that.


was (Author: jvstein):
Attaching vertical-bar-delimited documents (one per line).

> EMLDAOptimizer fails with ClassCastException on YARN
> 
>
> Key: SPARK-17975
> URL: https://issues.apache.org/jira/browse/SPARK-17975
> Project: Spark
>  Issue Type: Bug
>  Components: MLlib
>Affects Versions: 2.0.1
> Environment: CentOS 6, CDH 5.7, Java 1.7u80
>Reporter: Jeff Stein
> Attachments: docs.txt
>
>
> I'm able to reproduce the error consistently with a 2000 record text file 
> with each record having 1-5 terms and checkpointing enabled. It looks like 
> the problem was introduced with the resolution for SPARK-13355.
> The EdgeRDD class seems to be lying about its type in a way that causes 
> the RDD.mapPartitionsWithIndex method to be unusable when it's referenced 
> as an RDD of Edge elements.
> {code}
> val spark = SparkSession.builder.appName("lda").getOrCreate()
> spark.sparkContext.setCheckpointDir("hdfs:///tmp/checkpoints")
> val data: RDD[(Long, Vector)] = // snip
> data.setName("data").cache()
> val lda = new LDA
> val optimizer = new EMLDAOptimizer
> lda.setOptimizer(optimizer)
>   .setK(10)
>   .setMaxIterations(400)
>   .setAlpha(-1)
>   .setBeta(-1)
>   .setCheckpointInterval(7)
> val ldaModel = lda.run(data)
> {code}
> {noformat}
> 16/10/16 23:53:54 WARN TaskSetManager: Lost task 3.0 in stage 348.0 (TID 1225, server2.domain): java.lang.ClassCastException: scala.Tuple2 cannot be cast to org.apache.spark.graphx.Edge
>   at org.apache.spark.graphx.EdgeRDD$$anonfun$1$$anonfun$apply$1.apply(EdgeRDD.scala:107)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>   at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>   at org.apache.spark.graphx.EdgeRDD$$anonfun$1.apply(EdgeRDD.scala:107)
>   at org.apache.spark.graphx.EdgeRDD$$anonfun$1.apply(EdgeRDD.scala:105)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:820)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:820)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:332)
>   at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:330)
>   at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:935)
>   at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
>   at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
>   at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
>   at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670)
>   at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
>   at org.apache.spark.graphx.EdgeRDD.compute(EdgeRDD.scala:50)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>   at org.apache.spark.scheduler.Task.run(Task.scala:86)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:722)
> {noformat}

[jira] [Updated] (SPARK-17975) EMLDAOptimizer fails with ClassCastException on YARN

2017-01-04 Thread Jeff Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Stein updated SPARK-17975:
---
Attachment: docs.txt

Attaching vertical-bar-delimited documents (one per line).

> EMLDAOptimizer fails with ClassCastException on YARN
> 
>
> Key: SPARK-17975
> URL: https://issues.apache.org/jira/browse/SPARK-17975
> Project: Spark
>  Issue Type: Bug
>  Components: MLlib
>Affects Versions: 2.0.1
> Environment: CentOS 6, CDH 5.7, Java 1.7u80
>Reporter: Jeff Stein
> Attachments: docs.txt
>
>
> I'm able to reproduce the error consistently with a 2000 record text file 
> with each record having 1-5 terms and checkpointing enabled. It looks like 
> the problem was introduced with the resolution for SPARK-13355.
> The EdgeRDD class seems to be lying about its type in a way that causes 
> the RDD.mapPartitionsWithIndex method to be unusable when it's referenced 
> as an RDD of Edge elements.
> {code}
> val spark = SparkSession.builder.appName("lda").getOrCreate()
> spark.sparkContext.setCheckpointDir("hdfs:///tmp/checkpoints")
> val data: RDD[(Long, Vector)] = // snip
> data.setName("data").cache()
> val lda = new LDA
> val optimizer = new EMLDAOptimizer
> lda.setOptimizer(optimizer)
>   .setK(10)
>   .setMaxIterations(400)
>   .setAlpha(-1)
>   .setBeta(-1)
>   .setCheckpointInterval(7)
> val ldaModel = lda.run(data)
> {code}
> {noformat}
> 16/10/16 23:53:54 WARN TaskSetManager: Lost task 3.0 in stage 348.0 (TID 1225, server2.domain): java.lang.ClassCastException: scala.Tuple2 cannot be cast to org.apache.spark.graphx.Edge
>   at org.apache.spark.graphx.EdgeRDD$$anonfun$1$$anonfun$apply$1.apply(EdgeRDD.scala:107)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>   at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>   at org.apache.spark.graphx.EdgeRDD$$anonfun$1.apply(EdgeRDD.scala:107)
>   at org.apache.spark.graphx.EdgeRDD$$anonfun$1.apply(EdgeRDD.scala:105)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:820)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:820)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:332)
>   at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:330)
>   at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:935)
>   at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
>   at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
>   at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
>   at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670)
>   at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
>   at org.apache.spark.graphx.EdgeRDD.compute(EdgeRDD.scala:50)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>   at org.apache.spark.scheduler.Task.run(Task.scala:86)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:722)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

[jira] [Updated] (SPARK-17975) EMLDAOptimizer fails with ClassCastException on YARN

2016-10-17 Thread Jeff Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Stein updated SPARK-17975:
---
Description: 
I'm able to reproduce the error consistently with a 2000 record text file with 
each record having 1-5 terms and checkpointing enabled. It looks like the 
problem was introduced with the resolution for SPARK-13355.

The EdgeRDD class seems to be lying about its type in a way that causes the 
RDD.mapPartitionsWithIndex method to be unusable when it's referenced as an RDD 
of Edge elements.

{code}
val spark = SparkSession.builder.appName("lda").getOrCreate()
spark.sparkContext.setCheckpointDir("hdfs:///tmp/checkpoints")
val data: RDD[(Long, Vector)] = // snip
data.setName("data").cache()
val lda = new LDA
val optimizer = new EMLDAOptimizer
lda.setOptimizer(optimizer)
  .setK(10)
  .setMaxIterations(400)
  .setAlpha(-1)
  .setBeta(-1)
  .setCheckpointInterval(7)
val ldaModel = lda.run(data)
{code}

{noformat}
16/10/16 23:53:54 WARN TaskSetManager: Lost task 3.0 in stage 348.0 (TID 1225, server2.domain): java.lang.ClassCastException: scala.Tuple2 cannot be cast to org.apache.spark.graphx.Edge
  at org.apache.spark.graphx.EdgeRDD$$anonfun$1$$anonfun$apply$1.apply(EdgeRDD.scala:107)
  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
  at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
  at org.apache.spark.graphx.EdgeRDD$$anonfun$1.apply(EdgeRDD.scala:107)
  at org.apache.spark.graphx.EdgeRDD$$anonfun$1.apply(EdgeRDD.scala:105)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:820)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:820)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
  at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:332)
  at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:330)
  at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:935)
  at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
  at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
  at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
  at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670)
  at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
  at org.apache.spark.graphx.EdgeRDD.compute(EdgeRDD.scala:50)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
  at org.apache.spark.scheduler.Task.run(Task.scala:86)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:722)
{noformat}

  was:
I'm able to reproduce the error consistently with a 2000 record text file with 
each record having 1-5 terms and checkpointing enabled. It looks like the 
problem was introduced with the resolution for SPARK-13355.

The EdgeRDD class seems to be lying about its type in a way that causes the 
RDD.mapPartitionsWithIndex method to be unusable when it's referenced as an RDD 
of Edge elements.

{code}
val spark = SparkSession.builder.appName("lda").getOrCreate()
spark.sparkContext.setCheckpointDir("hdfs:///tmp/checkpoints")
val data: RDD[(Long, Vector)] = // snip
data.setName("data").cache()
val lda = new LDA
val optimizer = new EMLDAOptimizer
lda.setOptimizer(optimizer)
  .setK(10)
  .setMaxIterations(400)
  .setAlpha(-1)
  .setBeta(-1)
  .setCheckpointInterval(7)
val ldaModel = lda.run(data)
{code}



[jira] [Commented] (SPARK-17975) EMLDAOptimizer fails with ClassCastException on YARN

2016-10-17 Thread Jeff Stein (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15583284#comment-15583284
 ] 

Jeff Stein commented on SPARK-17975:


Another issue that seems to be related to EdgeRDD partition problems.

> EMLDAOptimizer fails with ClassCastException on YARN
> 
>
> Key: SPARK-17975
> URL: https://issues.apache.org/jira/browse/SPARK-17975
> Project: Spark
>  Issue Type: Bug
>  Components: MLlib
>Affects Versions: 2.0.1
> Environment: CentOS 6, CDH 5.7, Java 1.7u80
>Reporter: Jeff Stein
>
> I'm able to reproduce the error consistently with a 2000 record text file 
> with each record having 1-5 terms and checkpointing enabled. It looks like 
> the problem was introduced with the resolution for SPARK-13355.
> The EdgeRDD class seems to be lying about its type in a way that causes 
> the RDD.mapPartitionsWithIndex method to be unusable when it's referenced 
> as an RDD of Edge elements.
> {code}
> val spark = SparkSession.builder.appName("lda").getOrCreate()
> spark.sparkContext.setCheckpointDir("hdfs:///tmp/checkpoints")
> val data: RDD[(Long, Vector)] = // snip
> data.setName("data").cache()
> val lda = new LDA
> val optimizer = new EMLDAOptimizer
> lda.setOptimizer(optimizer)
>   .setK(10)
>   .setMaxIterations(400)
>   .setAlpha(-1)
>   .setBeta(-1)
>   .setCheckpointInterval(7)
> val ldaModel = lda.run(data)
> {code}






[jira] [Comment Edited] (SPARK-17975) EMLDAOptimizer fails with ClassCastException on YARN

2016-10-17 Thread Jeff Stein (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15583284#comment-15583284
 ] 

Jeff Stein edited comment on SPARK-17975 at 10/17/16 8:04 PM:
--

Adding a link to another issue that seems to be related to EdgeRDD partition 
problems.


was (Author: jvstein):
Another issue that seems to be related to EdgeRDD partition problems.

> EMLDAOptimizer fails with ClassCastException on YARN
> 
>
> Key: SPARK-17975
> URL: https://issues.apache.org/jira/browse/SPARK-17975
> Project: Spark
>  Issue Type: Bug
>  Components: MLlib
>Affects Versions: 2.0.1
> Environment: CentOS 6, CDH 5.7, Java 1.7u80
>Reporter: Jeff Stein
>
> I'm able to reproduce the error consistently with a 2000 record text file 
> with each record having 1-5 terms and checkpointing enabled. It looks like 
> the problem was introduced with the resolution for SPARK-13355.
> The EdgeRDD class seems to be lying about its type in a way that causes 
> the RDD.mapPartitionsWithIndex method to be unusable when it's referenced 
> as an RDD of Edge elements.
> {code}
> val spark = SparkSession.builder.appName("lda").getOrCreate()
> spark.sparkContext.setCheckpointDir("hdfs:///tmp/checkpoints")
> val data: RDD[(Long, Vector)] = // snip
> data.setName("data").cache()
> val lda = new LDA
> val optimizer = new EMLDAOptimizer
> lda.setOptimizer(optimizer)
>   .setK(10)
>   .setMaxIterations(400)
>   .setAlpha(-1)
>   .setBeta(-1)
>   .setCheckpointInterval(7)
> val ldaModel = lda.run(data)
> {code}






[jira] [Created] (SPARK-17975) EMLDAOptimizer fails with ClassCastException on YARN

2016-10-17 Thread Jeff Stein (JIRA)
Jeff Stein created SPARK-17975:
--

 Summary: EMLDAOptimizer fails with ClassCastException on YARN
 Key: SPARK-17975
 URL: https://issues.apache.org/jira/browse/SPARK-17975
 Project: Spark
  Issue Type: Bug
  Components: MLlib
Affects Versions: 2.0.1
 Environment: CentOS 6, CDH 5.7, Java 1.7u80
Reporter: Jeff Stein


I'm able to reproduce the error consistently with a 2000 record text file with 
each record having 1-5 terms and checkpointing enabled. It looks like the 
problem was introduced with the resolution for SPARK-13355.

The EdgeRDD class seems to be lying about its type in a way that causes the 
RDD.mapPartitionsWithIndex method to be unusable when it's referenced as an RDD 
of Edge elements.

{code}
val spark = SparkSession.builder.appName("lda").getOrCreate()
spark.sparkContext.setCheckpointDir("hdfs:///tmp/checkpoints")
val data: RDD[(Long, Vector)] = // snip
data.setName("data").cache()
val lda = new LDA
val optimizer = new EMLDAOptimizer
lda.setOptimizer(optimizer)
  .setK(10)
  .setMaxIterations(400)
  .setAlpha(-1)
  .setBeta(-1)
  .setCheckpointInterval(7)
val ldaModel = lda.run(data)
{code}
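
The mismatch behind the reported ClassCastException can be illustrated with a short, self-contained Scala sketch. This is a hypothetical simplification, not Spark code: EdgeLike and EdgePartitionLike are stand-in classes invented here. The point is that a partition whose elements are really (partition ID, partition-wrapper) pairs can, through type erasure, masquerade as an iterator of edges until an element is actually materialized.

{code}
// Hypothetical stand-ins (not Spark types) for Edge and EdgePartition.
class EdgeLike
class EdgePartitionLike

object ErasureSketch {
  // What an EdgeRDD partition physically yields: (partition ID, wrapper) pairs.
  val physical: Iterator[(Int, EdgePartitionLike)] =
    Iterator((0, new EdgePartitionLike))

  // Type erasure lets this view compile without complaint...
  val claimed: Iterator[EdgeLike] = physical.asInstanceOf[Iterator[EdgeLike]]

  // ...but materializing an element inserts a checkcast and throws, mirroring
  // "scala.Tuple2 cannot be cast to org.apache.spark.graphx.Edge".
  def force(): Unit = {
    val edge: EdgeLike = claimed.next() // ClassCastException at runtime
  }
}
{code}

Because the cast only fails on materialization, the error surfaces deep inside the generic iteration path (here, under RDD.mapPartitionsWithIndex during checkpointing) rather than at the site that created the mistyped view.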






[jira] [Commented] (SPARK-13048) EMLDAOptimizer deletes dependent checkpoint of DistributedLDAModel

2016-04-05 Thread Jeff Stein (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226687#comment-15226687
 ] 

Jeff Stein commented on SPARK-13048:


Sure, I'll take a look. It'll be a few days before I can get around to it.

> EMLDAOptimizer deletes dependent checkpoint of DistributedLDAModel
> --
>
> Key: SPARK-13048
> URL: https://issues.apache.org/jira/browse/SPARK-13048
> Project: Spark
>  Issue Type: Bug
>  Components: MLlib
>Affects Versions: 1.5.2
> Environment: Standalone Spark cluster
>Reporter: Jeff Stein
>Assignee: Joseph K. Bradley
>
> In EMLDAOptimizer, all checkpoints are deleted before returning the 
> DistributedLDAModel.
> The most recent checkpoint is still necessary for operations on the 
> DistributedLDAModel under a couple scenarios:
> - The graph doesn't fit in memory on the worker nodes (e.g. very large data 
> set).
> - Late worker failures that require reading the now-dependent checkpoint.
> I ran into this problem running a 10M record LDA model in a memory starved 
> environment. The model consistently failed in either the {{collect at 
> LDAModel.scala:528}} stage (when converting to a LocalLDAModel) or in the 
> {{reduce at LDAModel.scala:563}} stage (when calling "describeTopics" on the 
> model). In both cases, a FileNotFoundException is thrown attempting to access 
> a checkpoint file.
> I'm not sure what the correct fix is here; it might involve a class signature 
> change. An alternative simple fix is to leave the last checkpoint around and 
> expect the user to clean the checkpoint directory themselves.
> {noformat}
> java.io.FileNotFoundException: File does not exist: 
> /hdfs/path/to/checkpoints/c8bd2b4e-27dd-47b3-84ec-3ff0bac04587/rdd-635/part-00071
> {noformat}
> Relevant code is included below.
> LDAOptimizer.scala:
> {noformat}
>   override private[clustering] def getLDAModel(iterationTimes: Array[Double]): LDAModel = {
>     require(graph != null, "graph is null, EMLDAOptimizer not initialized.")
>     this.graphCheckpointer.deleteAllCheckpoints()
>     // The constructor's default arguments assume gammaShape = 100 to ensure equivalence in
>     // LDAModel.toLocal conversion
>     new DistributedLDAModel(this.graph, this.globalTopicTotals, this.k, this.vocabSize,
>       Vectors.dense(Array.fill(this.k)(this.docConcentration)), this.topicConcentration,
>       iterationTimes)
>   }
> {noformat}
> PeriodicCheckpointer.scala
> {noformat}
>   /**
>    * Call this at the end to delete any remaining checkpoint files.
>    */
>   def deleteAllCheckpoints(): Unit = {
>     while (checkpointQueue.nonEmpty) {
>       removeCheckpointFile()
>     }
>   }
>
>   /**
>    * Dequeue the oldest checkpointed Dataset, and remove its checkpoint files.
>    * This prints a warning but does not fail if the files cannot be removed.
>    */
>   private def removeCheckpointFile(): Unit = {
>     val old = checkpointQueue.dequeue()
>     // Since the old checkpoint is not deleted by Spark, we manually delete it.
>     val fs = FileSystem.get(sc.hadoopConfiguration)
>     getCheckpointFiles(old).foreach { checkpointFile =>
>       try {
>         fs.delete(new Path(checkpointFile), true)
>       } catch {
>         case e: Exception =>
>           logWarning("PeriodicCheckpointer could not remove old checkpoint file: " +
>             checkpointFile)
>       }
>     }
>   }
> {noformat}






[jira] [Commented] (SPARK-13048) EMLDAOptimizer deletes dependent checkpoint of DistributedLDAModel

2016-02-19 Thread Jeff Stein (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15155212#comment-15155212
 ] 

Jeff Stein commented on SPARK-13048:


As an aside, the code in the clustering namespace violates the [open/closed 
principle](https://en.wikipedia.org/wiki/Open/closed_principle).
  - LDAOptimizer is unnecessarily a sealed trait (I understand it's a developer 
api, but I'm a developer...)
  - EMLDAOptimizer is final
  - Lots of private[clustering]

All of this meant that writing a decent workaround for the bug took a lot more 
code than I would have hoped.

> EMLDAOptimizer deletes dependent checkpoint of DistributedLDAModel
> --
>
> Key: SPARK-13048
> URL: https://issues.apache.org/jira/browse/SPARK-13048
> Project: Spark
>  Issue Type: Bug
>  Components: MLlib
>Affects Versions: 1.5.2
> Environment: Standalone Spark cluster
>Reporter: Jeff Stein
>
> In EMLDAOptimizer, all checkpoints are deleted before returning the 
> DistributedLDAModel.
> The most recent checkpoint is still necessary for operations on the 
> DistributedLDAModel under a couple scenarios:
> - The graph doesn't fit in memory on the worker nodes (e.g. very large data 
> set).
> - Late worker failures that require reading the now-dependent checkpoint.
> I ran into this problem running a 10M record LDA model in a memory starved 
> environment. The model consistently failed in either the {{collect at 
> LDAModel.scala:528}} stage (when converting to a LocalLDAModel) or in the 
> {{reduce at LDAModel.scala:563}} stage (when calling "describeTopics" on the 
> model). In both cases, a FileNotFoundException is thrown attempting to access 
> a checkpoint file.
> I'm not sure what the correct fix is here; it might involve a class signature 
> change. An alternative simple fix is to leave the last checkpoint around and 
> expect the user to clean the checkpoint directory themselves.
> {noformat}
> java.io.FileNotFoundException: File does not exist: 
> /hdfs/path/to/checkpoints/c8bd2b4e-27dd-47b3-84ec-3ff0bac04587/rdd-635/part-00071
> {noformat}
> Relevant code is included below.
> LDAOptimizer.scala:
> {noformat}
>   override private[clustering] def getLDAModel(iterationTimes: Array[Double]): LDAModel = {
>     require(graph != null, "graph is null, EMLDAOptimizer not initialized.")
>     this.graphCheckpointer.deleteAllCheckpoints()
>     // The constructor's default arguments assume gammaShape = 100 to ensure equivalence in
>     // LDAModel.toLocal conversion
>     new DistributedLDAModel(this.graph, this.globalTopicTotals, this.k, this.vocabSize,
>       Vectors.dense(Array.fill(this.k)(this.docConcentration)), this.topicConcentration,
>       iterationTimes)
>   }
> {noformat}
> PeriodicCheckpointer.scala
> {noformat}
>   /**
>    * Call this at the end to delete any remaining checkpoint files.
>    */
>   def deleteAllCheckpoints(): Unit = {
>     while (checkpointQueue.nonEmpty) {
>       removeCheckpointFile()
>     }
>   }
>
>   /**
>    * Dequeue the oldest checkpointed Dataset, and remove its checkpoint files.
>    * This prints a warning but does not fail if the files cannot be removed.
>    */
>   private def removeCheckpointFile(): Unit = {
>     val old = checkpointQueue.dequeue()
>     // Since the old checkpoint is not deleted by Spark, we manually delete it.
>     val fs = FileSystem.get(sc.hadoopConfiguration)
>     getCheckpointFiles(old).foreach { checkpointFile =>
>       try {
>         fs.delete(new Path(checkpointFile), true)
>       } catch {
>         case e: Exception =>
>           logWarning("PeriodicCheckpointer could not remove old checkpoint file: " +
>             checkpointFile)
>       }
>     }
>   }
> {noformat}






[jira] [Created] (SPARK-13048) EMLDAOptimizer deletes dependent checkpoint of DistributedLDAModel

2016-01-27 Thread Jeff Stein (JIRA)
Jeff Stein created SPARK-13048:
--

 Summary: EMLDAOptimizer deletes dependent checkpoint of 
DistributedLDAModel
 Key: SPARK-13048
 URL: https://issues.apache.org/jira/browse/SPARK-13048
 Project: Spark
  Issue Type: Bug
  Components: MLlib
Affects Versions: 1.5.2
 Environment: Standalone Spark cluster
Reporter: Jeff Stein


In EMLDAOptimizer, all checkpoints are deleted before returning the 
DistributedLDAModel.

The most recent checkpoint is still necessary for operations on the 
DistributedLDAModel under a couple scenarios:
- The graph doesn't fit in memory on the worker nodes (e.g. very large data set)
- Late worker failures that require reading the now-dependent checkpoint.

I ran into this problem running a 10M record LDA model in a memory starved 
environment. The model persistently failed in either the {{collect at 
LDAModel.scala:528}} stage when converting to a LocalLDAModel or in the 
{{reduce at LDAModel.scala:563}} stage when calling "describeTopics". In both 
cases, a FileNotFoundException is thrown attempting to access a checkpoint file.

I'm not sure what the correct fix is here; it might involve a class signature 
change. An alternative simple fix is to leave the last checkpoint around and 
expect the user to clean the checkpoint directory themselves.

{noformat}
java.io.FileNotFoundException: File does not exist: 
/hdfs/path/to/checkpoints/c8bd2b4e-27dd-47b3-84ec-3ff0bac04587/rdd-635/part-00071
{noformat}

Relevant code is included below.

LDAOptimizer.scala:
{noformat}
  override private[clustering] def getLDAModel(iterationTimes: Array[Double]): LDAModel = {
    require(graph != null, "graph is null, EMLDAOptimizer not initialized.")
    this.graphCheckpointer.deleteAllCheckpoints()
    // The constructor's default arguments assume gammaShape = 100 to ensure equivalence in
    // LDAModel.toLocal conversion
    new DistributedLDAModel(this.graph, this.globalTopicTotals, this.k, this.vocabSize,
      Vectors.dense(Array.fill(this.k)(this.docConcentration)), this.topicConcentration,
      iterationTimes)
  }
{noformat}

PeriodicCheckpointer.scala

{noformat}
  /**
   * Call this at the end to delete any remaining checkpoint files.
   */
  def deleteAllCheckpoints(): Unit = {
    while (checkpointQueue.nonEmpty) {
      removeCheckpointFile()
    }
  }

  /**
   * Dequeue the oldest checkpointed Dataset, and remove its checkpoint files.
   * This prints a warning but does not fail if the files cannot be removed.
   */
  private def removeCheckpointFile(): Unit = {
    val old = checkpointQueue.dequeue()
    // Since the old checkpoint is not deleted by Spark, we manually delete it.
    val fs = FileSystem.get(sc.hadoopConfiguration)
    getCheckpointFiles(old).foreach { checkpointFile =>
      try {
        fs.delete(new Path(checkpointFile), true)
      } catch {
        case e: Exception =>
          logWarning("PeriodicCheckpointer could not remove old checkpoint file: " +
            checkpointFile)
      }
    }
  }
{noformat}
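
The simple alternative fix described above (keep the newest checkpoint and let the user clean it up) amounts to stopping the cleanup loop one entry early. A minimal sketch, with an invented method name (deleteAllCheckpointsButLast) and the queue contents and HDFS deletion stubbed out for illustration:

{noformat}
import scala.collection.mutable

// Simplified stand-in for PeriodicCheckpointer's internal state; the real
// removeCheckpointFile() resolves checkpoint files and deletes them via
// FileSystem.delete, which is stubbed out here.
object CheckpointerSketch {
  val checkpointQueue: mutable.Queue[String] =
    mutable.Queue("rdd-100", "rdd-200", "rdd-635")

  private def removeCheckpointFile(): Unit = {
    val old = checkpointQueue.dequeue()
    // Real implementation: delete each checkpoint file of `old` via
    // FileSystem.get(sc.hadoopConfiguration).delete(path, true).
  }

  /** Delete every checkpoint except the most recent one, which the returned
    * DistributedLDAModel may still need to read. */
  def deleteAllCheckpointsButLast(): Unit = {
    while (checkpointQueue.size > 1) {
      removeCheckpointFile()
    }
  }
}
{noformat}

The surviving checkpoint directory would then be the user's responsibility to clean up once they are done with the model, as the description suggests.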






[jira] [Updated] (SPARK-13048) EMLDAOptimizer deletes dependent checkpoint of DistributedLDAModel

2016-01-27 Thread Jeff Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-13048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Stein updated SPARK-13048:
---
Description: 
In EMLDAOptimizer, all checkpoints are deleted before returning the 
DistributedLDAModel.

The most recent checkpoint is still necessary for operations on the 
DistributedLDAModel under a couple scenarios:
- The graph doesn't fit in memory on the worker nodes (e.g. very large data 
set).
- Late worker failures that require reading the now-dependent checkpoint.

I ran into this problem running a 10M record LDA model in a memory starved 
environment. The model consistently failed in either the {{collect at 
LDAModel.scala:528}} stage (when converting to a LocalLDAModel) or in the 
{{reduce at LDAModel.scala:563}} stage (when calling "describeTopics" on the 
model). In both cases, a FileNotFoundException is thrown attempting to access a 
checkpoint file.

I'm not sure what the correct fix is here; it might involve a class signature 
change. An alternative simple fix is to leave the last checkpoint around and 
expect the user to clean the checkpoint directory themselves.

{noformat}
java.io.FileNotFoundException: File does not exist: 
/hdfs/path/to/checkpoints/c8bd2b4e-27dd-47b3-84ec-3ff0bac04587/rdd-635/part-00071
{noformat}

Relevant code is included below.

LDAOptimizer.scala:
{noformat}
  override private[clustering] def getLDAModel(iterationTimes: Array[Double]): LDAModel = {
    require(graph != null, "graph is null, EMLDAOptimizer not initialized.")
    this.graphCheckpointer.deleteAllCheckpoints()
    // The constructor's default arguments assume gammaShape = 100 to ensure equivalence in
    // LDAModel.toLocal conversion
    new DistributedLDAModel(this.graph, this.globalTopicTotals, this.k, this.vocabSize,
      Vectors.dense(Array.fill(this.k)(this.docConcentration)), this.topicConcentration,
      iterationTimes)
  }
{noformat}

PeriodicCheckpointer.scala

{noformat}
  /**
   * Call this at the end to delete any remaining checkpoint files.
   */
  def deleteAllCheckpoints(): Unit = {
    while (checkpointQueue.nonEmpty) {
      removeCheckpointFile()
    }
  }

  /**
   * Dequeue the oldest checkpointed Dataset, and remove its checkpoint files.
   * This prints a warning but does not fail if the files cannot be removed.
   */
  private def removeCheckpointFile(): Unit = {
    val old = checkpointQueue.dequeue()
    // Since the old checkpoint is not deleted by Spark, we manually delete it.
    val fs = FileSystem.get(sc.hadoopConfiguration)
    getCheckpointFiles(old).foreach { checkpointFile =>
      try {
        fs.delete(new Path(checkpointFile), true)
      } catch {
        case e: Exception =>
          logWarning("PeriodicCheckpointer could not remove old checkpoint file: " +
            checkpointFile)
      }
    }
  }
{noformat}

  was:
In EMLDAOptimizer, all checkpoints are deleted before returning the 
DistributedLDAModel.

The most recent checkpoint is still necessary for operations on the 
DistributedLDAModel under a couple scenarios:
- The graph doesn't fit in memory on the worker nodes (e.g. very large data set)
- Late worker failures that require reading the now-dependent checkpoint.

I ran into this problem running a 10M record LDA model in a memory starved 
environment. The model persistently failed in either the {{collect at 
LDAModel.scala:528}} stage when converting to a LocalLDAModel or in the 
{{reduce at LDAModel.scala:563}} stage when calling "describeTopics". In both 
cases, a FileNotFoundException is thrown attempting to access a checkpoint file.

I'm not sure what the correct fix is here; it might involve a class signature 
change. An alternative simple fix is to leave the last checkpoint around and 
expect the user to clean the checkpoint directory themselves.

{noformat}
java.io.FileNotFoundException: File does not exist: 
/hdfs/path/to/checkpoints/c8bd2b4e-27dd-47b3-84ec-3ff0bac04587/rdd-635/part-00071
{noformat}

Relevant code is included below.

LDAOptimizer.scala:
{noformat}
  override private[clustering] def getLDAModel(iterationTimes: Array[Double]): LDAModel = {
    require(graph != null, "graph is null, EMLDAOptimizer not initialized.")
    this.graphCheckpointer.deleteAllCheckpoints()
    // The constructor's default arguments assume gammaShape = 100 to ensure equivalence in
    // LDAModel.toLocal conversion
    new DistributedLDAModel(this.graph, this.globalTopicTotals, this.k, this.vocabSize,
      Vectors.dense(Array.fill(this.k)(this.docConcentration)), this.topicConcentration,
      iterationTimes)
  }
{noformat}

PeriodicCheckpointer.scala

{noformat}
  /**
   * Call this at the end to delete any remaining checkpoint files.
   */
  def deleteAllCheckpoints(): Unit = {
    while (checkpointQueue.nonEmpty) {
      removeCheckpointFile()
    }
  }

  /**
   * Dequeue the oldest checkpointed Dataset, and remove its checkpoint files.
   *