thanks for the reply. Asher, have you experienced this problem when checkpoints are not enabled as well? If we have a large number of iterations (over 150) and checkpoints are not enabled, the process just hangs (with no error) at around iteration 120-140 (on spark 2.0.0). I could not reproduce this outside of our data, unfortunately.
On Fri, Nov 4, 2016 at 2:53 AM, Asher Krim <ak...@hubspot.com> wrote: > There is an open Jira for this issue (https://issues.apache.org/ > jira/browse/SPARK-14804). There have been a few proposed fixes so far. > > On Thu, Nov 3, 2016 at 2:20 PM, jamborta <jambo...@gmail.com> wrote: > >> Hi there, >> >> I am trying to run the example LDA code >> (http://spark.apache.org/docs/latest/mllib-clustering.html#l >> atent-dirichlet-allocation-lda) >> on Spark 2.0.0/EMR 5.0.0 >> >> If run it with checkpoints enabled (sc.setCheckpointDir("s3n://s3-path/") >> >> ldaModel = LDA.train(corpus, k=3, maxIterations=200, >> checkpointInterval=10) >> >> I get the following error (sorry, quite long): >> >> Py4JJavaErrorTraceback (most recent call last) >> <ipython-input-10-64711b08964e> in <module>() >> ----> 1 ldaModel = LDA.train(corpus, k=3, maxIterations=200, >> checkpointInterval=10) >> >> /usr/lib/spark/python/pyspark/mllib/clustering.py in train(cls, rdd, k, >> maxIterations, docConcentration, topicConcentration, seed, >> checkpointInterval, optimizer) >> 1037 model = callMLlibFunc("trainLDAModel", rdd, k, >> maxIterations, >> 1038 docConcentration, >> topicConcentration, >> seed, >> -> 1039 checkpointInterval, optimizer) >> 1040 return LDAModel(model) >> 1041 >> >> /usr/lib/spark/python/pyspark/mllib/common.py in callMLlibFunc(name, >> *args) >> 128 sc = SparkContext.getOrCreate() >> 129 api = getattr(sc._jvm.PythonMLLibAPI(), name) >> --> 130 return callJavaFunc(sc, api, *args) >> 131 >> 132 >> >> /usr/lib/spark/python/pyspark/mllib/common.py in callJavaFunc(sc, func, >> *args) >> 121 """ Call Java Function """ >> 122 args = [_py2java(sc, a) for a in args] >> --> 123 return _java2py(sc, func(*args)) >> 124 >> 125 >> >> /usr/lib/spark/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py in >> __call__(self, *args) >> 931 answer = self.gateway_client.send_command(command) >> 932 return_value = get_return_value( >> --> 933 answer, self.gateway_client, self.target_id, >> self.name) 
>> 934 >> 935 for temp_arg in temp_args: >> >> /usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw) >> 61 def deco(*a, **kw): >> 62 try: >> ---> 63 return f(*a, **kw) >> 64 except py4j.protocol.Py4JJavaError as e: >> 65 s = e.java_exception.toString() >> >> /usr/lib/spark/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py in >> get_return_value(answer, gateway_client, target_id, name) >> 310 raise Py4JJavaError( >> 311 "An error occurred while calling >> {0}{1}{2}.\n". >> --> 312 format(target_id, ".", name), value) >> 313 else: >> 314 raise Py4JError( >> >> Py4JJavaError: An error occurred while calling o115.trainLDAModel. >> : org.apache.spark.SparkException: Job aborted due to stage failure: >> Task 1 >> in stage 458.0 failed 4 times, most recent failure: Lost task 1.3 in stage >> 458.0 (TID 14827, ip-10-197-192-2.eu-west-1.compute.internal): >> java.lang.ClassCastException: scala.Tuple2 cannot be cast to >> org.apache.spark.graphx.Edge >> at >> org.apache.spark.graphx.EdgeRDD$$anonfun$1$$anonfun$apply$1. >> apply(EdgeRDD.scala:107) >> at scala.collection.Iterator$class.foreach(Iterator.scala:893) >> at >> org.apache.spark.InterruptibleIterator.foreach(Interruptible >> Iterator.scala:28) >> at org.apache.spark.graphx.EdgeRDD$$anonfun$1.apply(EdgeRDD. >> scala:107) >> at org.apache.spark.graphx.EdgeRDD$$anonfun$1.apply(EdgeRDD. 
>> scala:105) >> at >> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$ >> anonfun$apply$25.apply(RDD.scala:801) >> at >> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$ >> anonfun$apply$25.apply(RDD.scala:801) >> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsR >> DD.scala:38) >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala: >> 319) >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) >> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsR >> DD.scala:38) >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala: >> 319) >> at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:332) >> at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:330) >> at >> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator >> $1.apply(BlockManager.scala:919) >> at >> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator >> $1.apply(BlockManager.scala:910) >> at org.apache.spark.storage.BlockManager.doPut(BlockManager. 
>> scala:866) >> at >> org.apache.spark.storage.BlockManager.doPutIterator(BlockMan >> ager.scala:910) >> at >> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockM >> anager.scala:668) >> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330) >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:281) >> at org.apache.spark.graphx.EdgeRDD.compute(EdgeRDD.scala:50) >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala: >> 319) >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) >> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsR >> DD.scala:38) >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala: >> 319) >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) >> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsR >> DD.scala:38) >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala: >> 319) >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) >> at >> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMap >> Task.scala:79) >> at >> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMap >> Task.scala:47) >> at org.apache.spark.scheduler.Task.run(Task.scala:85) >> at org.apache.spark.executor.Executor$TaskRunner.run(Executor. 
>> scala:274) >> at >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool >> Executor.java:1142) >> at >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo >> lExecutor.java:617) >> at java.lang.Thread.run(Thread.java:745) >> >> Driver stacktrace: >> at >> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$sch >> eduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450) >> at >> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$ >> 1.apply(DAGScheduler.scala:1438) >> at >> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$ >> 1.apply(DAGScheduler.scala:1437) >> at >> scala.collection.mutable.ResizableArray$class.foreach(Resiza >> bleArray.scala:59) >> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer. >> scala:48) >> at >> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGSchedu >> ler.scala:1437) >> at >> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskS >> etFailed$1.apply(DAGScheduler.scala:811) >> at >> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskS >> etFailed$1.apply(DAGScheduler.scala:811) >> at scala.Option.foreach(Option.scala:257) >> at >> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed( >> DAGScheduler.scala:811) >> at >> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOn >> Receive(DAGScheduler.scala:1659) >> at >> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onRe >> ceive(DAGScheduler.scala:1618) >> at >> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onRe >> ceive(DAGScheduler.scala:1607) >> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala: >> 48) >> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler. 
>> scala:632) >> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871) >> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1934) >> at org.apache.spark.rdd.RDD$$anonfun$fold$1.apply(RDD.scala:1046) >> at >> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperati >> onScope.scala:151) >> at >> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperati >> onScope.scala:112) >> at org.apache.spark.rdd.RDD.withScope(RDD.scala:358) >> at org.apache.spark.rdd.RDD.fold(RDD.scala:1040) >> at >> org.apache.spark.mllib.clustering.EMLDAOptimizer.computeGlob >> alTopicTotals(LDAOptimizer.scala:226) >> at >> org.apache.spark.mllib.clustering.EMLDAOptimizer.next( >> LDAOptimizer.scala:213) >> at >> org.apache.spark.mllib.clustering.EMLDAOptimizer.next( >> LDAOptimizer.scala:79) >> at org.apache.spark.mllib.clustering.LDA.run(LDA.scala:299) >> at >> org.apache.spark.mllib.api.python.PythonMLLibAPI.trainLDAMod >> el(PythonMLLibAPI.scala:552) >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> at >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce >> ssorImpl.java:62) >> at >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe >> thodAccessorImpl.java:43) >> at java.lang.reflect.Method.invoke(Method.java:498) >> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237) >> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine. >> java:357) >> at py4j.Gateway.invoke(Gateway.java:280) >> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.j >> ava:128) >> at py4j.commands.CallCommand.execute(CallCommand.java:79) >> at py4j.GatewayConnection.run(GatewayConnection.java:211) >> at java.lang.Thread.run(Thread.java:745) >> Caused by: java.lang.ClassCastException: scala.Tuple2 cannot be cast to >> org.apache.spark.graphx.Edge >> at >> org.apache.spark.graphx.EdgeRDD$$anonfun$1$$anonfun$apply$1. 
>> apply(EdgeRDD.scala:107) >> at scala.collection.Iterator$class.foreach(Iterator.scala:893) >> at >> org.apache.spark.InterruptibleIterator.foreach(Interruptible >> Iterator.scala:28) >> at org.apache.spark.graphx.EdgeRDD$$anonfun$1.apply(EdgeRDD. >> scala:107) >> at org.apache.spark.graphx.EdgeRDD$$anonfun$1.apply(EdgeRDD. >> scala:105) >> at >> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$ >> anonfun$apply$25.apply(RDD.scala:801) >> at >> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$ >> anonfun$apply$25.apply(RDD.scala:801) >> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsR >> DD.scala:38) >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala: >> 319) >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) >> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsR >> DD.scala:38) >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala: >> 319) >> at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:332) >> at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:330) >> at >> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator >> $1.apply(BlockManager.scala:919) >> at >> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator >> $1.apply(BlockManager.scala:910) >> at org.apache.spark.storage.BlockManager.doPut(BlockManager. 
>> scala:866) >> at >> org.apache.spark.storage.BlockManager.doPutIterator(BlockMan >> ager.scala:910) >> at >> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockM >> anager.scala:668) >> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330) >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:281) >> at org.apache.spark.graphx.EdgeRDD.compute(EdgeRDD.scala:50) >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala: >> 319) >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) >> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsR >> DD.scala:38) >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala: >> 319) >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) >> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsR >> DD.scala:38) >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala: >> 319) >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) >> at >> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMap >> Task.scala:79) >> at >> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMap >> Task.scala:47) >> at org.apache.spark.scheduler.Task.run(Task.scala:85) >> at org.apache.spark.executor.Executor$TaskRunner.run(Executor. >> scala:274) >> at >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool >> Executor.java:1142) >> at >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo >> lExecutor.java:617) >> ... 1 more >> >> >> >> >> -- >> View this message in context: http://apache-spark-user-list. >> 1001560.n3.nabble.com/example-LDA-code-ClassCastException-tp28009.html >> Sent from the Apache Spark User List mailing list archive at Nabble.com. >> >> --------------------------------------------------------------------- >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org >> >> > > > -- > Asher Krim > Senior Software Engineer >