Cool, thanks for the update. Have you tried running a branch with this fix 
(e.g. branch-0.9, or the 0.9.1 release candidate)? Also, what memory leak issue 
are you referring to? Is it separate from this? (I couldn’t find it earlier in 
the thread.)

To turn on debug logging, copy conf/log4j.properties.template to 
conf/log4j.properties and change the line log4j.rootCategory=INFO, console to 
log4j.rootCategory=DEBUG, console. Then make sure this file is present in 
“conf” on all workers.
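
If it helps, the steps above could be scripted roughly like this. This is only 
a sketch: the helper name enable_debug_logging is made up for illustration, and 
it assumes the template has a line starting with log4j.rootCategory= as 
described above. The resulting file still has to end up in “conf” on every 
worker.

```python
from pathlib import Path


def enable_debug_logging(conf_dir):
    """Write log4j.properties from the template with the root logger at DEBUG.

    conf_dir is the Spark "conf" directory (hypothetical helper, not part of Spark).
    """
    conf_dir = Path(conf_dir)
    template = conf_dir / "log4j.properties.template"
    target = conf_dir / "log4j.properties"

    lines = []
    for line in template.read_text().splitlines():
        # Swap only the root logger line; every other line is copied unchanged.
        if line.strip().startswith("log4j.rootCategory="):
            line = "log4j.rootCategory=DEBUG, console"
        lines.append(line)

    target.write_text("\n".join(lines) + "\n")
    return target
```

Calling enable_debug_logging("conf") from the Spark home directory should leave 
the template untouched and write the modified copy next to it.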

BTW I’ve managed to run PySpark with this fix on some reasonably large S3 data 
(multiple GB) and it was fine. The problem might only show up when individual 
records are large, or something like that. How much heap are you giving to your 
executors, and does the web UI show that much?

Matei

On Mar 29, 2014, at 10:44 PM, Jim Blomo <jim.bl...@gmail.com> wrote:

> I think the problem I ran into in 0.9 is covered in
> https://issues.apache.org/jira/browse/SPARK-1323
> 
> When I kill the python process, the stack trace I get indicates that
> this happens at initialization.  It looks like the initial write to
> the Python process does not go through, and then the iterator hangs
> waiting for output.  I haven't had luck turning on debugging for the
> executor process.  Still trying to learn the log4j properties that
> need to be set.
> 
> No luck yet on tracking down the memory leak.
> 
> 14/03/30 05:15:04 ERROR executor.Executor: Exception in task ID 11
> org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
>        at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:168)
>        at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
>        at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:113)
>        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:231)
>        at org.apache.spark.rdd.RDD.iterator(RDD.scala:222)
>        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>        at org.apache.spark.scheduler.Task.run(Task.scala:52)
>        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:212)
>        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:43)
>        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
>        at java.security.AccessController.doPrivileged(Native Method)
>        at javax.security.auth.Subject.doAs(Subject.java:415)
>        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
>        at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:42)
>        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>        at java.lang.Thread.run(Thread.java:724)
> 
> 
> On Sat, Mar 29, 2014 at 3:17 PM, Jim Blomo <jim.bl...@gmail.com> wrote:
>> I've only tried 0.9, in which I ran into the `stdin writer to Python
>> finished early` so frequently I wasn't able to load even a 1GB file.
>> Let me know if I can provide any other info!
>> 
>> On Thu, Mar 27, 2014 at 8:48 PM, Matei Zaharia <matei.zaha...@gmail.com> 
>> wrote:
>>> I see, did this also fail with previous versions of Spark (0.9 or 0.8)? 
>>> We'll try to look into these, seems like a serious error.
>>> 
>>> Matei
>>> 
>>> On Mar 27, 2014, at 7:27 PM, Jim Blomo <jim.bl...@gmail.com> wrote:
>>> 
>>>> Thanks, Matei.  I am running "Spark 1.0.0-SNAPSHOT built for Hadoop
>>>> 1.0.4" from GitHub on 2014-03-18.
>>>> 
>>>> I tried batchSizes of 512, 10, and 1 and each got me further but none
>>>> have succeeded.
>>>> 
>>>> I can get this to work -- with manual interventions -- if I omit
>>>> `parsed.persist(StorageLevel.MEMORY_AND_DISK)` and set batchSize=1.  5
>>>> of the 175 executors hung, and I had to kill the python process to get
>>>> things going again.  The only indication of this in the logs was `INFO
>>>> python.PythonRDD: stdin writer to Python finished early`.
>>>> 
>>>> With batchSize=1 and persist, a new memory error came up in several
>>>> tasks, before the app was failed:
>>>> 
>>>> 14/03/28 01:51:15 ERROR executor.Executor: Uncaught exception in
>>>> thread Thread[stdin writer for python,5,main]
>>>> java.lang.OutOfMemoryError: Java heap space
>>>>       at java.util.Arrays.copyOfRange(Arrays.java:2694)
>>>>       at java.lang.String.<init>(String.java:203)
>>>>       at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
>>>>       at java.nio.CharBuffer.toString(CharBuffer.java:1201)
>>>>       at org.apache.hadoop.io.Text.decode(Text.java:350)
>>>>       at org.apache.hadoop.io.Text.decode(Text.java:327)
>>>>       at org.apache.hadoop.io.Text.toString(Text.java:254)
>>>>       at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>>>>       at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>>>>       at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>>>       at scala.collection.Iterator$$anon$12.next(Iterator.scala:357)
>>>>       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>       at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
>>>>       at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>>> 
>>>> There are other exceptions, but I think they all stem from the above,
>>>> e.g. org.apache.spark.SparkException: Error sending message to
>>>> BlockManagerMaster
>>>> 
>>>> Let me know if there are other settings I should try, or if I should
>>>> try a newer snapshot.
>>>> 
>>>> Thanks again!
>>>> 
>>>> 
>>>> On Mon, Mar 24, 2014 at 9:35 AM, Matei Zaharia <matei.zaha...@gmail.com> 
>>>> wrote:
>>>>> Hey Jim,
>>>>> 
>>>>> In Spark 0.9 we added a "batchSize" parameter to PySpark that makes it 
>>>>> group multiple objects together before passing them between Java and 
>>>>> Python, but this may be too high by default. Try passing batchSize=10 to 
>>>>> your SparkContext constructor to lower it (the default is 1024). Or even 
>>>>> batchSize=1 to match earlier versions.
>>>>> 
>>>>> Matei
>>>>> 
>>>>> On Mar 21, 2014, at 6:18 PM, Jim Blomo <jim.bl...@gmail.com> wrote:
>>>>> 
>>>>>> Hi all, I'm wondering if there are any settings I can use to reduce the
>>>>>> memory needed by the PythonRDD when computing simple stats.  I am
>>>>>> getting OutOfMemoryError exceptions while calculating count() on big,
>>>>>> but not absurd, records.  It seems like PythonRDD is trying to keep
>>>>>> too many of these records in memory, when all that is needed is to
>>>>>> stream through them and count.  Any tips for getting through this
>>>>>> workload?
>>>>>> 
>>>>>> 
>>>>>> Code:
>>>>>> session = sc.textFile('s3://...json.gz') # ~54GB of compressed data
>>>>>> 
>>>>>> # the biggest individual text line is ~3MB
>>>>>> parsed = (session.map(lambda l: l.split("\t", 1))
>>>>>>                  .map(lambda (y, s): (loads(y), loads(s))))
>>>>>> parsed.persist(StorageLevel.MEMORY_AND_DISK)
>>>>>> 
>>>>>> parsed.count()
>>>>>> # will never finish: executor.Executor: Uncaught exception will FAIL
>>>>>> # all executors
>>>>>> 
>>>>>> Incidentally the whole app appears to be killed, but this error is not
>>>>>> propagated to the shell.
>>>>>> 
>>>>>> Cluster:
>>>>>> 15 m2.xlarges (17GB memory, 17GB swap, spark.executor.memory=10GB)
>>>>>> 
>>>>>> Exception:
>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>      at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:132)
>>>>>>      at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:120)
>>>>>>      at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:113)
>>>>>>      at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>      at org.apache.spark.api.python.PythonRDD$$anon$1.foreach(PythonRDD.scala:113)
>>>>>>      at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>>>>>      at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>>>>>      at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:94)
>>>>>>      at org.apache.spark.rdd.RDD.iterator(RDD.scala:220)
>>>>>>      at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>>>> 
>>> 
