[Forking this thread.] According to the Spark Programming Guide <http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence>, persisting RDDs with MEMORY_ONLY should not choke if the RDD cannot be held entirely in memory:

> If the RDD does not fit in memory, some partitions will not be cached and
> will be recomputed on the fly each time they're needed. This is the default
> level.

What I'm seeing per the discussion below is that when I try to cache more data than the cluster can hold in memory, I get:

    14/08/01 15:41:23 WARN TaskSetManager: Loss was due to java.lang.OutOfMemoryError
    java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.util.Arrays.copyOfRange(Arrays.java:2694)
        at java.lang.String.<init>(String.java:203)
        at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
        at java.nio.CharBuffer.toString(CharBuffer.java:1201)
        at org.apache.hadoop.io.Text.decode(Text.java:350)
        at org.apache.hadoop.io.Text.decode(Text.java:327)
        at org.apache.hadoop.io.Text.toString(Text.java:254)
        at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:458)
        at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:458)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:107)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
        at org.apache.spark.scheduler.Task.run(Task.scala:51)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

Trying MEMORY_AND_DISK yields the same error. So what's the deal? I'm running 1.0.1 on EC2.

Nick
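[Aside: cache() is shorthand for persist(StorageLevel.MEMORY_ONLY), so "trying MEMORY_AND_DISK" means swapping in an explicit persist() call. A minimal sketch for the Scala shell, reusing the placeholder path from the session quoted below:

    import org.apache.spark.storage.StorageLevel

    // Explicit storage level: partitions that don't fit in memory should
    // spill to local disk instead of being recomputed on each access.
    val a = sc.textFile("s3n://some-path/*.json",
        minPartitions = sc.defaultParallelism * 3)
      .persist(StorageLevel.MEMORY_AND_DISK)

]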
On Thu, Jul 31, 2014 at 5:17 PM, Nicholas Chammas <nicholas.cham...@gmail.com> wrote:

> Davies,
>
> That was it. Removing the call to cache() let the job run successfully,
> but this challenges my understanding of how Spark handles caching data.
>
> I thought it was safe to cache data sets larger than the cluster could
> hold in memory. What Spark would do is cache as much as it could and
> leave the rest for access from disk.
>
> Is that not correct?
>
> Nick

On Thu, Jul 31, 2014 at 5:04 PM, Davies Liu <dav...@databricks.com> wrote:

> Maybe it's because you are trying to cache all the data in memory, but
> the JVM heap is not big enough.
>
> If you remove the .cache(), does the problem still occur?

On Thu, Jul 31, 2014 at 1:33 PM, Nicholas Chammas <nicholas.cham...@gmail.com> wrote:

> Hmm, looking at this stack trace a bit more carefully, it looks like the
> code in the Hadoop API for reading data from the source choked. Is that
> correct?
>
> Perhaps there is a missing newline (or two, or more) that makes one line
> of data too long to read in at once? I'm just guessing here. Gonna try
> to track this down real quick.
>
> Btw, I'm seeing this on 1.0.1 as well, so it's not a regression in
> 1.0.2-rc1 or anything like that.
>
> Nick
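[Aside: a cheap way to test the missing-newline theory is to drop the cache() and scan line lengths only, so nothing gets pinned in memory. A sketch, reusing the same placeholder path; the 100 MB threshold is an arbitrary illustration:

    // Uncached probe: if newlines went missing, a few "lines" will be
    // enormous compared to the rest.
    val lens = sc.textFile("s3n://some-path/*.json").map(_.length.toLong)
    println(lens.max)                           // longest line seen
    println(lens.filter(_ > 100000000L).count)  // lines longer than ~100 MB

]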
On Thu, Jul 31, 2014 at 4:18 PM, Nicholas Chammas <nicholas.cham...@gmail.com> wrote:

> So if I try this again but in the Scala shell (as opposed to the Python
> one), this is what I get:
>
>     scala> val a = sc.textFile("s3n://some-path/*.json",
>       minPartitions=sc.defaultParallelism * 3).cache()
>     a: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:12
>
>     scala> a.map(_.length).max
>     14/07/31 20:09:04 WARN LoadSnappy: Snappy native library is available
>     14/07/31 20:10:41 WARN TaskSetManager: Lost TID 22 (task 0.0:22)
>     14/07/31 20:10:41 WARN TaskSetManager: Loss was due to java.lang.OutOfMemoryError
>     java.lang.OutOfMemoryError: GC overhead limit exceeded
>         at java.util.Arrays.copyOfRange(Arrays.java:2694)
>         at java.lang.String.<init>(String.java:203)
>         at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
>         at java.nio.CharBuffer.toString(CharBuffer.java:1201)
>         at org.apache.hadoop.io.Text.decode(Text.java:350)
>         at org.apache.hadoop.io.Text.decode(Text.java:327)
>         at org.apache.hadoop.io.Text.toString(Text.java:254)
>         at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:458)
>         at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:458)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>         at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>         at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>         at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:107)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
>         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>         at org.apache.spark.scheduler.Task.run(Task.scala:51)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
>     14/07/31 20:10:42 ERROR TaskSchedulerImpl: Lost executor 19 on ip-10-13-142-142.ec2.internal: OutOfMemoryError
>
> So I guess I need to fiddle with some memory configs? I'm surprised that
> just checking input line length could trigger this.
>
> Nick
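[Aside: in Spark 1.0, "memory configs" for this situation usually means the executor heap and the cache's slice of it. A sketch with illustrative values only; the property names are real, but the app name is hypothetical and "6g" / "0.5" are guesses to size against your instances. spark-shell users would put the same keys in conf/spark-defaults.conf instead of building a context by hand:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("line-length-probe")              // hypothetical app name
      .set("spark.executor.memory", "6g")           // heap per executor
      .set("spark.storage.memoryFraction", "0.5")   // cache's share of the heap (default 0.6)
    val sc = new SparkContext(conf)  // master is supplied by spark-submit / the EC2 scripts

]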
On Wed, Jul 30, 2014 at 8:58 PM, Davies Liu <dav...@databricks.com> wrote:

> The exception in Python means that the worker tried to read a command
> from the JVM but hit the end of the socket (the socket had been closed).
> So it's possible that another exception happened in the JVM first.
>
> Could you change the log4j log level, then check whether there is a
> problem inside the JVM?
>
> Davies

On Wed, Jul 30, 2014 at 9:12 AM, Nicholas Chammas <nicholas.cham...@gmail.com> wrote:

> Any clues? This looks like a bug, but I can't report it without more
> precise information.

On Tue, Jul 29, 2014 at 9:56 PM, Nick Chammas <nicholas.cham...@gmail.com> wrote:

> I'm in the PySpark shell and I'm trying to do this:
>
>     a = sc.textFile('s3n://path-to-handful-of-very-large-files-totalling-1tb/*.json',
>       minPartitions=sc.defaultParallelism * 3).cache()
>     a.map(lambda x: len(x)).max()
>
> My job dies with the following:
>
>     14/07/30 01:46:28 WARN TaskSetManager: Loss was due to org.apache.spark.api.python.PythonException
>     org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>       File "/root/spark/python/pyspark/worker.py", line 73, in main
>         command = pickleSer._read_with_length(infile)
>       File "/root/spark/python/pyspark/serializers.py", line 142, in _read_with_length
>         length = read_int(stream)
>       File "/root/spark/python/pyspark/serializers.py", line 337, in read_int
>         raise EOFError
>     EOFError
>
>         at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:115)
>         at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:145)
>         at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>         at org.apache.spark.scheduler.Task.run(Task.scala:51)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
>     14/07/30 01:46:29 ERROR TaskSchedulerImpl: Lost executor 19 on ip-10-190-171-217.ec2.internal: remote Akka client disassociated
>
> How do I debug this? I'm using 1.0.2-rc1 deployed to EC2.
>
> Nick