I see. Did this also fail with previous versions of Spark (0.9 or 0.8)? We’ll try to look into these; this seems like a serious error.

Matei

On Mar 27, 2014, at 7:27 PM, Jim Blomo <jim.bl...@gmail.com> wrote:

> Thanks, Matei. I am running "Spark 1.0.0-SNAPSHOT built for Hadoop
> 1.0.4" from GitHub on 2014-03-18.
>
> I tried batchSizes of 512, 10, and 1 and each got me further but none
> have succeeded.
>
> I can get this to work -- with manual interventions -- if I omit
> `parsed.persist(StorageLevel.MEMORY_AND_DISK)` and set batchSize=1. 5
> of the 175 executors hung, and I had to kill the python process to get
> things going again. The only indication of this in the logs was `INFO
> python.PythonRDD: stdin writer to Python finished early`.
>
> With batchSize=1 and persist, a new memory error came up in several
> tasks, before the app was failed:
>
> 14/03/28 01:51:15 ERROR executor.Executor: Uncaught exception in thread Thread[stdin writer for python,5,main]
> java.lang.OutOfMemoryError: Java heap space
>     at java.util.Arrays.copyOfRange(Arrays.java:2694)
>     at java.lang.String.<init>(String.java:203)
>     at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
>     at java.nio.CharBuffer.toString(CharBuffer.java:1201)
>     at org.apache.hadoop.io.Text.decode(Text.java:350)
>     at org.apache.hadoop.io.Text.decode(Text.java:327)
>     at org.apache.hadoop.io.Text.toString(Text.java:254)
>     at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>     at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>     at scala.collection.Iterator$$anon$12.next(Iterator.scala:357)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>     at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
>     at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>
> There are other exceptions, but I think they all stem from the above,
> eg. org.apache.spark.SparkException: Error sending message to
> BlockManagerMaster
>
> Let me know if there are other settings I should try, or if I should
> try a newer snapshot.
>
> Thanks again!
>
>
> On Mon, Mar 24, 2014 at 9:35 AM, Matei Zaharia <matei.zaha...@gmail.com> wrote:
>> Hey Jim,
>>
>> In Spark 0.9 we added a "batchSize" parameter to PySpark that makes it group
>> multiple objects together before passing them between Java and Python, but
>> this may be too high by default. Try passing batchSize=10 to your
>> SparkContext constructor to lower it (the default is 1024). Or even
>> batchSize=1 to match earlier versions.
>>
>> Matei
>>
>> On Mar 21, 2014, at 6:18 PM, Jim Blomo <jim.bl...@gmail.com> wrote:
>>
>>> Hi all, I'm wondering if there's any settings I can use to reduce the
>>> memory needed by the PythonRDD when computing simple stats. I am
>>> getting OutOfMemoryError exceptions while calculating count() on big,
>>> but not absurd, records. It seems like PythonRDD is trying to keep
>>> too many of these records in memory, when all that is needed is to
>>> stream through them and count. Any tips for getting through this
>>> workload?
>>>
>>>
>>> Code:
>>> session = sc.textFile('s3://...json.gz') # ~54GB of compressed data
>>>
>>> # the biggest individual text line is ~3MB
>>> parsed = session.map(lambda l: l.split("\t",1)).map(lambda (y,s): (loads(y), loads(s)))
>>> parsed.persist(StorageLevel.MEMORY_AND_DISK)
>>>
>>> parsed.count()
>>> # will never finish: executor.Executor: Uncaught exception will FAIL all executors
>>>
>>> Incidentally the whole app appears to be killed, but this error is not
>>> propagated to the shell.
>>>
>>> Cluster:
>>> 15 m2.xlarges (17GB memory, 17GB swap, spark.executor.memory=10GB)
>>>
>>> Exception:
>>> java.lang.OutOfMemoryError: Java heap space
>>>     at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:132)
>>>     at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:120)
>>>     at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:113)
>>>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>     at org.apache.spark.api.python.PythonRDD$$anon$1.foreach(PythonRDD.scala:113)
>>>     at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>>     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>>     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:94)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:220)
>>>     at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
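
For anyone following the batchSize discussion in this thread, here is a rough sketch in plain Python of why grouping records before serializing them raises the memory needed per transfer. It is only an illustration, not PySpark's actual code path: the helper names `batched` and `peak_batch_bytes` are made up for this example, `pickle` is used as a stand-in serializer, and the 100 KB lines are a hypothetical stand-in for the large records described above.

```python
import pickle
from itertools import islice


def batched(records, batch_size):
    """Yield lists of up to batch_size records from any iterable."""
    it = iter(records)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch


def peak_batch_bytes(records, batch_size):
    """Return the size in bytes of the largest serialized batch."""
    return max(len(pickle.dumps(batch)) for batch in batched(records, batch_size))


# Hypothetical workload: 64 distinct text lines of roughly 100 KB each.
lines = ["%d:" % i + "x" * 100000 for i in range(64)]

# Larger batches mean more bytes must be held in memory at once.
for size in (1, 10, 64):
    print(size, peak_batch_bytes(lines, size))
```

The peak grows roughly in proportion to the batch size. If every record were close to the ~3MB maximum mentioned above, a batch of 1024 could approach 3GB for a single serialized batch, which may be why trying batchSize=10 or batchSize=1 in the SparkContext constructor was suggested.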