I'd just like to update this thread by pointing to the PR based on our
initial design: https://github.com/apache/spark/pull/640

This solution is a little more general and avoids catching IOException
altogether. Long live exception propagation!
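
For anyone skimming the thread, here is a rough, language-neutral picture of what exception propagation between a writer thread and its consumer can look like. It is an illustrative Python sketch, not the Scala code in the PR; `WriterThread`, `consume`, and `flaky_source` are hypothetical names made up for the example.

```python
import threading


class WriterThread(threading.Thread):
    """Feeds items from a source iterator to a sink; records any failure."""

    def __init__(self, source, sink):
        super().__init__(daemon=True)
        self.source = source
        self.sink = sink
        self.exception = None  # set if the writer dies; read by the consumer

    def run(self):
        try:
            for item in self.source:
                self.sink.append(item)
        except Exception as exc:  # nothing is swallowed: keep it for the reader
            self.exception = exc


def consume(writer):
    """Wait for the writer, then surface its failure instead of hiding it."""
    writer.join()
    if writer.exception is not None:
        raise writer.exception
    return list(writer.sink)


def flaky_source():
    """Hypothetical data source whose connection resets after two records."""
    yield "a"
    yield "b"
    raise ConnectionResetError("Connection reset")
```

With this shape, `consume(writer)` on a writer fed by `flaky_source()` raises the original `ConnectionResetError` in the consuming thread, so the caller sees the real cause rather than a generic "finished early" message.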


On Mon, Apr 28, 2014 at 1:28 PM, Patrick Wendell <pwend...@gmail.com> wrote:

> Hey Jim,
>
> This IOException thing is a general issue that we need to fix and your
> observation is spot-on. I actually created a JIRA for it a few days
> ago:
> https://issues.apache.org/jira/browse/SPARK-1579
>
> Aaron is assigned to that one but not actively working on it, so we'd
> welcome a PR from you on this if you are interested.
>
> The first thought we had was to set a volatile flag when the reader sees
> an exception (indicating there was a failure in the task) and avoid
> swallowing the IOException in the writer if this happens. But I think there
> is a race here where the writer sees the error first before the reader
> knows what is going on.
>
> Anyways maybe if you have a simpler solution you could sketch it out in
> the JIRA and we could talk over there. The current proposal in the JIRA is
> somewhat complicated...
>
> - Patrick
>
> On Mon, Apr 28, 2014 at 1:01 PM, Jim Blomo <jim.bl...@gmail.com> wrote:
>
>> FYI, it looks like this "stdin writer to Python finished early" error was
>> caused by a break in the connection to S3, from which the data was being
>> pulled.  A recent commit to PythonRDD
>> <https://github.com/apache/spark/commit/a967b005c8937a3053e215c952d2172ee3dc300d#commitcomment-6114780>
>> noted that the current exception catching can potentially mask an exception
>> for the data source, and that is indeed what I see happening.  The
>> underlying libraries (jets3t and httpclient) do have retry capabilities,
>> but I don't see a great way of setting them through Spark code.  Instead I
>> added the patch below which kills the worker on the exception.  This allows
>> me to completely load the data source after a few worker retries.
>>
>> Unfortunately, java.net.SocketException is the same error that is
>> sometimes expected from the client when using methods like take().  One
>> approach around this conflation is to create a new locally scoped exception
>> class, eg. WriterException, catch java.net.SocketException during output
>> writing, then re-throw the new exception.  The worker thread could then
>> distinguish between the reasons java.net.SocketException might be thrown.
>>  Perhaps there is a more elegant way to do this in Scala, though?
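
The wrap-and-rethrow idea described above can be sketched roughly as follows. This is a Python illustration of the pattern only, not Spark's Scala code: `WriterError`, `write_all`, and `run_writer` are hypothetical names, and Python's built-in `ConnectionError` stands in for `java.net.SocketException`.

```python
import logging

log = logging.getLogger("writer_sketch")


class WriterError(Exception):
    """Raised only when writing to the worker fails (hypothetical name)."""


def write_all(records, write):
    """Pull records from the data source and push each one to the worker.

    A connection error raised by `write` is wrapped in WriterError; a
    connection error raised while iterating `records` passes through as-is.
    """
    for record in records:  # reading: errors here come from the data source
        try:
            write(record)  # writing: errors here come from the worker side
        except ConnectionError as exc:
            raise WriterError("worker stopped reading") from exc


def run_writer(records, write):
    """Return 'finished' or 'stopped early'; data source errors propagate."""
    try:
        write_all(records, write)
        return "finished"
    except WriterError:
        # Expected when the consumer has all it needs, e.g. for take()
        log.info("stdin writer to Python finished early")
        return "stopped early"
    # A ConnectionError from the data source is not caught here.
```

Because only the write call sits inside the inner `try`, a reset connection on the source side keeps its original type and can be treated as a task failure, while a closed worker pipe is logged and ignored.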
>>
>> Let me know if I should open a ticket or discuss this on the developers
>> list instead.  Best,
>>
>> Jim
>>
>> diff --git
>> a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
>> b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
>> index 0d71fdb..f31158c 100644
>> --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
>> +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
>> @@ -95,6 +95,12 @@ private[spark] class PythonRDD[T: ClassTag](
>>              readerException = e
>>              Try(worker.shutdownOutput()) // kill Python worker process
>>
>> +          case e: java.net.SocketException =>
>> +           // This can happen if a connection to the datasource, eg S3, resets
>> +           // or is otherwise broken
>> +            readerException = e
>> +            Try(worker.shutdownOutput()) // kill Python worker process
>> +
>>            case e: IOException =>
>>              // This can happen for legitimate reasons if the Python code stops returning data
>>              // before we are done passing elements through, e.g., for take(). Just log a message to
>>
>>
>> On Wed, Apr 9, 2014 at 7:04 PM, Jim Blomo <jim.bl...@gmail.com> wrote:
>>
>>> This dataset is uncompressed text at ~54GB. stats() returns (count:
>>> 56757667, mean: 1001.68740583, stdev: 601.775217822, max: 8965, min:
>>> 343)
>>>
>>> On Wed, Apr 9, 2014 at 6:59 PM, Matei Zaharia <matei.zaha...@gmail.com>
>>> wrote:
>>> > Okay, thanks. Do you have any info on how large your records and data
>>> file are? I'd like to reproduce and fix this.
>>> >
>>> > Matei
>>> >
>>> > On Apr 9, 2014, at 3:52 PM, Jim Blomo <jim.bl...@gmail.com> wrote:
>>> >
>>> >> Hi Matei, thanks for working with me to find these issues.
>>> >>
>>> >> To summarize, the issues I've seen are:
>>> >> 0.9.0:
>>> >> - https://issues.apache.org/jira/browse/SPARK-1323
>>> >>
>>> >> SNAPSHOT 2014-03-18:
>>> >> - When persist() used and batchSize=1, java.lang.OutOfMemoryError:
>>> >> Java heap space.  To me this indicates a memory leak since Spark
>>> >> should simply be counting records of size < 3MB
>>> >> - Without persist(), "stdin writer to Python finished early" hangs the
>>> >> application, unknown root cause
>>> >>
>>> >> I've recently rebuilt another SNAPSHOT, git commit 16b8308 with
>>> >> debugging turned on.  This gives me the stacktrace on the new "stdin"
>>> >> problem:
>>> >>
>>> >> 14/04/09 22:22:45 DEBUG PythonRDD: stdin writer to Python finished
>>> early
>>> >> java.net.SocketException: Connection reset
>>> >>        at java.net.SocketInputStream.read(SocketInputStream.java:196)
>>> >>        at java.net.SocketInputStream.read(SocketInputStream.java:122)
>>> >>        at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
>>> >>        at
>>> sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
>>> >>        at sun.security.ssl.InputRecord.read(InputRecord.java:509)
>>> >>        at
>>> sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
>>> >>        at
>>> sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
>>> >>        at
>>> sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
>>> >>        at
>>> java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
>>> >>        at
>>> java.io.BufferedInputStream.read(BufferedInputStream.java:334)
>>> >>        at
>>> org.apache.commons.httpclient.WireLogInputStream.read(WireLogInputStream.java:69)
>>> >>        at
>>> org.apache.commons.httpclient.ContentLengthInputStream.read(ContentLengthInputStream.java:170)
>>> >>        at java.io.FilterInputStream.read(FilterInputStream.java:133)
>>> >>        at
>>> org.apache.commons.httpclient.AutoCloseInputStream.read(AutoCloseInputStream.java:108)
>>> >>        at
>>> org.jets3t.service.io.InterruptableInputStream.read(InterruptableInputStream.java:76)
>>> >>        at
>>> org.jets3t.service.impl.rest.httpclient.HttpMethodReleaseInputStream.read(HttpMethodReleaseInputStream.java:136)
>>> >>        at
>>> org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsInputStream.read(NativeS3FileSystem.java:98)
>>> >>        at
>>> java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
>>> >>        at
>>> java.io.BufferedInputStream.read(BufferedInputStream.java:334)
>>> >>        at java.io.DataInputStream.read(DataInputStream.java:100)
>>> >>        at
>>> org.apache.hadoop.util.LineReader.readLine(LineReader.java:134)
>>> >>        at
>>> org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:133)
>>> >>        at
>>> org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:38)
>>> >>        at
>>> org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:192)
>>> >>        at
>>> org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:175)
>>> >>        at
>>> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>> >>        at
>>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:27)
>>> >>        at
>>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> >>        at
>>> scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
>>> >>        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>> >>        at
>>> scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>> >>        at
>>> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
>>> >>        at
>>> org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>> >>
>>> >>
>>> >> On Thu, Apr 3, 2014 at 8:37 PM, Matei Zaharia <
>>> matei.zaha...@gmail.com> wrote:
>>> >>> Cool, thanks for the update. Have you tried running a branch with
>>> this fix (e.g. branch-0.9, or the 0.9.1 release candidate?) Also, what
>>> memory leak issue are you referring to, is it separate from this? (Couldn't
>>> find it earlier in the thread.)
>>> >>>
>>> >>> To turn on debug logging, copy conf/log4j.properties.template to
>>> conf/log4j.properties and change the line log4j.rootCategory=INFO, console
>>> to log4j.rootCategory=DEBUG, console. Then make sure this file is present
>>> in "conf" on all workers.
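
As a config fragment, the change described above comes down to a single line in the copied file. Only the one property named in the message is shown; the rest of the template is assumed to stay unchanged.

```properties
# conf/log4j.properties (copied from conf/log4j.properties.template)
# was: log4j.rootCategory=INFO, console
log4j.rootCategory=DEBUG, console
```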
>>> >>>
>>> >>> BTW I've managed to run PySpark with this fix on some reasonably
>>> large S3 data (multiple GB) and it was fine. It might happen only if
>>> records are large, or something like that. How much heap are you giving to
>>> your executors, and does it show that much in the web UI?
>>> >>>
>>> >>> Matei
>>> >>>
>>> >>> On Mar 29, 2014, at 10:44 PM, Jim Blomo <jim.bl...@gmail.com> wrote:
>>> >>>
>>> >>>> I think the problem I ran into in 0.9 is covered in
>>> >>>> https://issues.apache.org/jira/browse/SPARK-1323
>>> >>>>
>>> >>>> When I kill the python process, the stacktrace I get indicates that
>>> >>>> this happens at initialization.  It looks like the initial write to
>>> >>>> the Python process does not go through, and then the iterator hangs
>>> >>>> waiting for output.  I haven't had luck turning on debugging for the
>>> >>>> executor process.  Still trying to learn the log4j properties that
>>> >>>> need to be set.
>>> >>>>
>>> >>>> No luck yet on tracking down the memory leak.
>>> >>>>
>>> >>>> 14/03/30 05:15:04 ERROR executor.Executor: Exception in task ID 11
>>> >>>> org.apache.spark.SparkException: Python worker exited unexpectedly
>>> (crashed)
>>> >>>>       at
>>> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:168)
>>> >>>>       at
>>> org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
>>> >>>>       at
>>> org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:113)
>>> >>>>       at
>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:231)
>>> >>>>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:222)
>>> >>>>       at
>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>>> >>>>       at org.apache.spark.scheduler.Task.run(Task.scala:52)
>>> >>>>       at
>>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:212)
>>> >>>>       at
>>> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:43)
>>> >>>>       at
>>> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
>>> >>>>       at java.security.AccessController.doPrivileged(Native Method)
>>> >>>>       at javax.security.auth.Subject.doAs(Subject.java:415)
>>> >>>>       at
>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
>>> >>>>       at
>>> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:42)
>>> >>>>       at
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>>> >>>>       at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>> >>>>      at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>> >>>>       at java.lang.Thread.run(Thread.java:724)
>>> >>>>
>>> >>>>
>>> >>>> On Sat, Mar 29, 2014 at 3:17 PM, Jim Blomo <jim.bl...@gmail.com>
>>> wrote:
>>> >>>>> I've only tried 0.9, in which I ran into the `stdin writer to
>>> Python
>>> >>>>> finished early` so frequently I wasn't able to load even a 1GB
>>> file.
>>> >>>>> Let me know if I can provide any other info!
>>> >>>>>
>>> >>>>> On Thu, Mar 27, 2014 at 8:48 PM, Matei Zaharia <
>>> matei.zaha...@gmail.com> wrote:
>>> >>>>>> I see, did this also fail with previous versions of Spark (0.9 or
>>> 0.8)? We'll try to look into these, seems like a serious error.
>>> >>>>>>
>>> >>>>>> Matei
>>> >>>>>>
>>> >>>>>> On Mar 27, 2014, at 7:27 PM, Jim Blomo <jim.bl...@gmail.com>
>>> wrote:
>>> >>>>>>
>>> >>>>>>> Thanks, Matei.  I am running "Spark 1.0.0-SNAPSHOT built for
>>> Hadoop
>>> >>>>>>> 1.0.4" from GitHub on 2014-03-18.
>>> >>>>>>>
>>> >>>>>>> I tried batchSizes of 512, 10, and 1 and each got me further but
>>> none
>>> >>>>>>> have succeeded.
>>> >>>>>>>
>>> >>>>>>> I can get this to work -- with manual interventions -- if I omit
>>> >>>>>>> `parsed.persist(StorageLevel.MEMORY_AND_DISK)` and set
>>> batchSize=1.  5
>>> >>>>>>> of the 175 executors hung, and I had to kill the python process
>>> to get
>>> >>>>>>> things going again.  The only indication of this in the logs was
>>> `INFO
>>> >>>>>>> python.PythonRDD: stdin writer to Python finished early`.
>>> >>>>>>>
>>> >>>>>>> With batchSize=1 and persist, a new memory error came up in
>>> several
>>> >>>>>>> tasks, before the app failed:
>>> >>>>>>>
>>> >>>>>>> 14/03/28 01:51:15 ERROR executor.Executor: Uncaught exception in
>>> >>>>>>> thread Thread[stdin writer for python,5,main]
>>> >>>>>>> java.lang.OutOfMemoryError: Java heap space
>>> >>>>>>>      at java.util.Arrays.copyOfRange(Arrays.java:2694)
>>> >>>>>>>      at java.lang.String.<init>(String.java:203)
>>> >>>>>>>      at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
>>> >>>>>>>      at java.nio.CharBuffer.toString(CharBuffer.java:1201)
>>> >>>>>>>      at org.apache.hadoop.io.Text.decode(Text.java:350)
>>> >>>>>>>      at org.apache.hadoop.io.Text.decode(Text.java:327)
>>> >>>>>>>      at org.apache.hadoop.io.Text.toString(Text.java:254)
>>> >>>>>>>      at
>>> org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>>> >>>>>>>      at
>>> org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>>> >>>>>>>      at
>>> scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>> >>>>>>>      at
>>> scala.collection.Iterator$$anon$12.next(Iterator.scala:357)
>>> >>>>>>>      at
>>> scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>> >>>>>>>      at
>>> scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>> >>>>>>>      at
>>> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
>>> >>>>>>>      at
>>> org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>> >>>>>>>
>>> >>>>>>> There are other exceptions, but I think they all stem from the
>>> above,
>>> >>>>>>> eg. org.apache.spark.SparkException: Error sending message to
>>> >>>>>>> BlockManagerMaster
>>> >>>>>>>
>>> >>>>>>> Let me know if there are other settings I should try, or if I
>>> should
>>> >>>>>>> try a newer snapshot.
>>> >>>>>>>
>>> >>>>>>> Thanks again!
>>> >>>>>>>
>>> >>>>>>>
>>> >>>>>>> On Mon, Mar 24, 2014 at 9:35 AM, Matei Zaharia <
>>> matei.zaha...@gmail.com> wrote:
>>> >>>>>>>> Hey Jim,
>>> >>>>>>>>
>>> >>>>>>>> In Spark 0.9 we added a "batchSize" parameter to PySpark that
>>> makes it group multiple objects together before passing them between Java
>>> and Python, but this may be too high by default. Try passing batchSize=10
>>> to your SparkContext constructor to lower it (the default is 1024). Or even
>>> batchSize=1 to match earlier versions.
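
To see why the batch size matters for large records, here is a small stdlib-only Python sketch of grouping records into batches, plus the worst-case arithmetic for the ~3MB maximum record size mentioned elsewhere in this thread. `batched` and `worst_case_batch_bytes` are hypothetical helpers for illustration, not PySpark functions, and the sketch makes no claim about how PySpark implements batching internally.

```python
from itertools import islice

MB = 1024 * 1024


def batched(records, batch_size):
    """Yield lists of up to batch_size records from any iterable."""
    it = iter(records)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch


def worst_case_batch_bytes(batch_size, max_record_bytes):
    """Upper bound on one batch's payload if every record is maximal."""
    return batch_size * max_record_bytes


# Default of 1024 records per batch, each at the ~3MB maximum
default_bound = worst_case_batch_bytes(1024, 3 * MB)
# Suggested batchSize=10 with the same maximum record size
small_bound = worst_case_batch_bytes(10, 3 * MB)
```

Under that worst-case assumption, the default of 1024 bounds a single batch at about 3GB, while `batchSize=10` bounds it at about 30MB. Typical batches would be far smaller if most records are well below the maximum.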
>>> >>>>>>>>
>>> >>>>>>>> Matei
>>> >>>>>>>>
>>> >>>>>>>> On Mar 21, 2014, at 6:18 PM, Jim Blomo <jim.bl...@gmail.com>
>>> wrote:
>>> >>>>>>>>
>>> >>>>>>>>> Hi all, I'm wondering if there are any settings I can use to
>>> reduce the
>>> >>>>>>>>> memory needed by the PythonRDD when computing simple stats.  I
>>> am
>>> >>>>>>>>> getting OutOfMemoryError exceptions while calculating count()
>>> on big,
>>> >>>>>>>>> but not absurd, records.  It seems like PythonRDD is trying to
>>> keep
>>> >>>>>>>>> too many of these records in memory, when all that is needed
>>> is to
>>> >>>>>>>>> stream through them and count.  Any tips for getting through
>>> this
>>> >>>>>>>>> workload?
>>> >>>>>>>>>
>>> >>>>>>>>>
>>> >>>>>>>>> Code:
>>> >>>>>>>>> session = sc.textFile('s3://...json.gz') # ~54GB of compressed data
>>> >>>>>>>>>
>>> >>>>>>>>> # the biggest individual text line is ~3MB
>>> >>>>>>>>> parsed = session.map(lambda l: l.split("\t",1)).map(lambda (y,s): (loads(y), loads(s)))
>>> >>>>>>>>> parsed.persist(StorageLevel.MEMORY_AND_DISK)
>>> >>>>>>>>>
>>> >>>>>>>>> parsed.count()
>>> >>>>>>>>> # will never finish: executor.Executor: Uncaught exception will FAIL all executors
>>> >>>>>>>>>
>>> >>>>>>>>> Incidentally the whole app appears to be killed, but this
>>> error is not
>>> >>>>>>>>> propagated to the shell.
>>> >>>>>>>>>
>>> >>>>>>>>> Cluster:
>>> >>>>>>>>> 15 m2.xlarges (17GB memory, 17GB swap,
>>> spark.executor.memory=10GB)
>>> >>>>>>>>>
>>> >>>>>>>>> Exception:
>>> >>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>> >>>>>>>>>     at
>>> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:132)
>>> >>>>>>>>>     at
>>> org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:120)
>>> >>>>>>>>>     at
>>> org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:113)
>>> >>>>>>>>>     at
>>> scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>> >>>>>>>>>     at
>>> org.apache.spark.api.python.PythonRDD$$anon$1.foreach(PythonRDD.scala:113)
>>> >>>>>>>>>     at
>>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>> >>>>>>>>>     at
>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>> >>>>>>>>>     at
>>> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:94)
>>> >>>>>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:220)
>>> >>>>>>>>>     at
>>> org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>> >>>>>>>>
>>> >>>>>>
>>> >>>
>>> >
>>>
>>
>>
>
