Okay, thanks. Do you have any info on how large your records and data file are? 
I’d like to reproduce and fix this.

Matei

On Apr 9, 2014, at 3:52 PM, Jim Blomo <jim.bl...@gmail.com> wrote:

> Hi Matei, thanks for working with me to find these issues.
> 
> To summarize, the issues I've seen are:
> 0.9.0:
> - https://issues.apache.org/jira/browse/SPARK-1323
> 
> SNAPSHOT 2014-03-18:
> - When persist() is used and batchSize=1, java.lang.OutOfMemoryError:
> Java heap space.  To me this indicates a memory leak since Spark
> should simply be counting records of size < 3MB
> - Without persist(), "stdin writer to Python finished early" hangs the
> application, unknown root cause
> 
> I've recently rebuilt another SNAPSHOT, git commit 16b8308 with
> debugging turned on.  This gives me the stacktrace on the new "stdin"
> problem:
> 
> 14/04/09 22:22:45 DEBUG PythonRDD: stdin writer to Python finished early
> java.net.SocketException: Connection reset
>        at java.net.SocketInputStream.read(SocketInputStream.java:196)
>        at java.net.SocketInputStream.read(SocketInputStream.java:122)
>        at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
>        at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
>        at sun.security.ssl.InputRecord.read(InputRecord.java:509)
>        at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
>        at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
>        at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
>        at java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
>        at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
>        at org.apache.commons.httpclient.WireLogInputStream.read(WireLogInputStream.java:69)
>        at org.apache.commons.httpclient.ContentLengthInputStream.read(ContentLengthInputStream.java:170)
>        at java.io.FilterInputStream.read(FilterInputStream.java:133)
>        at org.apache.commons.httpclient.AutoCloseInputStream.read(AutoCloseInputStream.java:108)
>        at org.jets3t.service.io.InterruptableInputStream.read(InterruptableInputStream.java:76)
>        at org.jets3t.service.impl.rest.httpclient.HttpMethodReleaseInputStream.read(HttpMethodReleaseInputStream.java:136)
>        at org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsInputStream.read(NativeS3FileSystem.java:98)
>        at java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
>        at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
>        at java.io.DataInputStream.read(DataInputStream.java:100)
>        at org.apache.hadoop.util.LineReader.readLine(LineReader.java:134)
>        at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:133)
>        at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:38)
>        at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:192)
>        at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:175)
>        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:27)
>        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
>        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
>        at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
> 
> 
> On Thu, Apr 3, 2014 at 8:37 PM, Matei Zaharia <matei.zaha...@gmail.com> wrote:
>> Cool, thanks for the update. Have you tried running a branch with this fix 
>> (e.g. branch-0.9, or the 0.9.1 release candidate)? Also, which memory leak 
>> issue are you referring to? Is it separate from this one? (I couldn't find 
>> it earlier in the thread.)
>> 
>> To turn on debug logging, copy conf/log4j.properties.template to 
>> conf/log4j.properties and change the line log4j.rootCategory=INFO, console 
>> to log4j.rootCategory=DEBUG, console. Then make sure this file is present in 
>> "conf" on all workers.
>> 
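A minimal sketch of the log4j step described above, written in Python so it can be scripted. The function name `enable_debug_logging` and the `conf_dir` argument are hypothetical, and the sketch assumes the template contains exactly the line `log4j.rootCategory=INFO, console` quoted above. It only rewrites the local file; copying the result into "conf" on every worker remains a separate step.

```python
import os


def enable_debug_logging(conf_dir):
    """Create log4j.properties from the template with the root level set to DEBUG.

    conf_dir is a hypothetical argument: the path to Spark's "conf" directory.
    """
    template = os.path.join(conf_dir, "log4j.properties.template")
    target = os.path.join(conf_dir, "log4j.properties")

    with open(template) as src:
        lines = src.readlines()

    with open(target, "w") as dst:
        for line in lines:
            # Only the root category line is changed; everything else is copied as-is.
            if line.strip() == "log4j.rootCategory=INFO, console":
                line = line.replace("INFO", "DEBUG", 1)
            dst.write(line)

    return target
```

Usage would be along the lines of `enable_debug_logging("/path/to/spark/conf")`, where the path is a placeholder for the local Spark installation.
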
>> BTW I've managed to run PySpark with this fix on some reasonably large S3 
>> data (multiple GB) and it was fine. It might happen only if records are 
>> large, or something like that. How much heap are you giving to your 
>> executors, and does it show that much in the web UI?
>> 
>> Matei
>> 
>> On Mar 29, 2014, at 10:44 PM, Jim Blomo <jim.bl...@gmail.com> wrote:
>> 
>>> I think the problem I ran into in 0.9 is covered in
>>> https://issues.apache.org/jira/browse/SPARK-1323
>>> 
>>> When I kill the python process, the stacktrace I get indicates that
>>> this happens at initialization.  It looks like the initial write to
>>> the Python process does not go through, and then the iterator hangs
>>> waiting for output.  I haven't had luck turning on debugging for the
>>> executor process.  Still trying to learn the log4j properties that
>>> need to be set.
>>> 
>>> No luck yet on tracking down the memory leak.
>>> 
>>> 14/03/30 05:15:04 ERROR executor.Executor: Exception in task ID 11
>>> org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
>>>       at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:168)
>>>       at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
>>>       at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:113)
>>>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:231)
>>>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:222)
>>>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>>>       at org.apache.spark.scheduler.Task.run(Task.scala:52)
>>>       at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:212)
>>>       at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:43)
>>>       at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
>>>       at java.security.AccessController.doPrivileged(Native Method)
>>>       at javax.security.auth.Subject.doAs(Subject.java:415)
>>>       at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
>>>       at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:42)
>>>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>>>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>       at java.lang.Thread.run(Thread.java:724)
>>> 
>>> 
>>> On Sat, Mar 29, 2014 at 3:17 PM, Jim Blomo <jim.bl...@gmail.com> wrote:
>>>> I've only tried 0.9, in which I ran into the `stdin writer to Python
>>>> finished early` so frequently I wasn't able to load even a 1GB file.
>>>> Let me know if I can provide any other info!
>>>> 
>>>> On Thu, Mar 27, 2014 at 8:48 PM, Matei Zaharia <matei.zaha...@gmail.com> 
>>>> wrote:
>>>>> I see. Did this also fail with previous versions of Spark (0.9 or 0.8)? 
>>>>> We'll try to look into these; it seems like a serious error.
>>>>> 
>>>>> Matei
>>>>> 
>>>>> On Mar 27, 2014, at 7:27 PM, Jim Blomo <jim.bl...@gmail.com> wrote:
>>>>> 
>>>>>> Thanks, Matei.  I am running "Spark 1.0.0-SNAPSHOT built for Hadoop
>>>>>> 1.0.4" from GitHub on 2014-03-18.
>>>>>> 
>>>>>> I tried batchSizes of 512, 10, and 1 and each got me further but none
>>>>>> have succeeded.
>>>>>> 
>>>>>> I can get this to work -- with manual interventions -- if I omit
>>>>>> `parsed.persist(StorageLevel.MEMORY_AND_DISK)` and set batchSize=1.  5
>>>>>> of the 175 executors hung, and I had to kill the python process to get
>>>>>> things going again.  The only indication of this in the logs was `INFO
>>>>>> python.PythonRDD: stdin writer to Python finished early`.
>>>>>> 
>>>>>> With batchSize=1 and persist, a new memory error came up in several
>>>>>> tasks, before the app failed:
>>>>>> 
>>>>>> 14/03/28 01:51:15 ERROR executor.Executor: Uncaught exception in thread Thread[stdin writer for python,5,main]
>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>      at java.util.Arrays.copyOfRange(Arrays.java:2694)
>>>>>>      at java.lang.String.<init>(String.java:203)
>>>>>>      at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
>>>>>>      at java.nio.CharBuffer.toString(CharBuffer.java:1201)
>>>>>>      at org.apache.hadoop.io.Text.decode(Text.java:350)
>>>>>>      at org.apache.hadoop.io.Text.decode(Text.java:327)
>>>>>>      at org.apache.hadoop.io.Text.toString(Text.java:254)
>>>>>>      at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>>>>>>      at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>>>>>>      at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>>>>>      at scala.collection.Iterator$$anon$12.next(Iterator.scala:357)
>>>>>>      at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>      at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>>>      at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
>>>>>>      at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>>>>> 
>>>>>> There are other exceptions, but I think they all stem from the above,
>>>>>> e.g. org.apache.spark.SparkException: Error sending message to
>>>>>> BlockManagerMaster
>>>>>> 
>>>>>> Let me know if there are other settings I should try, or if I should
>>>>>> try a newer snapshot.
>>>>>> 
>>>>>> Thanks again!
>>>>>> 
>>>>>> 
>>>>>> On Mon, Mar 24, 2014 at 9:35 AM, Matei Zaharia <matei.zaha...@gmail.com> 
>>>>>> wrote:
>>>>>>> Hey Jim,
>>>>>>> 
>>>>>>> In Spark 0.9 we added a "batchSize" parameter to PySpark that makes it 
>>>>>>> group multiple objects together before passing them between Java and 
>>>>>>> Python, but this may be too high by default. Try passing batchSize=10 
>>>>>>> to your SparkContext constructor to lower it (the default is 1024). Or 
>>>>>>> even batchSize=1 to match earlier versions.
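A rough back-of-the-envelope sketch of why the batch size could matter for this workload. It assumes (this is not confirmed anywhere in the thread) that a whole batch of records may be held in memory at once, and it ignores serialization overhead. The 3 MB figure is the largest line size reported in the original message; `worst_case_batch_bytes` and `make_context` are hypothetical helper names, and the master URL and application name passed to `SparkContext` are placeholders.

```python
MB = 1024 * 1024
MAX_RECORD_BYTES = 3 * MB  # largest individual text line reported in this thread


def worst_case_batch_bytes(batch_size, max_record_bytes):
    """Upper bound on the raw payload of one batch, ignoring serialization overhead."""
    return batch_size * max_record_bytes


def make_context(master, batch_size):
    """Build a SparkContext with a smaller batch size, as suggested above."""
    # Imported lazily so the arithmetic above can be run without PySpark installed.
    from pyspark import SparkContext
    return SparkContext(master, "count-large-records", batchSize=batch_size)


if __name__ == "__main__":
    # Compare the default (1024) with the two smaller values suggested above.
    for batch_size in (1024, 10, 1):
        size_mb = worst_case_batch_bytes(batch_size, MAX_RECORD_BYTES) // MB
        print("batchSize=%d: up to about %d MB per batch" % (batch_size, size_mb))
```

Under those assumptions, the default of 1024 allows roughly 3 GB of raw record data in a single batch when every record is near the 3 MB maximum, while batchSize=10 caps it at about 30 MB.
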
>>>>>>> 
>>>>>>> Matei
>>>>>>> 
>>>>>>> On Mar 21, 2014, at 6:18 PM, Jim Blomo <jim.bl...@gmail.com> wrote:
>>>>>>> 
>>>>>>>> Hi all, I'm wondering if there are any settings I can use to reduce the
>>>>>>>> memory needed by the PythonRDD when computing simple stats.  I am
>>>>>>>> getting OutOfMemoryError exceptions while calculating count() on big,
>>>>>>>> but not absurd, records.  It seems like PythonRDD is trying to keep
>>>>>>>> too many of these records in memory, when all that is needed is to
>>>>>>>> stream through them and count.  Any tips for getting through this
>>>>>>>> workload?
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Code:
>>>>>>>> session = sc.textFile('s3://...json.gz') # ~54GB of compressed data
>>>>>>>> 
>>>>>>>> # the biggest individual text line is ~3MB
>>>>>>>> parsed = session.map(lambda l: l.split("\t",1)).map(lambda (y,s):
>>>>>>>> (loads(y), loads(s)))
>>>>>>>> parsed.persist(StorageLevel.MEMORY_AND_DISK)
>>>>>>>> 
>>>>>>>> parsed.count()
>>>>>>>> # will never finish: executor.Executor: Uncaught exception will FAIL
>>>>>>>> # all executors
>>>>>>>> 
>>>>>>>> Incidentally the whole app appears to be killed, but this error is not
>>>>>>>> propagated to the shell.
>>>>>>>> 
>>>>>>>> Cluster:
>>>>>>>> 15 m2.xlarges (17GB memory, 17GB swap, spark.executor.memory=10GB)
>>>>>>>> 
>>>>>>>> Exception:
>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>>>     at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:132)
>>>>>>>>     at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:120)
>>>>>>>>     at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:113)
>>>>>>>>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>>>     at org.apache.spark.api.python.PythonRDD$$anon$1.foreach(PythonRDD.scala:113)
>>>>>>>>     at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>>>>>>>     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>>>>>>>     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:94)
>>>>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:220)
>>>>>>>>     at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>>>>>> 
>>>>> 
>> 
