Re: Re: Re: RDD usage
Got it. Thanks for your help!!
Chieh-Yen

On Tue, Mar 25, 2014 at 6:51 PM, hequn cheng chenghe...@gmail.com wrote:
Hi~ I wrote a program to test this. The non-idempotent compute function in foreach does change the value of the RDD. It may look a little crazy to do so, since modifying the RDD makes it impossible to keep the RDD fault-tolerant in Spark :)

2014-03-25 11:11 GMT+08:00 林武康 vboylin1...@gmail.com:
Hi hequn, I dug into the Spark source a bit deeper and got some ideas. First, immutability is a feature of RDDs but not a hard rule; there are ways to change an RDD, for example an RDD with a non-idempotent compute function, though it is really bad design to make that function non-idempotent because of the uncontrollable side effects. I agree with Mark that foreach can modify the elements of an RDD, but we should avoid this because it will affect all the RDDs generated from the changed RDD and make the whole process inconsistent and unstable. These are some rough opinions on the immutability of RDDs; a fuller discussion could make it clearer. Any ideas?

--
From: hequn cheng chenghe...@gmail.com
Sent: 2014/3/25 10:40
To: user@spark.apache.org
Subject: Re: Re: RDD usage

First question: If you save your modified RDD like this:
points.foreach(p => p.y = another_value).collect()
or
points.foreach(p => p.y = another_value).saveAsTextFile(...)
the modified RDD will be materialized and this will not use any worker's memory. If you have more transformations after the map(), Spark will pipeline all the transformations and build a DAG. Very little memory will be used in this stage, and the memory will be freed soon. Only cache() will persist your RDD in memory for a long time.
Second question: Once an RDD is created, it cannot be changed, because RDDs are immutable. You can only create a new RDD from an existing RDD or from a file system.

2014-03-25 9:45 GMT+08:00 林武康 vboylin1...@gmail.com:
Hi hequn, a related question: does that mean the memory usage will be doubled? Furthermore, if the compute function of an RDD is not idempotent, the RDD will change while the job is running. Is that right?

--
From: hequn cheng chenghe...@gmail.com
Sent: 2014/3/25 9:35
To: user@spark.apache.org
Subject: Re: RDD usage

points.foreach(p => p.y = another_value) will return a new modified RDD.

2014-03-24 18:13 GMT+08:00 Chieh-Yen r01944...@csie.ntu.edu.tw:
Dear all, I have a question about the usage of RDDs. I implemented a class called AppDataPoint. It looks like this:

case class AppDataPoint(input_y : Double, input_x : Array[Double]) extends Serializable {
  var y : Double = input_y
  var x : Array[Double] = input_x
  ..
}

I create the RDD with the following function:

def parsePoint(line: String): AppDataPoint = {
  /* Some related works for parsing */
  ..
}

Assume the RDD is called points:

val lines = sc.textFile(inputPath, numPartition)
var points = lines.map(parsePoint _).cache()

My question is this. I tried to modify the values of this RDD with the following operation:

points.foreach(p => p.y = another_value)

The operation works. The system shows no warning or error message, and the results are correct. I wonder whether modifying an RDD is a correct and actually workable design. The documentation says that RDDs are immutable. Is there any suggestion? Thanks a lot.
Chieh-Yen Lin
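The effect Chieh-Yen observed can be modelled without Spark. The sketch below is a hypothetical toy, not Spark code. `ToyCachedDataset` stands in for a cached RDD that keeps its lineage and recomputes when its cache is lost. In Spark, `foreach` returns `Unit`, so the mutation is purely a side effect on the objects the executors hold. Assuming a deserialized in-memory cache such as the one `cache()` creates, the change "sticks" only while those cached objects survive. Deriving new values, which is the role `map` plays in Spark, leaves the source intact.

```python
from dataclasses import dataclass, replace
from typing import Callable, List, Optional


@dataclass
class AppDataPoint:
    # Mutable fields, mirroring the `var y` / `var x` members of the Scala case class.
    y: float
    x: List[float]


class ToyCachedDataset:
    """Hypothetical stand-in for a cached RDD: it keeps its lineage
    (source lines + parse function) and recomputes when the cache is lost."""

    def __init__(self, lines: List[str], parse: Callable[[str], AppDataPoint]) -> None:
        self._lines = lines
        self._parse = parse
        self._cache: Optional[List[AppDataPoint]] = None

    def collect(self) -> List[AppDataPoint]:
        if self._cache is None:
            # Recompute from lineage, as Spark does for a lost or evicted partition.
            self._cache = [self._parse(line) for line in self._lines]
        return self._cache

    def evict(self) -> None:
        # Simulates cache eviction or a lost executor.
        self._cache = None


def parse_point(line: str) -> AppDataPoint:
    # Hypothetical format: label followed by feature values, space-separated.
    y, *xs = (float(v) for v in line.split())
    return AppDataPoint(y, list(xs))


points = ToyCachedDataset(["1 0.5 0.2", "0 0.1 0.9"], parse_point)

# The foreach pattern: mutate the cached objects in place.
for p in points.collect():
    p.y = 42.0
mutated = [p.y for p in points.collect()]        # [42.0, 42.0]

# Once the cache is lost, recomputation silently undoes the "update".
points.evict()
recomputed = [p.y for p in points.collect()]     # [1.0, 0.0]

# The map pattern: derive new objects and leave the source untouched.
relabeled = [replace(p, y=42.0) for p in points.collect()]
```

This is why the in-place update can look correct in a small test but is not something the lineage-based recovery model preserves.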
working with MultiTableInputFormat
I'm trying to create an RDD from multiple scans. I tried to set the configuration this way:

Configuration config = HBaseConfiguration.create();
config.setStrings(MultiTableInputFormat.SCANS, scanStrings);

I create each scan string in the array scanStrings this way:

Scan scan = new Scan();
scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tableName));
scan.setFilter(filter);
ByteArrayOutputStream out = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(out);
scan.write(dos);
String singleScanString = Base64.encodeBytes(out.toByteArray());

* When doing so, I got a "No table was provided" exception from the class TableInputFormatBase.
* This didn't make any sense to me, because I'm providing the input table in the attribute SCAN_ATTRIBUTES_TABLE_NAME.
* I tried adding config.set(TableInputFormat.INPUT_TABLE, tableName); to my configuration,
* but then my Spark mapper ran into some kind of infinite loop.

Am I missing anything? Can Spark work with MultiTableInputFormat, or only with TableInputFormat?
Thanks,
Dana.
- Intel Electronics Ltd. This e-mail and any attachments may contain confidential material for the sole use of the intended recipient(s). Any review or distribution by others is strictly prohibited. If you are not the intended recipient, please contact the sender and delete all copies.
Zip or map elements to create new RDD
Hi, I have an RDD of elements and want to create a new RDD by zipping it, in order, with another RDD. result is an RDD containing the sequence 10, 20, 30, 40, 50, ... I am running into problems because the index is not an RDD, so I get an error. Could anyone help me zip or map it to obtain the following result: (0,10), (1,20), (2,30), (3,40)? I tried the following, but it doesn't work. Even zipWithIndex doesn't work, because it's a Scala collection method, not an RDD method:

val index = List.range(0, result.count(), 1)
result.zip(index)

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Zip-or-map-elements-to-create-new-RDD-tp3467.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Do all classes involving RDD operation need to be registered?
From my limited knowledge, all classes involved in RDD operations should extend Serializable if you want Java serialization (the default). However, if you want Kryo serialization, you can use conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); If you also want to perform custom serialization, i.e. you want some variables to be set differently or computed during deserialization, you would create a custom registrator, register your classes with it, and call conf.set("spark.kryo.registrator", "mypkg.MyKryoRegistrator"); If I am missing something, please feel free to correct me.
Best Regards,
Sonal
Nube Technologies http://www.nubetech.co http://in.linkedin.com/in/sonalgoyal

On Sat, Mar 29, 2014 at 1:40 AM, anny9699 anny9...@gmail.com wrote:
Thanks a lot, Ognen! It's not a fancy class that I wrote. I now realize that I neither extended Serializable nor registered it with Kryo, and that's why it is not working.
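Kryo registrators are JVM-side. Sonal's idea, shipping only the essential fields and recomputing the rest during deserialization, carries over to PySpark, which pickles Python objects. Below is a minimal sketch using the standard `pickle` hooks `__getstate__`/`__setstate__`. The `FeatureVector` class and its cached norm are hypothetical illustrations, not Spark or Kryo API.

```python
import math
import pickle
from typing import List


class FeatureVector:
    """Hypothetical record type with one derived, rebuildable field."""

    def __init__(self, label: float, values: List[float]) -> None:
        self.label = label
        self.values = values
        self._norm = math.sqrt(sum(v * v for v in values))  # derived cache

    def __getstate__(self) -> dict:
        # Serialize only the essential fields; the derived norm is not shipped.
        return {"label": self.label, "values": self.values}

    def __setstate__(self, state: dict) -> None:
        # Restore the essential fields, then recompute the derived one.
        self.label = state["label"]
        self.values = state["values"]
        self._norm = math.sqrt(sum(v * v for v in self.values))


original = FeatureVector(1.0, [3.0, 4.0])
payload = pickle.dumps(original)
restored = pickle.loads(payload)   # restored._norm is recomputed as 5.0
```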
Re: Zip or map elements to create new RDD
zipWithIndex works on the git clone; I'm not sure whether it's part of a released version yet. https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala
Best Regards,
Sonal
Nube Technologies http://www.nubetech.co http://in.linkedin.com/in/sonalgoyal

On Sat, Mar 29, 2014 at 5:27 PM, yh18190 yh18...@gmail.com wrote:
Hi, I have an RDD of elements and want to create a new RDD by zipping it, in order, with another RDD. result is an RDD containing the sequence 10, 20, 30, 40, 50, ... I am running into problems because the index is not an RDD, so I get an error. Could anyone help me zip or map it to obtain the following result: (0,10), (1,20), (2,30), (3,40)? I tried the following, but it doesn't work. Even zipWithIndex doesn't work, because it's a Scala collection method, not an RDD method:

val index = List.range(0, result.count(), 1)
result.zip(index)
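In releases that include it (Spark 1.0 onward), `result.zipWithIndex()` returns `(element, index)` pairs. `result.zipWithIndex().map(_.swap)` would therefore give the `(0,10), (1,20), ...` shape asked for, and computing the indices triggers an extra job when the RDD has more than one partition. The pure-Python sketch below mimics the usual way such a global index is computed over partitioned data. It is an illustration, not Spark's source: count each partition, turn the counts into starting offsets, then number elements locally. The partition layout is hypothetical.

```python
from typing import List, Tuple, TypeVar

T = TypeVar("T")


def zip_with_index(partitions: List[List[T]]) -> List[Tuple[T, int]]:
    """Assign consecutive global indices to elements spread over partitions."""
    # Pass 1: size of each partition (in Spark this is the extra job).
    counts = [len(part) for part in partitions]
    # Exclusive prefix sum: partition p starts at the total size of partitions 0..p-1.
    offsets = [0] * len(partitions)
    for p in range(1, len(partitions)):
        offsets[p] = offsets[p - 1] + counts[p - 1]
    # Pass 2: number each element locally and shift it by its partition's offset.
    return [
        (elem, offsets[p] + i)
        for p, part in enumerate(partitions)
        for i, elem in enumerate(part)
    ]


# Hypothetical layout of `result` across three partitions.
result_partitions = [[10, 20], [30], [40, 50]]
pairs = zip_with_index(result_partitions)
index_first = [(idx, value) for value, idx in pairs]
# index_first == [(0, 10), (1, 20), (2, 30), (3, 40), (4, 50)]
```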
Re: Zip or map elements to create new RDD
Thanks, Sonal. Is there any other way, such as mapping values to increasing indexes, so that I can do map(t => (i, t)) where the value of i increases after each map operation on an element? Please help me with this.
How to index each map operation????
Hi, I want to perform a map operation on an RDD of elements so that the resulting RDD contains key-value pairs (counter, value). For example:

var k: RDD[Int] = 10, 20, 30, 40, 40, 60...
k.map(t => (i, t))

where i should act like a counter whose value increments after each map operation. Please help me. I tried writing it like this, but it didn't work:

var i = 0
k.map(t => { (i, t); i += 1 })

Please correct me...
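Two things go wrong in that attempt. First, the block `{ (i,t); i+=1 }` evaluates to its last expression, `i += 1`, which is `Unit`, so the map produces `Unit`s rather than pairs. Second, even with the order fixed, each task receives its own serialized copy of the closure and the captured `i` (Scala boxes a captured `var` in an `IntRef`). Counters therefore restart in every partition, and the driver's `i` never changes. The toy Python model below is hypothetical, with `copy.deepcopy` standing in for closure serialization, and it shows the second effect. Numbering with per-partition offsets, as `zipWithIndex` does, avoids it.

```python
import copy
from typing import List, Tuple


class Counter:
    """Mutable cell standing in for the captured `var i`."""

    def __init__(self) -> None:
        self.value = 0


def map_with_captured_counter(partitions: List[List[int]]) -> Tuple[List[Tuple[int, int]], int]:
    driver_counter = Counter()
    output: List[Tuple[int, int]] = []
    for part in partitions:
        # Each task deserializes its own copy of the closure, counter included.
        task_counter = copy.deepcopy(driver_counter)
        for t in part:
            output.append((task_counter.value, t))  # emit the pair first...
            task_counter.value += 1                  # ...then bump the task-local copy
    return output, driver_counter.value


# Hypothetical two-partition layout of k = 10, 20, 30, 40, 40, 60.
pairs, driver_value = map_with_captured_counter([[10, 20, 30], [40, 40, 60]])
# pairs == [(0, 10), (1, 20), (2, 30), (0, 40), (1, 40), (2, 60)]; driver_value == 0
```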
Re: Do all classes involving RDD operation need to be registered?
Thanks so much Sonal! I am much clearer now.
Re: Strange behavior of RDD.cartesian
Is this Spark 0.9.0? Try setting spark.shuffle.spill=false. There was a hash collision bug, fixed in 0.9.1, that might cause you to get too few results in that join.
Sent from my mobile phone

On Mar 28, 2014 8:04 PM, Matei Zaharia matei.zaha...@gmail.com wrote:
Weird. How exactly are you pulling out the sample? Do you have a small program that reproduces this?
Matei

On Mar 28, 2014, at 3:09 AM, Jaonary Rabarisoa jaon...@gmail.com wrote:
I forgot to mention that I don't actually use all of my data. Instead I use a sample extracted with randomSample.

On Fri, Mar 28, 2014 at 10:58 AM, Jaonary Rabarisoa jaon...@gmail.com wrote:
Hi all, I noticed that RDD.cartesian behaves strangely with cached and uncached data. More precisely, I have a dataset that I load with objectFile:

val data: RDD[(Int,String,Array[Double])] = sc.objectFile(data)

Then I split it into two sets depending on some criteria:

val part1 = data.filter(_._2 matches view1)
val part2 = data.filter(_._2 matches view2)

Finally, I compute the Cartesian product of part1 and part2:

val pair = part1.cartesian(part2)

If everything goes well, I should have

pair.count == part1.count * part2.count

But this is not the case if I don't cache part1 and part2. What am I missing? Is caching data mandatory in Spark?
Cheers,
Jaonary
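Matei's question about how the sample is drawn matters because, without caching, every action recomputes an RDD from its lineage. If the sampling step is not deterministic across evaluations, `part1.count`, `part2.count` and `pair.count` can each see a different sample, and the product identity no longer has to hold. The hypothetical Python model below is not Spark code and not a diagnosis of this report; the reply above points to a hash-collision bug fixed in 0.9.1. It evaluates each side separately for the counts, then again for the Cartesian product.

```python
import itertools
import random
from typing import Callable, List


def sample_seeded(data: List[int], fraction: float, seed: int) -> List[int]:
    # Same seed on every evaluation, so recomputation yields the same sample.
    rng = random.Random(seed)
    return [d for d in data if rng.random() < fraction]


def sample_unseeded(data: List[int], fraction: float) -> List[int]:
    # Fresh randomness on every evaluation, like a non-deterministic lineage.
    return [d for d in data if random.random() < fraction]


def counts_agree(part1: Callable[[], List[int]], part2: Callable[[], List[int]]) -> bool:
    """Check |part1 x part2| == |part1| * |part2| when every 'action' recomputes."""
    n1 = len(part1())  # like part1.count() on an uncached RDD
    n2 = len(part2())  # like part2.count()
    n_pairs = sum(1 for _ in itertools.product(part1(), part2()))  # like pair.count()
    return n_pairs == n1 * n2


data = list(range(1000))
deterministic = counts_agree(lambda: sample_seeded(data, 0.1, 1),
                             lambda: sample_seeded(data, 0.1, 2))
# The seeded variant always agrees; the unseeded one below may return False.
nondeterministic = counts_agree(lambda: sample_unseeded(data, 0.1),
                                lambda: sample_unseeded(data, 0.1))
```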
Re: Announcing Spark SQL
On Fri, Mar 28, 2014 at 9:53 PM, Rohit Rai ro...@tuplejump.com wrote:
After discussions with a couple of our clients, it seems the reason they would prefer to use Hive is that they have already invested a lot in it, mostly in UDFs and HiveQL.
1. Are there any plans to develop the SQL parser to handle more complex queries like HiveQL? Can we just plug in a custom parser instead of bringing in all the Hive dependencies?

We definitely want to have a more complete SQL parser without having to pull in all of Hive. I think there are a couple of ways to do this:
1. Use a SQL-92 parser from something like Optiq, or write our own.
2. I haven't fully investigated the published Hive artifacts, but if there is some way to depend on only the parser, that would be great. If someone has the resources to investigate using the Hive parser without depending on all of Hive, this is a place where we would certainly welcome contributions. We could then consider making HiveQL an option in a standard SQLContext.

2. Is there any way we can support UDFs in Catalyst without using Hive? It will be fine if we don't support Hive UDFs as-is and need a minor porting effort.

All of the execution support for native Scala UDFs is already there. In fact, when you use the DSL where clause (http://people.apache.org/~pwendell/catalyst-docs/api/sql/core/index.html#org.apache.spark.sql.SchemaRDD), you are using this machinery. For Spark 1.1 we will find a more general way to expose this to users.
Re: KafkaInputDStream mapping of partitions to tasks
Hi, is there any workaround for this problem? I'm trying to implement a KafkaReceiver using Kafka's SimpleConsumer API [1] and handle the partition assignment manually. The easiest setup in this case would be to bind the number of parallel jobs to the number of partitions in Kafka. This is basically what Samza [2] does. I have a few questions about this implementation.

The getReceiver method looks like a good starting point for implementing the manual partition assignment. Unfortunately, it lacks documentation. As far as I understand from the API, the getReceiver method is called once and passes the returned object (implementing the NetworkReceiver contract) to the worker nodes. Therefore, the logic that assigns partitions to each receiver has to be implemented within the receiver itself. I'm planning to implement the following and have some questions about it:
1. Within getReceiver: set up a ZooKeeper queue with the assignment of partitions to parallel jobs, and store the number of consumers needed. Is the number of parallel jobs accessible through ssc?
2. Within onStart: poll the ZooKeeper queue and receive the partition number(s) to read data from.
3. Within onStart: start a new thread that sends keepalive messages to ZooKeeper. If a node goes down and a new receiver is started, the new receiver can look up ZooKeeper to find the consumer without recent keepalives and take its place. Due to SPARK-1340 this might not even be possible yet. Is there any insight into why the receiver is not restarted?

Do you think using ZooKeeper in this scenario is a good approach, or is there an easier way using built-in Spark functionality? Also, with the SimpleConsumer API one has to manage the offset per consumer. The high-level consumer API solves this problem with ZooKeeper. I'd take the same approach if there's no easier way to handle this in Spark.
Best,
Nicolas

[1] https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
[2] http://samza.incubator.apache.org/learn/documentation/0.7.0/container/task-runner.html

On Fri, Mar 28, 2014 at 2:36 PM, Evgeniy Shishkin itparan...@gmail.com wrote:
One more question: we are using MEMORY_AND_DISK_SER_2, and I'm worried about when those RDDs on disk will be removed. http://i.imgur.com/dbq5T6i.png unpersist is set to true, and RDDs get purged from memory, but disk space just keeps growing.

On 28 Mar 2014, at 01:32, Tathagata Das tathagata.das1...@gmail.com wrote:
Yes, no one has reported this issue before. I just opened a JIRA on what I think is the main problem here: https://spark-project.atlassian.net/browse/SPARK-1340 Some of the receivers don't get restarted. I have a bunch of refactoring in the NetworkReceiver ready to be posted as a PR that should fix this. Regarding the second problem, I have been thinking of adding flow control (i.e. limiting the rate of receiving) for a while; I just haven't gotten around to it. I added another JIRA to track this issue: https://spark-project.atlassian.net/browse/SPARK-1341
TD

On Thu, Mar 27, 2014 at 3:23 PM, Evgeny Shishkin itparan...@gmail.com wrote:
On 28 Mar 2014, at 01:11, Scott Clasen scott.cla...@gmail.com wrote:
Evgeniy Shishkin wrote: So, at the bottom -- kafka input stream just does not work.
That was the conclusion I was coming to as well. Are there open tickets around fixing this up?

I am not aware of any. Actually, nobody had complained about Spark+Kafka before, so I thought it just worked; then we tried to build something on it and almost failed. I think it is possible to borrow or replicate how Twitter Storm works with Kafka. They do manual partition assignment, which would at least help to balance the load. There is another issue: ssc creates new RDDs every batch duration, always, even if the previous computation has not finished. But with Kafka, we could consume more RDDs later, after we finish the previous ones.
That way it would be much simpler to avoid getting OOM'ed when starting from the beginning, because we can consume a lot of data from Kafka during one batch duration and then hit an OOM. But we simply cannot start slowly; we cannot limit how much to consume during a batch.
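Steps 1 and 3 of Nicolas's plan boil down to a static partition-to-receiver mapping plus a staleness check on keepalives. The minimal sketch below shows that bookkeeping in plain Python. It assumes each receiver knows its own index and the total receiver count. The function names are hypothetical and not part of the Spark or Kafka APIs; in the proposed design this state would live in ZooKeeper.

```python
from typing import Dict, List


def assign_partitions(num_partitions: int, num_receivers: int) -> Dict[int, List[int]]:
    """Round-robin: receiver r reads every partition p with p % num_receivers == r."""
    if num_receivers <= 0:
        raise ValueError("need at least one receiver")
    assignment: Dict[int, List[int]] = {r: [] for r in range(num_receivers)}
    for p in range(num_partitions):
        assignment[p % num_receivers].append(p)
    return assignment


def stale_receivers(last_heartbeat: Dict[int, float], now: float, timeout: float) -> List[int]:
    """Receivers whose last keepalive is older than `timeout` seconds.

    A replacement receiver could look these up and take over their partitions.
    """
    return sorted(r for r, seen in last_heartbeat.items() if now - seen > timeout)


plan = assign_partitions(num_partitions=6, num_receivers=4)
# plan == {0: [0, 4], 1: [1, 5], 2: [2], 3: [3]}
dead = stale_receivers({0: 100.0, 1: 90.0, 2: 99.5}, now=101.0, timeout=5.0)
# dead == [1]
```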
Re: pySpark memory usage
I've only tried 0.9, in which I ran into `stdin writer to Python finished early` so frequently that I wasn't able to load even a 1GB file. Let me know if I can provide any other info!

On Thu, Mar 27, 2014 at 8:48 PM, Matei Zaharia matei.zaha...@gmail.com wrote:
I see. Did this also fail with previous versions of Spark (0.9 or 0.8)? We'll try to look into these; it seems like a serious error.
Matei

On Mar 27, 2014, at 7:27 PM, Jim Blomo jim.bl...@gmail.com wrote:
Thanks, Matei. I am running Spark 1.0.0-SNAPSHOT built for Hadoop 1.0.4 from GitHub on 2014-03-18. I tried batchSizes of 512, 10, and 1; each got me further, but none succeeded. I can get this to work (with manual interventions) if I omit `parsed.persist(StorageLevel.MEMORY_AND_DISK)` and set batchSize=1. 5 of the 175 executors hung, and I had to kill the python process to get things going again. The only indication of this in the logs was `INFO python.PythonRDD: stdin writer to Python finished early`. With batchSize=1 and persist, a new memory error came up in several tasks before the app failed:

14/03/28 01:51:15 ERROR executor.Executor: Uncaught exception in thread Thread[stdin writer for python,5,main]
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOfRange(Arrays.java:2694)
    at java.lang.String.<init>(String.java:203)
    at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
    at java.nio.CharBuffer.toString(CharBuffer.java:1201)
    at org.apache.hadoop.io.Text.decode(Text.java:350)
    at org.apache.hadoop.io.Text.decode(Text.java:327)
    at org.apache.hadoop.io.Text.toString(Text.java:254)
    at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
    at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$12.next(Iterator.scala:357)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
    at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)

There are other exceptions, but I think they all stem from the above, e.g. org.apache.spark.SparkException: Error sending message to BlockManagerMaster
Let me know if there are other settings I should try, or if I should try a newer snapshot. Thanks again!

On Mon, Mar 24, 2014 at 9:35 AM, Matei Zaharia matei.zaha...@gmail.com wrote:
Hey Jim, in Spark 0.9 we added a batchSize parameter to PySpark that makes it group multiple objects together before passing them between Java and Python, but the default may be too high. Try passing batchSize=10 to your SparkContext constructor to lower it (the default is 1024), or even batchSize=1 to match earlier versions.
Matei

On Mar 21, 2014, at 6:18 PM, Jim Blomo jim.bl...@gmail.com wrote:
Hi all, I'm wondering whether there are any settings I can use to reduce the memory needed by the PythonRDD when computing simple stats. I am getting OutOfMemoryError exceptions while calculating count() on big, but not absurd, records. It seems like PythonRDD is trying to keep too many of these records in memory, when all that is needed is to stream through them and count. Any tips for getting through this workload?
Code:

session = sc.textFile('s3://...json.gz') # ~54GB of compressed data
# the biggest individual text line is ~3MB
parsed = session.map(lambda l: l.split("\t",1)).map(lambda (y,s): (loads(y), loads(s)))
parsed.persist(StorageLevel.MEMORY_AND_DISK)
parsed.count() # will never finish: executor.Executor: Uncaught exception will FAIL all executors

Incidentally, the whole app appears to be killed, but this error is not propagated to the shell.
Cluster: 15 m2.xlarges (17GB memory, 17GB swap, spark.executor.memory=10GB)
Exception:
java.lang.OutOfMemoryError: Java heap space
    at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:132)
    at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:120)
    at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:113)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at org.apache.spark.api.python.PythonRDD$$anon$1.foreach(PythonRDD.scala:113)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:94)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:220)
    at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
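A little arithmetic makes Matei's batchSize advice concrete. Jim's largest line is about 3 MB. At the default batchSize of 1024, a single serialized batch could in the worst case approach 1024 × 3 MB ≈ 3 GB, a large share of a 10 GB executor heap. At batchSize=10 the bound drops to about 30 MB. The sketch below only illustrates the batching idea in plain Python; it is not PySpark's serializer code. Records are grouped into lists of `batch_size`, and each list is pickled as one frame.

```python
import itertools
import pickle
from typing import Iterable, Iterator


def batched_frames(records: Iterable[object], batch_size: int) -> Iterator[bytes]:
    """Group records into lists of `batch_size` and pickle each list as one frame."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    it = iter(records)
    while True:
        batch = list(itertools.islice(it, batch_size))
        if not batch:
            return
        yield pickle.dumps(batch)


def worst_case_frame_bytes(max_record_bytes: int, batch_size: int) -> int:
    # Upper bound on one frame's payload, ignoring pickle overhead.
    return max_record_bytes * batch_size


MB = 1024 ** 2
default_bound = worst_case_frame_bytes(3 * MB, 1024)     # 3 GiB
small_bound = worst_case_frame_bytes(3 * MB, 10)         # 30 MiB
frames = list(batched_frames(range(10), batch_size=4))   # 3 frames: 4 + 4 + 2 records
```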
Limiting number of reducers performance implications
Hi everyone, I'm using Spark on machines where I can't change the maximum number of open files. As a result, I'm limiting the number of reducers to 500. I'm also using only a single machine with 32 cores, emulating a cluster by running 4 worker daemons with at most 8 cores each. What I'm noticing is that as I allocate more cores to my Spark job, it does not complete much more quickly, and the CPUs aren't heavily utilized. When I try to use all 32 cores, mpstat shows a decent chunk of CPU idleness (70%-80% idle). My job consists primarily of reduceByKey and groupByKey operations, with flatMaps, maps, and filters in between. I was wondering: what exactly does the number of reducers do? Clearly, with 32 cores, setting the number of reducers to 500 should give a workload that keeps all 32 cores busy. Or am I misunderstanding what a reducer is?
Thanks,
-Matt Ceah
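Some back-of-envelope numbers help frame Matt's question. Assume each reduce partition ("reducer") becomes one task in the reduce stage and each core runs one task at a time. Then 500 reducers on 32 cores run in ceil(500 / 32) = 16 waves. The open-file limit enters through the shuffle. With the hash-based shuffle Spark used at the time, each map task writes one output file per reducer, so the file count grows as maps × reducers. The helpers and the 200-map-task figure below are hypothetical illustrations.

```python
import math


def reduce_waves(num_reducers: int, total_cores: int) -> int:
    # One task per reduce partition; each core runs one task at a time.
    return math.ceil(num_reducers / total_cores)


def hash_shuffle_files(num_map_tasks: int, num_reducers: int) -> int:
    # Hash-based shuffle: every map task writes one file per reduce partition.
    return num_map_tasks * num_reducers


waves = reduce_waves(num_reducers=500, total_cores=32)            # 16
files = hash_shuffle_files(num_map_tasks=200, num_reducers=500)   # 100000 (200 maps is hypothetical)
```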
Cross validation is missing in machine learning examples
Hi, I noticed that the Spark machine learning examples use the training data to validate regression models. For instance, the linear regression example (http://spark.apache.org/docs/0.9.0/mllib-guide.html) contains:

// Evaluate model on training examples and compute training error
val valuesAndPreds = parsedData.map { point =>
  val prediction = model.predict(point.features)
  (point.label, prediction)
}
...

Here the training data is used to validate a model that was built from the very same training data. This only gives a biased (optimistic) estimate of the error, and cross-validation (http://en.wikipedia.org/wiki/Cross-validation_%28statistics%29) is missing. To cross-validate, we need to partition the data into an in-sample set for training and an out-of-sample set for validation. Please correct me if this does not apply to the ML algorithms implemented in Spark.
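For reference, here is a minimal k-fold cross-validation sketch in plain Python. It uses a one-feature least-squares line as a stand-in for the MLlib model. Each fold is held out once, the model is fit on the remaining points, and squared error is measured only on the held-out points. In Spark, the same structure could be applied to an RDD by tagging each point with a fold id (for example from a seeded random draw) and filtering on it. The helper names here are hypothetical.

```python
import random
from typing import List, Sequence, Tuple

Point = Tuple[float, float]  # (label, feature), like (point.label, point.features)


def k_fold_indices(n: int, k: int, seed: int = 0) -> List[List[int]]:
    # Shuffle once, then deal indices into k disjoint folds.
    idx = list(range(n))
    random.Random(seed).shuffle(idx)
    return [idx[i::k] for i in range(k)]


def fit_line(points: Sequence[Point]) -> Tuple[float, float]:
    # Ordinary least squares for label = w * feature + b.
    n = len(points)
    mx = sum(x for _, x in points) / n
    my = sum(y for y, _ in points) / n
    sxx = sum((x - mx) ** 2 for _, x in points)
    sxy = sum((x - mx) * (y - my) for y, x in points)
    w = sxy / sxx
    return w, my - w * mx


def cross_validated_mse(points: Sequence[Point], k: int) -> float:
    errors: List[float] = []
    for held_out in k_fold_indices(len(points), k):
        held = set(held_out)
        # In-sample: everything except the current fold.
        train = [p for i, p in enumerate(points) if i not in held]
        w, b = fit_line(train)
        # Out-of-sample: score only the held-out fold.
        errors.extend((y - (w * x + b)) ** 2 for i, (y, x) in enumerate(points) if i in held)
    return sum(errors) / len(errors)
```

On noise-free data such as `[(2.0 * x + 1.0, float(x)) for x in range(20)]`, the cross-validated error is essentially zero. On real data it is typically higher than the training error the example reports.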
Re: pySpark memory usage
I think the problem I ran into in 0.9 is covered in https://issues.apache.org/jira/browse/SPARK-1323 When I kill the python process, the stack trace I get indicates that this happens at initialization. It looks like the initial write to the Python process does not go through, and then the iterator hangs waiting for output. I haven't had any luck turning on debugging for the executor process; I'm still trying to learn which log4j properties need to be set. No luck yet on tracking down the memory leak.

14/03/30 05:15:04 ERROR executor.Executor: Exception in task ID 11
org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
    at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:168)
    at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:113)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:231)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:222)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
    at org.apache.spark.scheduler.Task.run(Task.scala:52)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:212)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:43)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:42)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:724)

On Sat, Mar 29, 2014 at 3:17 PM, Jim Blomo jim.bl...@gmail.com
wrote:
I've only tried 0.9, in which I ran into `stdin writer to Python finished early` so frequently that I wasn't able to load even a 1GB file. Let me know if I can provide any other info!

On Thu, Mar 27, 2014 at 8:48 PM, Matei Zaharia matei.zaha...@gmail.com wrote:
I see. Did this also fail with previous versions of Spark (0.9 or 0.8)? We'll try to look into these; it seems like a serious error.
Matei

On Mar 27, 2014, at 7:27 PM, Jim Blomo jim.bl...@gmail.com wrote:
Thanks, Matei. I am running Spark 1.0.0-SNAPSHOT built for Hadoop 1.0.4 from GitHub on 2014-03-18. I tried batchSizes of 512, 10, and 1; each got me further, but none succeeded. I can get this to work (with manual interventions) if I omit `parsed.persist(StorageLevel.MEMORY_AND_DISK)` and set batchSize=1. 5 of the 175 executors hung, and I had to kill the python process to get things going again. The only indication of this in the logs was `INFO python.PythonRDD: stdin writer to Python finished early`. With batchSize=1 and persist, a new memory error came up in several tasks before the app failed:

14/03/28 01:51:15 ERROR executor.Executor: Uncaught exception in thread Thread[stdin writer for python,5,main]
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOfRange(Arrays.java:2694)
    at java.lang.String.<init>(String.java:203)
    at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
    at java.nio.CharBuffer.toString(CharBuffer.java:1201)
    at org.apache.hadoop.io.Text.decode(Text.java:350)
    at org.apache.hadoop.io.Text.decode(Text.java:327)
    at org.apache.hadoop.io.Text.toString(Text.java:254)
    at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
    at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$12.next(Iterator.scala:357)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
    at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)

There are other exceptions, but I think they all stem from the above, e.g. org.apache.spark.SparkException: Error sending message to BlockManagerMaster
Let me know if there are other settings I should try, or if I should try a newer snapshot. Thanks again!

On Mon, Mar 24, 2014 at 9:35 AM, Matei Zaharia matei.zaha...@gmail.com wrote:
Hey Jim, In Spark 0.9 we added a batchSize parameter to PySpark that makes it group multiple objects together before passing them between