I'd just like to update this thread by pointing to the PR based on our initial design: https://github.com/apache/spark/pull/640

This solution is a little more general and avoids catching IOException altogether. Long live exception propagation!
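As an aside, the volatile-flag approach Patrick mentions below would have roughly the following shape, including the race that makes it fragile. This is only a sketch with made-up names, not the code in the PR:

import java.io.IOException

object VolatileFlagSketch {
  @volatile var readerSawFailure = false

  // Reader thread: set the flag before propagating a real task failure.
  def readLoop(readResults: () => Unit): Unit =
    try {
      readResults()
    } catch {
      case e: Exception =>
        readerSawFailure = true
        throw e
    }

  // Writer thread: swallow IOException only in the benign case.
  def writeLoop(writeInput: () => Unit): Unit =
    try {
      writeInput()
    } catch {
      case e: IOException =>
        // The race: the writer can land here before the reader has set the
        // flag, so a real failure is still silently swallowed.
        if (readerSawFailure) throw e
        // Otherwise assume Python legitimately stopped reading, e.g. for take().
    }
}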
On Mon, Apr 28, 2014 at 1:28 PM, Patrick Wendell <pwend...@gmail.com> wrote:

> Hey Jim,
>
> This IOException thing is a general issue that we need to fix, and your
> observation is spot-on. There is actually a JIRA for it, which I created a
> few days ago: https://issues.apache.org/jira/browse/SPARK-1579
>
> Aaron is assigned to that one but not actively working on it, so we'd
> welcome a PR from you on this if you are interested.
>
> The first thought we had was to set a volatile flag when the reader sees
> an exception (indicating there was a failure in the task) and avoid
> swallowing the IOException in the writer if this happens. But I think
> there is a race here where the writer sees the error before the reader
> knows what is going on.
>
> Anyways, maybe if you have a simpler solution you could sketch it out in
> the JIRA and we could talk over there. The current proposal in the JIRA
> is somewhat complicated...
>
> - Patrick
>
> On Mon, Apr 28, 2014 at 1:01 PM, Jim Blomo <jim.bl...@gmail.com> wrote:
>
>> FYI, it looks like this "stdin writer to Python finished early" error
>> was caused by a break in the connection to S3, from which the data was
>> being pulled. A recent commit to PythonRDD
>> <https://github.com/apache/spark/commit/a967b005c8937a3053e215c952d2172ee3dc300d#commitcomment-6114780>
>> noted that the current exception catching can potentially mask an
>> exception from the data source, and that is indeed what I see happening.
>> The underlying libraries (jets3t and httpclient) do have retry
>> capabilities, but I don't see a great way of setting them through Spark
>> code. Instead I added the patch below, which kills the worker on the
>> exception. This allows me to completely load the data source after a few
>> worker retries.
>>
>> Unfortunately, java.net.SocketException is the same error that is
>> sometimes expected from the client when using methods like take(). One
>> approach around this conflation is to create a new locally scoped
>> exception class, e.g. WriterException; catch java.net.SocketException
>> during output writing, then re-throw the new exception. The worker
>> thread could then distinguish between the reasons
>> java.net.SocketException might be thrown. Perhaps there is a more
>> elegant way to do this in Scala, though?
>>
>> Let me know if I should open a ticket or discuss this on the developers
>> list instead. Best,
>>
>> Jim
>>
>> diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
>> index 0d71fdb..f31158c 100644
>> --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
>> +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
>> @@ -95,6 +95,12 @@ private[spark] class PythonRDD[T: ClassTag](
>>            readerException = e
>>            Try(worker.shutdownOutput()) // kill Python worker process
>>
>> +        case e: java.net.SocketException =>
>> +          // This can happen if the connection to the data source, e.g. S3,
>> +          // resets or is otherwise broken
>> +          readerException = e
>> +          Try(worker.shutdownOutput()) // kill Python worker process
>> +
>>          case e: IOException =>
>>            // This can happen for legitimate reasons if the Python code stops
>>            // returning data before we are done passing elements through,
>>            // e.g., for take(). Just log a message to say it happened.
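For the archives, the wrap-and-rethrow idea Jim describes above could look roughly like this. WriterException and the surrounding names are illustrative only, not what the PR ended up doing:

import java.net.SocketException

// A locally scoped wrapper type, so the worker thread can tell where a
// SocketException came from. All names here are made up for illustration.
class WriterException(cause: SocketException)
  extends Exception("connection to the data source broke while writing to Python", cause)

object WriterExceptionSketch {
  // Body of the stdin-writer thread: pull records from the (possibly
  // S3-backed) upstream iterator and push them to the Python worker.
  def writePartition(writeAll: () => Unit): Unit =
    try {
      writeAll()
    } catch {
      case e: SocketException =>
        // This was raised while reading the data source, not by the Python
        // worker closing its socket, so re-throw it under a distinct type.
        throw new WriterException(e)
    }
}

The thread that currently catches java.net.SocketException could then treat WriterException as a fatal data-source failure (kill the worker and let the task retry) while still treating a bare SocketException from the worker's own socket as the possibly-benign take() case.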
>>
>> On Wed, Apr 9, 2014 at 7:04 PM, Jim Blomo <jim.bl...@gmail.com> wrote:
>>
>>> This dataset is uncompressed text at ~54GB. stats() returns (count:
>>> 56757667, mean: 1001.68740583, stdev: 601.775217822, max: 8965, min: 343)
>>>
>>> On Wed, Apr 9, 2014 at 6:59 PM, Matei Zaharia <matei.zaha...@gmail.com> wrote:
>>>
>>>> Okay, thanks. Do you have any info on how large your records and data
>>>> file are? I'd like to reproduce and fix this.
>>>>
>>>> Matei
>>>>
>>>> On Apr 9, 2014, at 3:52 PM, Jim Blomo <jim.bl...@gmail.com> wrote:
>>>>
>>>>> Hi Matei, thanks for working with me to find these issues.
>>>>>
>>>>> To summarize, the issues I've seen are:
>>>>>
>>>>> 0.9.0:
>>>>> - https://issues.apache.org/jira/browse/SPARK-1323
>>>>>
>>>>> SNAPSHOT 2014-03-18:
>>>>> - When persist() is used and batchSize=1: java.lang.OutOfMemoryError:
>>>>>   Java heap space. To me this indicates a memory leak, since Spark
>>>>>   should simply be counting records of size < 3MB.
>>>>> - Without persist(), "stdin writer to Python finished early" hangs the
>>>>>   application; root cause unknown.
>>>>>
>>>>> I've recently rebuilt another SNAPSHOT, git commit 16b8308, with
>>>>> debugging turned on. This gives me the stacktrace on the new "stdin"
>>>>> problem:
>>>>>
>>>>> 14/04/09 22:22:45 DEBUG PythonRDD: stdin writer to Python finished early
>>>>> java.net.SocketException: Connection reset
>>>>>   at java.net.SocketInputStream.read(SocketInputStream.java:196)
>>>>>   at java.net.SocketInputStream.read(SocketInputStream.java:122)
>>>>>   at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
>>>>>   at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
>>>>>   at sun.security.ssl.InputRecord.read(InputRecord.java:509)
>>>>>   at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
>>>>>   at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
>>>>>   at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
>>>>>   at java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
>>>>>   at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
>>>>>   at org.apache.commons.httpclient.WireLogInputStream.read(WireLogInputStream.java:69)
>>>>>   at org.apache.commons.httpclient.ContentLengthInputStream.read(ContentLengthInputStream.java:170)
>>>>>   at java.io.FilterInputStream.read(FilterInputStream.java:133)
>>>>>   at org.apache.commons.httpclient.AutoCloseInputStream.read(AutoCloseInputStream.java:108)
>>>>>   at org.jets3t.service.io.InterruptableInputStream.read(InterruptableInputStream.java:76)
>>>>>   at org.jets3t.service.impl.rest.httpclient.HttpMethodReleaseInputStream.read(HttpMethodReleaseInputStream.java:136)
>>>>>   at org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsInputStream.read(NativeS3FileSystem.java:98)
>>>>>   at java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
>>>>>   at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
>>>>>   at java.io.DataInputStream.read(DataInputStream.java:100)
>>>>>   at org.apache.hadoop.util.LineReader.readLine(LineReader.java:134)
>>>>>   at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:133)
>>>>>   at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:38)
>>>>>   at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:192)
>>>>>   at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:175)
>>>>>   at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>>   at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:27)
>>>>>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>   at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
>>>>>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>>   at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
>>>>>   at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>>>>
>>>>> On Thu, Apr 3, 2014 at 8:37 PM, Matei Zaharia <matei.zaha...@gmail.com> wrote:
>>>>>
>>>>>> Cool, thanks for the update. Have you tried running a branch with this
>>>>>> fix (e.g. branch-0.9, or the 0.9.1 release candidate)? Also, what
>>>>>> memory leak issue are you referring to? Is it separate from this?
>>>>>> (Couldn't find it earlier in the thread.)
>>>>>>
>>>>>> To turn on debug logging, copy conf/log4j.properties.template to
>>>>>> conf/log4j.properties and change the line "log4j.rootCategory=INFO,
>>>>>> console" to "log4j.rootCategory=DEBUG, console". Then make sure this
>>>>>> file is present in "conf" on all workers.
>>>>>>
>>>>>> BTW, I've managed to run PySpark with this fix on some reasonably
>>>>>> large S3 data (multiple GB) and it was fine. It might happen only if
>>>>>> records are large, or something like that. How much heap are you
>>>>>> giving to your executors, and does it show that much in the web UI?
>>>>>>
>>>>>> Matei
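In diff terms, the one-line log4j change Matei describes above, after copying conf/log4j.properties.template to conf/log4j.properties, is:

-log4j.rootCategory=INFO, console
+log4j.rootCategory=DEBUG, console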
>>>>>>
>>>>>> On Mar 29, 2014, at 10:44 PM, Jim Blomo <jim.bl...@gmail.com> wrote:
>>>>>>
>>>>>>> I think the problem I ran into in 0.9 is covered in
>>>>>>> https://issues.apache.org/jira/browse/SPARK-1323
>>>>>>>
>>>>>>> When I kill the python process, the stacktrace I get indicates that
>>>>>>> this happens at initialization. It looks like the initial write to
>>>>>>> the Python process does not go through, and then the iterator hangs
>>>>>>> waiting for output. I haven't had luck turning on debugging for the
>>>>>>> executor process; still trying to learn the log4j properties that
>>>>>>> need to be set.
>>>>>>>
>>>>>>> No luck yet on tracking down the memory leak.
>>>>>>>
>>>>>>> 14/03/30 05:15:04 ERROR executor.Executor: Exception in task ID 11
>>>>>>> org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
>>>>>>>   at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:168)
>>>>>>>   at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
>>>>>>>   at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:113)
>>>>>>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:231)
>>>>>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:222)
>>>>>>>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>>>>>>>   at org.apache.spark.scheduler.Task.run(Task.scala:52)
>>>>>>>   at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:212)
>>>>>>>   at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:43)
>>>>>>>   at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
>>>>>>>   at java.security.AccessController.doPrivileged(Native Method)
>>>>>>>   at javax.security.auth.Subject.doAs(Subject.java:415)
>>>>>>>   at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
>>>>>>>   at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:42)
>>>>>>>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>>>>>>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>   at java.lang.Thread.run(Thread.java:724)
>>>>>>>
>>>>>>> On Sat, Mar 29, 2014 at 3:17 PM, Jim Blomo <jim.bl...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I've only tried 0.9, in which I ran into the "stdin writer to Python
>>>>>>>> finished early" error so frequently I wasn't able to load even a 1GB
>>>>>>>> file. Let me know if I can provide any other info!
>>>>>>>>
>>>>>>>> On Thu, Mar 27, 2014 at 8:48 PM, Matei Zaharia <matei.zaha...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I see, did this also fail with previous versions of Spark (0.9 or
>>>>>>>>> 0.8)? We'll try to look into these; it seems like a serious error.
>>>>>>>>>
>>>>>>>>> Matei
>>>>>>>>>
>>>>>>>>> On Mar 27, 2014, at 7:27 PM, Jim Blomo <jim.bl...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks, Matei. I am running "Spark 1.0.0-SNAPSHOT built for Hadoop
>>>>>>>>>> 1.0.4" from GitHub on 2014-03-18.
>>>>>>>>>>
>>>>>>>>>> I tried batchSizes of 512, 10, and 1; each got me further, but none
>>>>>>>>>> succeeded.
>>>>>>>>>>
>>>>>>>>>> I can get this to work -- with manual interventions -- if I omit
>>>>>>>>>> `parsed.persist(StorageLevel.MEMORY_AND_DISK)` and set batchSize=1.
>>>>>>>>>> 5 of the 175 executors hung, and I had to kill the python process
>>>>>>>>>> to get things going again. The only indication of this in the logs
>>>>>>>>>> was `INFO python.PythonRDD: stdin writer to Python finished early`.
>>>>>>>>>>
>>>>>>>>>> With batchSize=1 and persist, a new memory error came up in several
>>>>>>>>>> tasks before the app failed:
>>>>>>>>>>
>>>>>>>>>> 14/03/28 01:51:15 ERROR executor.Executor: Uncaught exception in
>>>>>>>>>> thread Thread[stdin writer for python,5,main]
>>>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>>>>>   at java.util.Arrays.copyOfRange(Arrays.java:2694)
>>>>>>>>>>   at java.lang.String.<init>(String.java:203)
>>>>>>>>>>   at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
>>>>>>>>>>   at java.nio.CharBuffer.toString(CharBuffer.java:1201)
>>>>>>>>>>   at org.apache.hadoop.io.Text.decode(Text.java:350)
>>>>>>>>>>   at org.apache.hadoop.io.Text.decode(Text.java:327)
>>>>>>>>>>   at org.apache.hadoop.io.Text.toString(Text.java:254)
>>>>>>>>>>   at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>>>>>>>>>>   at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>>>>>>>>>>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>>>>>>>>>   at scala.collection.Iterator$$anon$12.next(Iterator.scala:357)
>>>>>>>>>>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>>>>>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>>>>>>>   at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
>>>>>>>>>>   at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>>>>>>>>>
>>>>>>>>>> There are other exceptions, but I think they all stem from the
>>>>>>>>>> above, e.g. org.apache.spark.SparkException: Error sending message
>>>>>>>>>> to BlockManagerMaster.
>>>>>>>>>>
>>>>>>>>>> Let me know if there are other settings I should try, or if I
>>>>>>>>>> should try a newer snapshot.
>>>>>>>>>>
>>>>>>>>>> Thanks again!
>>>>>>>>>>
>>>>>>>>>> On Mon, Mar 24, 2014 at 9:35 AM, Matei Zaharia <matei.zaha...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hey Jim,
>>>>>>>>>>>
>>>>>>>>>>> In Spark 0.9 we added a "batchSize" parameter to PySpark that
>>>>>>>>>>> makes it group multiple objects together before passing them
>>>>>>>>>>> between Java and Python, but this may be too high by default. Try
>>>>>>>>>>> passing batchSize=10 to your SparkContext constructor to lower it
>>>>>>>>>>> (the default is 1024), or even batchSize=1 to match earlier
>>>>>>>>>>> versions.
>>>>>>>>>>>
>>>>>>>>>>> Matei
>>>>>>>>>>>
>>>>>>>>>>> On Mar 21, 2014, at 6:18 PM, Jim Blomo <jim.bl...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi all, I'm wondering if there are any settings I can use to
>>>>>>>>>>>> reduce the memory needed by the PythonRDD when computing simple
>>>>>>>>>>>> stats. I am getting OutOfMemoryError exceptions while calculating
>>>>>>>>>>>> count() on big, but not absurd, records. It seems like PythonRDD
>>>>>>>>>>>> is trying to keep too many of these records in memory, when all
>>>>>>>>>>>> that is needed is to stream through them and count. Any tips for
>>>>>>>>>>>> getting through this workload?
>>>>>>>>>>>>
>>>>>>>>>>>> Code:
>>>>>>>>>>>>
>>>>>>>>>>>> session = sc.textFile('s3://...json.gz')  # ~54GB of compressed data
>>>>>>>>>>>>
>>>>>>>>>>>> # the biggest individual text line is ~3MB
>>>>>>>>>>>> parsed = session.map(lambda l: l.split("\t",1)).map(lambda (y,s):
>>>>>>>>>>>>     (loads(y), loads(s)))
>>>>>>>>>>>> parsed.persist(StorageLevel.MEMORY_AND_DISK)
>>>>>>>>>>>>
>>>>>>>>>>>> parsed.count()
>>>>>>>>>>>> # will never finish: executor.Executor: Uncaught exception will
>>>>>>>>>>>> # FAIL all executors
>>>>>>>>>>>>
>>>>>>>>>>>> Incidentally, the whole app appears to be killed, but this error
>>>>>>>>>>>> is not propagated to the shell.
>>>>>>>>>>>>
>>>>>>>>>>>> Cluster:
>>>>>>>>>>>> 15 m2.xlarges (17GB memory, 17GB swap, spark.executor.memory=10GB)
>>>>>>>>>>>>
>>>>>>>>>>>> Exception:
>>>>>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>>>>>>>   at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:132)
>>>>>>>>>>>>   at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:120)
>>>>>>>>>>>>   at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:113)
>>>>>>>>>>>>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>>>>>>>   at org.apache.spark.api.python.PythonRDD$$anon$1.foreach(PythonRDD.scala:113)
>>>>>>>>>>>>   at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>>>>>>>>>>>   at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>>>>>>>>>>>   at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:94)
>>>>>>>>>>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:220)
>>>>>>>>>>>>   at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
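One closing note for anyone finding this thread later: the bottom frames of that last trace (CacheManager.getOrCompute reached via ArrayBuffer.$plus$plus$eq) hint at why persist() changes the memory profile: caching drains the partition's iterator into an in-memory buffer before storing the block. A rough sketch of the mechanism, with made-up names and not Spark's actual code:

import scala.collection.mutable.ArrayBuffer

object CacheSketch {
  // When a partition is persisted, the compute iterator is materialized
  // into a buffer before the block is stored, so the whole partition must
  // fit in the heap even for a streaming operation like count().
  def getOrCompute[T](compute: Iterator[T]): Iterator[T] = {
    val elements = new ArrayBuffer[T]
    elements ++= compute // cf. Growable.$plus$plus$eq in the trace above
    // ... hand `elements` to the block manager for storage ...
    elements.iterator
  }
}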