I should add that I had to tweak the numbers a bit: high enough that the
boxes stay out of swap, but low enough to avoid the "Too many open files"
error (`ulimit -n` is 32768).

On Wed, May 14, 2014 at 10:47 AM, Jim Blomo <jim.bl...@gmail.com> wrote:
> That worked amazingly well, thank you Matei!  Numbers that worked for
> me were 400 for the textFile()s, 1500 for the join()s.
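>
> For reference, a rough sketch of what that looks like in PySpark (the bucket
> paths and the parse_* helpers here are placeholders, and sc is the usual
> SparkContext):
>
> a = sc.textFile("s3n://bucket/a/*", 400).map(parse_a)  # 400 minimum partitions
> b = sc.textFile("s3n://bucket/b/*", 400).map(parse_b)  # parse_* return (key, value) pairs
> joined = a.join(b, 1500)                               # 1500 partitions for the join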
>
> On Mon, May 12, 2014 at 7:58 PM, Matei Zaharia <matei.zaha...@gmail.com> 
> wrote:
>> Hey Jim, unfortunately external spilling is not implemented in Python right 
>> now. While it would be possible to update combineByKey to do smarter stuff 
>> here, one simple workaround you can try is to launch more map tasks (or more 
>> reduce tasks). To set the minimum number of map tasks, you can pass it as a 
>> second argument to textFile and such (e.g. sc.textFile("s3n://foo.txt", 1000)).
>>
>> Matei
>>
>> On May 12, 2014, at 5:47 PM, Jim Blomo <jim.bl...@gmail.com> wrote:
>>
>>> Thanks, Aaron, this looks like a good solution!  Will be trying it out 
>>> shortly.
>>>
>>> I noticed that the S3 exceptions seem to occur more frequently when the
>>> box is swapping.  Why is the box swapping?  combineByKey seems to assume
>>> that it can fit an entire partition in memory when doing the
>>> combineLocally step.  I'm going to try to break this apart, but I'll need
>>> some sort of heuristic.  Options include looking at memory usage via the
>>> resource module and trying to keep below 'spark.executor.memory', or
>>> using batchSize to limit the number of entries in the dictionary.  Let me
>>> know if you have any opinions.
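>>>
>>> To make the first option concrete, a rough sketch of the kind of check I
>>> have in mind (the function name and the 0.8 safety factor are made up):
>>>
>>> import resource
>>>
>>> def over_memory_budget(executor_memory_bytes, safety=0.8):
>>>     # ru_maxrss is the max resident set size so far, in kilobytes on Linux
>>>     used_bytes = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss * 1024
>>>     return used_bytes > safety * executor_memory_bytes
>>>
>>> combineLocally could call something like this periodically and flush or
>>> batch when it returns True.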
>>>
>>> On Sun, May 4, 2014 at 8:02 PM, Aaron Davidson <ilike...@gmail.com> wrote:
>>>> I'd just like to update this thread by pointing to the PR based on our
>>>> initial design: https://github.com/apache/spark/pull/640
>>>>
>>>> This solution is a little more general and avoids catching IOException
>>>> altogether. Long live exception propagation!
>>>>
>>>>
>>>> On Mon, Apr 28, 2014 at 1:28 PM, Patrick Wendell <pwend...@gmail.com> 
>>>> wrote:
>>>>>
>>>>> Hey Jim,
>>>>>
>>>>> This IOException thing is a general issue that we need to fix, and your
>>>>> observation is spot-on. There is actually a JIRA for it that I created a
>>>>> few days ago:
>>>>> https://issues.apache.org/jira/browse/SPARK-1579
>>>>>
>>>>> Aaron is assigned to that one but not actively working on it, so we'd
>>>>> welcome a PR from you on this if you are interested.
>>>>>
>>>>> The first thought we had was to set a volatile flag when the reader sees
>>>>> an exception (indicating there was a failure in the task) and avoid
>>>>> swallowing the IOException in the writer if this happens. But I think
>>>>> there is a race here where the writer sees the error before the reader
>>>>> knows what is going on.
>>>>>
>>>>> Anyways, maybe if you have a simpler solution you could sketch it out in
>>>>> the JIRA and we could talk over there. The current proposal in the JIRA is
>>>>> somewhat complicated...
>>>>>
>>>>> - Patrick
>>>>>
>>>>> On Mon, Apr 28, 2014 at 1:01 PM, Jim Blomo <jim.bl...@gmail.com> wrote:
>>>>>>
>>>>>> FYI, it looks like this "stdin writer to Python finished early" error was
>>>>>> caused by a break in the connection to S3, from which the data was being
>>>>>> pulled.  A recent commit to PythonRDD noted that the current exception
>>>>>> catching can potentially mask an exception from the data source, and that
>>>>>> is indeed what I see happening.  The underlying libraries (jets3t and
>>>>>> httpclient) do have retry capabilities, but I don't see a great way of
>>>>>> setting them through Spark code.  Instead I added the patch below which
>>>>>> kills the worker on the exception.  This allows me to completely load the
>>>>>> data source after a few worker retries.
>>>>>>
>>>>>> Unfortunately, java.net.SocketException is the same error that is
>>>>>> sometimes expected from the client when using methods like take().  One
>>>>>> approach around this conflation is to create a new locally scoped
>>>>>> exception class, e.g. WriterException, catch java.net.SocketException
>>>>>> during output writing, then re-throw the new exception.  The worker
>>>>>> thread could then
>>>>>> distinguish between the reasons java.net.SocketException might be thrown.
>>>>>> Perhaps there is a more elegant way to do this in Scala, though?
>>>>>>
>>>>>> Let me know if I should open a ticket or discuss this on the developers
>>>>>> list instead.  Best,
>>>>>>
>>>>>> Jim
>>>>>>
>>>>>> diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
>>>>>> index 0d71fdb..f31158c 100644
>>>>>> --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
>>>>>> +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
>>>>>> @@ -95,6 +95,12 @@ private[spark] class PythonRDD[T: ClassTag](
>>>>>>             readerException = e
>>>>>>             Try(worker.shutdownOutput()) // kill Python worker process
>>>>>>
>>>>>> +          case e: java.net.SocketException =>
>>>>>> +           // This can happen if a connection to the datasource, eg S3, resets
>>>>>> +           // or is otherwise broken
>>>>>> +            readerException = e
>>>>>> +            Try(worker.shutdownOutput()) // kill Python worker process
>>>>>> +
>>>>>>           case e: IOException =>
>>>>>>             // This can happen for legitimate reasons if the Python code stops returning data
>>>>>>             // before we are done passing elements through, e.g., for take(). Just log a message to
>>>>>>
>>>>>>
>>>>>> On Wed, Apr 9, 2014 at 7:04 PM, Jim Blomo <jim.bl...@gmail.com> wrote:
>>>>>>>
>>>>>>> This dataset is uncompressed text at ~54GB. stats() returns (count:
>>>>>>> 56757667, mean: 1001.68740583, stdev: 601.775217822, max: 8965, min:
>>>>>>> 343)
>>>>>>>
>>>>>>> On Wed, Apr 9, 2014 at 6:59 PM, Matei Zaharia <matei.zaha...@gmail.com>
>>>>>>> wrote:
>>>>>>>> Okay, thanks. Do you have any info on how large your records and data
>>>>>>>> file are? I'd like to reproduce and fix this.
>>>>>>>>
>>>>>>>> Matei
>>>>>>>>
>>>>>>>> On Apr 9, 2014, at 3:52 PM, Jim Blomo <jim.bl...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Matei, thanks for working with me to find these issues.
>>>>>>>>>
>>>>>>>>> To summarize, the issues I've seen are:
>>>>>>>>> 0.9.0:
>>>>>>>>> - https://issues.apache.org/jira/browse/SPARK-1323
>>>>>>>>>
>>>>>>>>> SNAPSHOT 2014-03-18:
>>>>>>>>> - When persist() is used and batchSize=1: java.lang.OutOfMemoryError:
>>>>>>>>> Java heap space.  To me this indicates a memory leak, since Spark
>>>>>>>>> should simply be counting records of size < 3MB.
>>>>>>>>> - Without persist(): "stdin writer to Python finished early" hangs the
>>>>>>>>> application; root cause unknown.
>>>>>>>>>
>>>>>>>>> I've recently rebuilt another SNAPSHOT, git commit 16b8308 with
>>>>>>>>> debugging turned on.  This gives me the stacktrace on the new "stdin"
>>>>>>>>> problem:
>>>>>>>>>
>>>>>>>>> 14/04/09 22:22:45 DEBUG PythonRDD: stdin writer to Python finished
>>>>>>>>> early
>>>>>>>>> java.net.SocketException: Connection reset
>>>>>>>>>       at java.net.SocketInputStream.read(SocketInputStream.java:196)
>>>>>>>>>       at java.net.SocketInputStream.read(SocketInputStream.java:122)
>>>>>>>>>       at
>>>>>>>>> sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
>>>>>>>>>       at
>>>>>>>>> sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
>>>>>>>>>       at sun.security.ssl.InputRecord.read(InputRecord.java:509)
>>>>>>>>>       at
>>>>>>>>> sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
>>>>>>>>>       at
>>>>>>>>> sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
>>>>>>>>>       at
>>>>>>>>> sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
>>>>>>>>>       at
>>>>>>>>> java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
>>>>>>>>>       at
>>>>>>>>> java.io.BufferedInputStream.read(BufferedInputStream.java:334)
>>>>>>>>>       at
>>>>>>>>> org.apache.commons.httpclient.WireLogInputStream.read(WireLogInputStream.java:69)
>>>>>>>>>       at
>>>>>>>>> org.apache.commons.httpclient.ContentLengthInputStream.read(ContentLengthInputStream.java:170)
>>>>>>>>>       at java.io.FilterInputStream.read(FilterInputStream.java:133)
>>>>>>>>>       at
>>>>>>>>> org.apache.commons.httpclient.AutoCloseInputStream.read(AutoCloseInputStream.java:108)
>>>>>>>>>       at
>>>>>>>>> org.jets3t.service.io.InterruptableInputStream.read(InterruptableInputStream.java:76)
>>>>>>>>>       at
>>>>>>>>> org.jets3t.service.impl.rest.httpclient.HttpMethodReleaseInputStream.read(HttpMethodReleaseInputStream.java:136)
>>>>>>>>>       at
>>>>>>>>> org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsInputStream.read(NativeS3FileSystem.java:98)
>>>>>>>>>       at
>>>>>>>>> java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
>>>>>>>>>       at
>>>>>>>>> java.io.BufferedInputStream.read(BufferedInputStream.java:334)
>>>>>>>>>       at java.io.DataInputStream.read(DataInputStream.java:100)
>>>>>>>>>       at
>>>>>>>>> org.apache.hadoop.util.LineReader.readLine(LineReader.java:134)
>>>>>>>>>       at
>>>>>>>>> org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:133)
>>>>>>>>>       at
>>>>>>>>> org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:38)
>>>>>>>>>       at
>>>>>>>>> org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:192)
>>>>>>>>>       at
>>>>>>>>> org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:175)
>>>>>>>>>       at
>>>>>>>>> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>>>>>>       at
>>>>>>>>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:27)
>>>>>>>>>       at
>>>>>>>>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>       at
>>>>>>>>> scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
>>>>>>>>>       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>>>>       at
>>>>>>>>> scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>>>>>>       at
>>>>>>>>> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
>>>>>>>>>       at
>>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Apr 3, 2014 at 8:37 PM, Matei Zaharia
>>>>>>>>> <matei.zaha...@gmail.com> wrote:
>>>>>>>>>> Cool, thanks for the update. Have you tried running a branch with this
>>>>>>>>>> fix (e.g. branch-0.9, or the 0.9.1 release candidate)? Also, what memory
>>>>>>>>>> leak issue are you referring to? Is it separate from this? (Couldn't
>>>>>>>>>> find it earlier in the thread.)
>>>>>>>>>>
>>>>>>>>>> To turn on debug logging, copy conf/log4j.properties.template to
>>>>>>>>>> conf/log4j.properties and change the line log4j.rootCategory=INFO, console
>>>>>>>>>> to log4j.rootCategory=DEBUG, console. Then make sure this file is present
>>>>>>>>>> in "conf" on all workers.
>>>>>>>>>>
>>>>>>>>>> BTW I've managed to run PySpark with this fix on some reasonably large
>>>>>>>>>> S3 data (multiple GB) and it was fine. It might happen only if records
>>>>>>>>>> are large, or something like that. How much heap are you giving to your
>>>>>>>>>> executors, and does it show that much in the web UI?
>>>>>>>>>>
>>>>>>>>>> Matei
>>>>>>>>>>
>>>>>>>>>> On Mar 29, 2014, at 10:44 PM, Jim Blomo <jim.bl...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> I think the problem I ran into in 0.9 is covered in
>>>>>>>>>>> https://issues.apache.org/jira/browse/SPARK-1323
>>>>>>>>>>>
>>>>>>>>>>> When I kill the python process, the stacktrace I get indicates that
>>>>>>>>>>> this happens at initialization.  It looks like the initial write to
>>>>>>>>>>> the Python process does not go through, and then the iterator hangs
>>>>>>>>>>> waiting for output.  I haven't had luck turning on debugging for the
>>>>>>>>>>> executor process.  Still trying to learn the log4j properties that
>>>>>>>>>>> need to be set.
>>>>>>>>>>>
>>>>>>>>>>> No luck yet on tracking down the memory leak.
>>>>>>>>>>>
>>>>>>>>>>> 14/03/30 05:15:04 ERROR executor.Executor: Exception in task ID 11
>>>>>>>>>>> org.apache.spark.SparkException: Python worker exited unexpectedly
>>>>>>>>>>> (crashed)
>>>>>>>>>>>      at
>>>>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:168)
>>>>>>>>>>>      at
>>>>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
>>>>>>>>>>>      at
>>>>>>>>>>> org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:113)
>>>>>>>>>>>      at
>>>>>>>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:231)
>>>>>>>>>>>      at org.apache.spark.rdd.RDD.iterator(RDD.scala:222)
>>>>>>>>>>>      at
>>>>>>>>>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>>>>>>>>>>>      at org.apache.spark.scheduler.Task.run(Task.scala:52)
>>>>>>>>>>>      at
>>>>>>>>>>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:212)
>>>>>>>>>>>      at
>>>>>>>>>>> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:43)
>>>>>>>>>>>      at
>>>>>>>>>>> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
>>>>>>>>>>>      at java.security.AccessController.doPrivileged(Native Method)
>>>>>>>>>>>      at javax.security.auth.Subject.doAs(Subject.java:415)
>>>>>>>>>>>      at
>>>>>>>>>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
>>>>>>>>>>>      at
>>>>>>>>>>> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:42)
>>>>>>>>>>>      at
>>>>>>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>>>>>>>>>>>      at
>>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>>>>>     at
>>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>>>>>      at java.lang.Thread.run(Thread.java:724)
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Sat, Mar 29, 2014 at 3:17 PM, Jim Blomo <jim.bl...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>> I've only tried 0.9, in which I ran into the `stdin writer to Python
>>>>>>>>>>>> finished early` error so frequently that I wasn't able to load even a
>>>>>>>>>>>> 1GB file.  Let me know if I can provide any other info!
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Mar 27, 2014 at 8:48 PM, Matei Zaharia
>>>>>>>>>>>> <matei.zaha...@gmail.com> wrote:
>>>>>>>>>>>>> I see, did this also fail with previous versions of Spark (0.9 or
>>>>>>>>>>>>> 0.8)? We'll try to look into these; it seems like a serious error.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Matei
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mar 27, 2014, at 7:27 PM, Jim Blomo <jim.bl...@gmail.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks, Matei.  I am running "Spark 1.0.0-SNAPSHOT built for Hadoop
>>>>>>>>>>>>>> 1.0.4" from GitHub on 2014-03-18.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I tried batchSizes of 512, 10, and 1 and each got me further but none
>>>>>>>>>>>>>> have succeeded.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I can get this to work -- with manual interventions -- if I omit
>>>>>>>>>>>>>> `parsed.persist(StorageLevel.MEMORY_AND_DISK)` and set batchSize=1.
>>>>>>>>>>>>>> 5 of the 175 executors hung, and I had to kill the python process to
>>>>>>>>>>>>>> get things going again.  The only indication of this in the logs was
>>>>>>>>>>>>>> `INFO python.PythonRDD: stdin writer to Python finished early`.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> With batchSize=1 and persist, a new memory error came up in several
>>>>>>>>>>>>>> tasks before the app failed:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 14/03/28 01:51:15 ERROR executor.Executor: Uncaught exception in
>>>>>>>>>>>>>> thread Thread[stdin writer for python,5,main]
>>>>>>>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>>>>>>>>>     at java.util.Arrays.copyOfRange(Arrays.java:2694)
>>>>>>>>>>>>>>     at java.lang.String.<init>(String.java:203)
>>>>>>>>>>>>>>     at
>>>>>>>>>>>>>> java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
>>>>>>>>>>>>>>     at java.nio.CharBuffer.toString(CharBuffer.java:1201)
>>>>>>>>>>>>>>     at org.apache.hadoop.io.Text.decode(Text.java:350)
>>>>>>>>>>>>>>     at org.apache.hadoop.io.Text.decode(Text.java:327)
>>>>>>>>>>>>>>     at org.apache.hadoop.io.Text.toString(Text.java:254)
>>>>>>>>>>>>>>     at
>>>>>>>>>>>>>> org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>>>>>>>>>>>>>>     at
>>>>>>>>>>>>>> org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>>>>>>>>>>>>>>     at
>>>>>>>>>>>>>> scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>>>>>>>>>>>>>     at
>>>>>>>>>>>>>> scala.collection.Iterator$$anon$12.next(Iterator.scala:357)
>>>>>>>>>>>>>>     at
>>>>>>>>>>>>>> scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>>>>>>>>>     at
>>>>>>>>>>>>>> scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>>>>>>>>>>>     at
>>>>>>>>>>>>>> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
>>>>>>>>>>>>>>     at
>>>>>>>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> There are other exceptions, but I think they all stem from the above,
>>>>>>>>>>>>>> e.g. org.apache.spark.SparkException: Error sending message to
>>>>>>>>>>>>>> BlockManagerMaster
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Let me know if there are other settings I should try, or if I should
>>>>>>>>>>>>>> try a newer snapshot.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks again!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Mar 24, 2014 at 9:35 AM, Matei Zaharia
>>>>>>>>>>>>>> <matei.zaha...@gmail.com> wrote:
>>>>>>>>>>>>>>> Hey Jim,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> In Spark 0.9 we added a "batchSize" parameter to PySpark that
>>>>>>>>>>>>>>> makes it group multiple objects together before passing them 
>>>>>>>>>>>>>>> between Java
>>>>>>>>>>>>>>> and Python, but this may be too high by default. Try passing 
>>>>>>>>>>>>>>> batchSize=10 to
>>>>>>>>>>>>>>> your SparkContext constructor to lower it (the default is 
>>>>>>>>>>>>>>> 1024). Or even
>>>>>>>>>>>>>>> batchSize=1 to match earlier versions.
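>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> For example, roughly (the master URL and app name are placeholders):
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> from pyspark import SparkContext
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> # batchSize controls how many Python objects are grouped together when
>>>>>>>>>>>>>>> # passing data between Java and Python; the default is 1024
>>>>>>>>>>>>>>> sc = SparkContext("spark://master:7077", "SessionStats", batchSize=10)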
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Matei
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Mar 21, 2014, at 6:18 PM, Jim Blomo <jim.bl...@gmail.com>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi all, I'm wondering if there are any settings I can use to reduce
>>>>>>>>>>>>>>>> the memory needed by the PythonRDD when computing simple stats.  I am
>>>>>>>>>>>>>>>> getting OutOfMemoryError exceptions while calculating count() on big,
>>>>>>>>>>>>>>>> but not absurd, records.  It seems like PythonRDD is trying to keep
>>>>>>>>>>>>>>>> too many of these records in memory, when all that is needed is to
>>>>>>>>>>>>>>>> stream through them and count.  Any tips for getting through this
>>>>>>>>>>>>>>>> workload?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Code:
>>>>>>>>>>>>>>>> session = sc.textFile('s3://...json.gz') # ~54GB of compressed data
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> # the biggest individual text line is ~3MB
>>>>>>>>>>>>>>>> parsed = session.map(lambda l: l.split("\t",1)).map(lambda (y,s): (loads(y), loads(s)))
>>>>>>>>>>>>>>>> parsed.persist(StorageLevel.MEMORY_AND_DISK)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> parsed.count()
>>>>>>>>>>>>>>>> # will never finish: executor.Executor: Uncaught exception will FAIL all executors
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Incidentally, the whole app appears to be killed, but this error is
>>>>>>>>>>>>>>>> not propagated to the shell.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Cluster:
>>>>>>>>>>>>>>>> 15 m2.xlarges (17GB memory, 17GB swap,
>>>>>>>>>>>>>>>> spark.executor.memory=10GB)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Exception:
>>>>>>>>>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:132)
>>>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:120)
>>>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:113)
>>>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>>>> scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$1.foreach(PythonRDD.scala:113)
>>>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>>>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>>>> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:94)
>>>>>>>>>>>>>>>>    at org.apache.spark.rdd.RDD.iterator(RDD.scala:220)
>>>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>
