Re: Re: Re: RDD usage

2014-03-29 Thread Chieh-Yen
Got it.
Thanks for your help!!

Chieh-Yen


On Tue, Mar 25, 2014 at 6:51 PM, hequn cheng chenghe...@gmail.com wrote:

 Hi~ I wrote a program to test. A non-idempotent compute function in
 foreach does change the values of the RDD. It may look a little crazy to do
 so, since modifying the RDD makes it impossible to keep the RDD
 fault-tolerant in Spark :)



 2014-03-25 11:11 GMT+08:00 林武康 vboylin1...@gmail.com:

  Hi hequn, I dug into the source of Spark a bit deeper, and I got some
 ideas. Firstly, immutability is a feature of RDDs but not a solid rule; there
 are ways to change it, for example, an RDD with a non-idempotent compute
 function, though it is really a bad design to make that function
 non-idempotent because of the uncontrollable side effects. I agree with Mark
 that foreach can modify the elements of an RDD, but we should avoid this
 because it will affect all the RDDs generated from the changed RDD and make
 the whole process inconsistent and unstable.

 These are some rough opinions on the immutable feature of RDDs; a fuller
 discussion can make it clearer. Any ideas?
  --
 From: hequn cheng chenghe...@gmail.com
 Sent: 2014/3/25 10:40

 To: user@spark.apache.org
 Subject: Re: Re: RDD usage

  First question:
 If you save your modified RDD like this:
 points.map(p => { p.y = another_value; p }).collect() or
 points.map(p => { p.y = another_value; p }).saveAsTextFile(...)
 the modified RDD will be materialized, and this will not use any worker's
 memory.
 If you have more transformations after the map(), Spark will pipeline all
 the transformations and build a DAG. Very little memory will be used in
 this stage, and the memory will be freed soon.
 Only cache() will persist your RDD in memory for a long time.
 Second question:
 Once an RDD is created, it cannot be changed, due to its immutable
 nature. You can only create a new RDD from an existing RDD or from the file
 system.
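
 As a minimal sketch of that pattern (deriving a new RDD instead of mutating
 elements in place; AppDataPoint and another_value are placeholders taken from
 this thread):

 val updated = points.map(p => AppDataPoint(another_value, p.x))
 updated.saveAsTextFile("...")   // materialize the derived RDD
 // or: updated.cache()          // if the derived RDD will be reused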


 2014-03-25 9:45 GMT+08:00 林武康 vboylin1...@gmail.com:

  Hi hequn, a related question: does that mean the memory usage will be
 doubled? And furthermore, if the compute function in an RDD is not
 idempotent, the RDD will change while the job is running, is that right?
  --
 From: hequn cheng chenghe...@gmail.com
 Sent: 2014/3/25 9:35
 To: user@spark.apache.org
 Subject: Re: RDD usage

  points.foreach(p => p.y = another_value) will return a new modified RDD.


 2014-03-24 18:13 GMT+08:00 Chieh-Yen r01944...@csie.ntu.edu.tw:

  Dear all,

 I have a question about the usage of RDD.
 I implemented a class called AppDataPoint; it looks like this:

 case class AppDataPoint(input_y : Double, input_x : Array[Double])
 extends Serializable {
   var y : Double = input_y
   var x : Array[Double] = input_x
   ..
 }
 Furthermore, I created the RDD by the following function.

 def parsePoint(line: String): AppDataPoint = {
   /* Some related works for parsing */
   ..
 }

 Assume the resulting RDD is called points:

 val lines = sc.textFile(inputPath, numPartition)
 var points = lines.map(parsePoint _).cache()

 The question is this: I tried to modify the values of this RDD with the
 following operation:

 points.foreach(p => p.y = another_value)

 The operation works.
 The system shows no warning or error message, and the results are right.
 I wonder whether modifying an RDD like this is a correct and, in practice,
 workable design.
 The documentation says that RDDs are immutable; is there any suggestion?

 Thanks a lot.

 Chieh-Yen Lin







Working with MultiTableInputFormat

2014-03-29 Thread Livni, Dana
I'm trying to create an RDD from multiple scans.
I tried to set the configuration this way:

Configuration config = HBaseConfiguration.create();
config.setStrings(MultiTableInputFormat.SCANS, scanStrings);

And I created each scan string in the scanStrings array this way:

Scan scan = new Scan();
scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tableName));
scan.setFilter(filter);
ByteArrayOutputStream out = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(out);
scan.write(dos);
String singleScanString = Base64.encodeBytes(out.toByteArray());
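
The message does not show the call that actually builds the RDD from this
configuration; presumably it looks something like the sketch below (written in
Scala for consistency with the rest of this digest; sc is the SparkContext and
the class names are the standard HBase/Spark ones):

import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.MultiTableInputFormat

val rdd = sc.newAPIHadoopRDD(
  config,                              // the Configuration carrying MultiTableInputFormat.SCANS
  classOf[MultiTableInputFormat],
  classOf[ImmutableBytesWritable],
  classOf[Result])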
* When doing so, I got an exception of "No table was provided" from
the class TableInputFormatBase.
* This didn't make any sense to me, because I'm providing the input
table in the attribute SCAN_ATTRIBUTES_TABLE_NAME.
* I tried adding config.set(TableInputFormat.INPUT_TABLE, tableName);
to my configuration,
* but then my Spark mapper ran into some kind of infinite loop.
Am I missing anything?
Can spark work with MultiTableInputFormat or only with TableInputFormat?

Thanks Dana.



Zip or map elements to create new RDD

2014-03-29 Thread yh18190
Hi,
I have an RDD of elements and want to create a new RDD by zipping another RDD
with it in order.
result[RDD] has the sequence of elements 10, 20, 30, 40, 50, ...
I am facing problems because index is not an RDD, and it gives an error. Could
anyone help me zip or map it in order to obtain the following
result: (0,10), (1,20), (2,30), (3,40)?
I tried the following, but it doesn't work; even zipWithIndex doesn't work,
because it's a Scala collection method, not an RDD method:

 val index = List.range(0, result.count(), 1)
result.zip(index)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Zip-or-map-elements-to-create-new-RDD-tp3467.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Do all classes involving RDD operation need to be registered?

2014-03-29 Thread Sonal Goyal
From my limited knowledge, all classes involved in RDD operations
should extend Serializable if you want Java serialization (the default).

However, if you want Kryo serialization, you can
use conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").
If you also want to perform custom serialization, as in you want some
variables to be set differently/computed etc. during deserialization, you
would create a custom registrator, register your classes with it, and
call conf.set("spark.kryo.registrator", "mypkg.MyKryoRegistrator").

If I am missing something, please feel free to correct me.
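
A minimal sketch of that Kryo path (MyClass, mypkg, and MyKryoRegistrator are
hypothetical names; the two SparkConf keys are the standard ones):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

case class MyClass(id: Int, name: String)       // stand-in for a class used in RDD operations

class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[MyClass])             // register every class that flows through the RDDs
  }
}

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "mypkg.MyKryoRegistrator")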

Best Regards,
Sonal
Nube Technologies http://www.nubetech.co

http://in.linkedin.com/in/sonalgoyal




On Sat, Mar 29, 2014 at 1:40 AM, anny9699 anny9...@gmail.com wrote:

 Thanks a lot Ognen!

 It's not a fancy class that I wrote, and now I realize I neither extend
 Serializable nor register with Kryo, and that's why it is not working.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Do-all-classes-involving-RDD-operation-need-to-be-registered-tp3439p3446.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Zip or map elements to create new RDD

2014-03-29 Thread Sonal Goyal
zipWithIndex works on the git clone; I'm not sure if it's part of a released
version.

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala
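
For reference, a small usage sketch of that method once it is available; it
pairs each element with its index, so a swap gives the (index, value) shape
asked about in this thread:

val indexed = result.zipWithIndex().map { case (value, idx) => (idx, value) }
// e.g. 10, 20, 30, ... becomes (0,10), (1,20), (2,30), ...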



Best Regards,
Sonal
Nube Technologies http://www.nubetech.co

http://in.linkedin.com/in/sonalgoyal




On Sat, Mar 29, 2014 at 5:27 PM, yh18190 yh18...@gmail.com wrote:

 Hi,
 I have an RDD of elements and want to create a new RDD by zipping another RDD
 with it in order.
 result[RDD] has the sequence of elements 10, 20, 30, 40, 50, ...
 I am facing problems because index is not an RDD, and it gives an error. Could
 anyone help me zip or map it in order to obtain the following
 result: (0,10), (1,20), (2,30), (3,40)?
 I tried the following, but it doesn't work; even zipWithIndex doesn't work,
 because it's a Scala collection method, not an RDD method:

  val index = List.range(0, result.count(), 1)
 result.zip(index)



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Zip-or-map-elements-to-create-new-RDD-tp3467.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Zip or map elements to create new RDD

2014-03-29 Thread yh18190
Thanks Sonal. Is there any other way to map values with increasing
indexes, so that I can do map(t => (i, t)) where the value of 'i' increases
after each map operation on an element?

Please help me with this.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Zip-or-map-elements-to-create-new-RDD-tp3467p3470.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


How to index each map operation?

2014-03-29 Thread yh18190
Hi,

I want to perform a map operation on an RDD of elements such that the
resulting RDD is a key-value pair (counter, value).

For example, given var k: RDD[Int] = 10, 20, 30, 40, 40, 60, ...
k.map(t => (i, t)), where 'i' should act like a counter whose value
increments after each map operation.
Please help me.
I tried to write it like this, but it didn't work out:

var i = 0
k.map(t => {
  (i, t); i += 1
})

Please correct me.
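
The counter approach above cannot work as written: the closure passed to map is
serialized and run per partition on the executors, so each task sees its own
copy of i, and the block returns the result of i += 1, which is Unit. A hedged
sketch of the usual alternative, using the zipWithIndex method mentioned
elsewhere in this digest (available in newer Spark builds):

val k = sc.parallelize(Seq(10, 20, 30, 40, 40, 60))
val withCounter = k.zipWithIndex().map { case (value, idx) => (idx, value) }
// (0,10), (1,20), (2,30), (3,40), (4,40), (5,60)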



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-index-each-map-operation-tp3471.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Do all classes involving RDD operation need to be registered?

2014-03-29 Thread anny9699
Thanks so much Sonal! I am much clearer now.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Do-all-classes-involving-RDD-operation-need-to-be-registered-tp3439p3472.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Strange behavior of RDD.cartesian

2014-03-29 Thread Andrew Ash
Is this Spark 0.9.0? Try setting spark.shuffle.spill=false. There was a hash
collision bug, fixed in 0.9.1, that might cause you to have too few
results in that join.
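
A minimal sketch of applying that suggestion (the app name is a placeholder;
spark.shuffle.spill is the setting named above):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("cartesian-test")
  .set("spark.shuffle.spill", "false")   // workaround for the hash-collision bug fixed in 0.9.1
val sc = new SparkContext(conf)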

Sent from my mobile phone
On Mar 28, 2014 8:04 PM, Matei Zaharia matei.zaha...@gmail.com wrote:

 Weird, how exactly are you pulling out the sample? Do you have a small
 program that reproduces this?

 Matei

 On Mar 28, 2014, at 3:09 AM, Jaonary Rabarisoa jaon...@gmail.com wrote:

 I forgot to mention that I don't really use all of my data. Instead I use
 a sample extracted with randomSample.


 On Fri, Mar 28, 2014 at 10:58 AM, Jaonary Rabarisoa jaon...@gmail.com wrote:

 Hi all,

 I notice that RDD.cartesian has a strange behavior with cached and
 uncached data. More precisely, I have a set of data that I load with
 objectFile:

 *val data: RDD[(Int, String, Array[Double])] = sc.objectFile("data")*

 Then I split it into two sets depending on some criteria:


 *val part1 = data.filter(_._2 matches "view1")*
 *val part2 = data.filter(_._2 matches "view2")*


 Finally, I compute the cartesian product of part1 and part2

 *val pair = part1.cartesian(part2)*


 If everything goes well, I should have

 *pair.count == part1.count * part2.count*

 But this is not the case if I don't cache part1 and part2.

 What am I missing? Is caching data mandatory in Spark?

 Cheers,

 Jaonary








Re: Announcing Spark SQL

2014-03-29 Thread Michael Armbrust
On Fri, Mar 28, 2014 at 9:53 PM, Rohit Rai ro...@tuplejump.com wrote:

 Upon discussion with a couple of our clients, it seems the reason they would
 prefer using Hive is that they have already invested a lot in it, mostly in
 UDFs and HiveQL.
 1. Are there any plans to develop the SQL parser to handle more complex
 queries like HiveQL? Can we just plug in a custom parser instead of bringing
 in the whole Hive dependencies?


We definitely want to have a more complete SQL parser without having to
pull in all of Hive.  I think there are a couple of ways to do this.

1. Using a SQL-92 parser from something like Optiq, or writing our own.
2. I haven't fully investigated the Hive published artifacts, but if there
is some way to depend on only the parser, that would be great.  If someone
has the resources to investigate using the Hive parser without needing to
depend on all of Hive, this is a place where we would certainly welcome
contributions.  We could then consider making HiveQL an option in a
standard SQLContext.


 2. Is there any way we can support UDFs in Catalyst without using Hive? It
 will be fine if we don't support Hive UDFs as-is and they need a minor
 porting effort.


All of the execution support for native Scala UDFs is already there, and in
fact when you use the DSL where clause
(http://people.apache.org/~pwendell/catalyst-docs/api/sql/core/index.html#org.apache.spark.sql.SchemaRDD)
you are using this machinery.  For Spark 1.1 we will find a more general
way to expose this to users.


Re: KafkaInputDStream mapping of partitions to tasks

2014-03-29 Thread Nicolas Bär
Hi

Is there any workaround to this problem?

I'm trying to implement a KafkaReceiver using the SimpleConsumer API [1] of
Kafka and handle the partition assignment manually. The easiest setup in
this case would be to bind the number of parallel jobs to the number of
partitions in Kafka. This is basically what Samza [2] does. I have a few
questions regarding this implementation:

The getReceiver method looks like a good starting point to implement the
manual partition assignment. Unfortunately it lacks documentation. As far
as I understood from the API, the getReceiver method is called once and
passes the received object (implementing the NetworkReceiver contract) to
the worker nodes. Therefore the logic to assign partitions to each receiver
has to be implemented within the receiver itself. I'm planning to implement
the following and have some questions in this regard:
1. Within getReceiver: set up a ZooKeeper queue with the assignment of
partitions to parallel jobs, and store the number of consumers needed.
- Is the number of parallel jobs accessible through ssc?
2. Within onStart: poll the ZooKeeper queue and receive the partition
number(s) to receive data from.
3. Within onStart: start a new thread for keepalive messages to ZooKeeper.
In case a node goes down and a new receiver is started up again, the new
receiver can look up ZooKeeper to find the consumer without recent keepalives
and take its place. Due to SPARK-1340 this might not even be possible yet.
Is there any insight into why the receiver is not started again?

Do you think the use of zookeeper in this scenario is a good approach or is
there an easier way using an integrated spark functionality? Also, by using
the SimpleConsumer API one has to manage the offset per consumer. The high
level consumer API solves this problem with zookeeper. I'd take the same
approach, if there's no easier way to handle this in spark.


Best
Nicolas

[1]
https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
[2]
http://samza.incubator.apache.org/learn/documentation/0.7.0/container/task-runner.html


On Fri, Mar 28, 2014 at 2:36 PM, Evgeniy Shishkin itparan...@gmail.com wrote:

 One more question,

 we are using MEMORY_AND_DISK_SER_2,
 and I am worried about when those RDDs on disk will be removed:
 http://i.imgur.com/dbq5T6i.png

 unpersist is set to true, and RDDs get purged from memory, but disk space
 just keeps growing.

 On 28 Mar 2014, at 01:32, Tathagata Das tathagata.das1...@gmail.com
 wrote:

  Yes, no one has reported this issue before. I just opened a JIRA on what
 I think is the main problem here:
  https://spark-project.atlassian.net/browse/SPARK-1340
  Some of the receivers don't get restarted.
  I have a bunch of refactoring in the NetworkReceiver ready to be posted as
 a PR that should fix this.
 
  Regarding the second problem, I have been thinking of adding flow
 control (i.e. limiting the rate of receiving) for a while, just haven't
 gotten around to it.
  I added another JIRA for tracking this issue:
  https://spark-project.atlassian.net/browse/SPARK-1341
 
 
  TD
 
 
  On Thu, Mar 27, 2014 at 3:23 PM, Evgeny Shishkin itparan...@gmail.com
 wrote:
 
  On 28 Mar 2014, at 01:11, Scott Clasen scott.cla...@gmail.com wrote:
 
   Evgeniy Shishkin wrote
   So, at the bottom -- kafka input stream just does not work.
  
  
   That was the conclusion I was coming to as well.  Are there open
 tickets
   around fixing this up?
  
 
  I am not aware of such. Actually nobody complained about Spark + Kafka
 before.
  So I thought it just worked, and then we tried to build something on it
 and almost failed.
 
  I think that it is possible to steal/replicate how Twitter Storm works
 with Kafka.
  They do manual partition assignment; at least this would help to balance
 the load.
 
  There is another issue.
  The ssc batch creates new RDDs every batch duration, always, even if the
 previous computation did not finish.
 
  But with Kafka, we can consume more RDDs later, after we finish the
 previous RDDs.
  That way it would be much, much simpler to not get OOM'ed when starting
 from the beginning,
  because we can consume a lot of data from Kafka during the batch duration
 and then get an OOM.
 
  But we just cannot start slowly, and cannot limit how much to consume
 during a batch.
 
 
  
   --
   View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/KafkaInputDStream-mapping-of-partitions-to-tasks-tp3360p3379.html
   Sent from the Apache Spark User List mailing list archive at
 Nabble.com.
 
 




Re: pySpark memory usage

2014-03-29 Thread Jim Blomo
I've only tried 0.9, in which I ran into the `stdin writer to Python
finished early` so frequently I wasn't able to load even a 1GB file.
Let me know if I can provide any other info!

On Thu, Mar 27, 2014 at 8:48 PM, Matei Zaharia matei.zaha...@gmail.com wrote:
 I see, did this also fail with previous versions of Spark (0.9 or 0.8)? We'll 
 try to look into these, seems like a serious error.

 Matei

 On Mar 27, 2014, at 7:27 PM, Jim Blomo jim.bl...@gmail.com wrote:

 Thanks, Matei.  I am running Spark 1.0.0-SNAPSHOT built for Hadoop
 1.0.4 from GitHub on 2014-03-18.

 I tried batchSizes of 512, 10, and 1 and each got me further but none
 have succeeded.

 I can get this to work -- with manual interventions -- if I omit
 `parsed.persist(StorageLevel.MEMORY_AND_DISK)` and set batchSize=1.  5
 of the 175 executors hung, and I had to kill the python process to get
 things going again.  The only indication of this in the logs was `INFO
 python.PythonRDD: stdin writer to Python finished early`.

 With batchSize=1 and persist, a new memory error came up in several
 tasks, before the app was failed:

 14/03/28 01:51:15 ERROR executor.Executor: Uncaught exception in
 thread Thread[stdin writer for python,5,main]
 java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOfRange(Arrays.java:2694)
at java.lang.String.init(String.java:203)
at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
at java.nio.CharBuffer.toString(CharBuffer.java:1201)
at org.apache.hadoop.io.Text.decode(Text.java:350)
at org.apache.hadoop.io.Text.decode(Text.java:327)
at org.apache.hadoop.io.Text.toString(Text.java:254)
at 
 org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
at 
 org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$12.next(Iterator.scala:357)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
 org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
at 
 org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)

 There are other exceptions, but I think they all stem from the above,
 eg. org.apache.spark.SparkException: Error sending message to
 BlockManagerMaster

 Let me know if there are other settings I should try, or if I should
 try a newer snapshot.

 Thanks again!


 On Mon, Mar 24, 2014 at 9:35 AM, Matei Zaharia matei.zaha...@gmail.com 
 wrote:
 Hey Jim,

 In Spark 0.9 we added a batchSize parameter to PySpark that makes it 
 group multiple objects together before passing them between Java and 
 Python, but this may be too high by default. Try passing batchSize=10 to 
 your SparkContext constructor to lower it (the default is 1024). Or even 
 batchSize=1 to match earlier versions.

 Matei

 On Mar 21, 2014, at 6:18 PM, Jim Blomo jim.bl...@gmail.com wrote:

 Hi all, I'm wondering if there are any settings I can use to reduce the
 memory needed by the PythonRDD when computing simple stats.  I am
 getting OutOfMemoryError exceptions while calculating count() on big,
 but not absurd, records.  It seems like PythonRDD is trying to keep
 too many of these records in memory, when all that is needed is to
 stream through them and count.  Any tips for getting through this
 workload?


 Code:
 session = sc.textFile('s3://...json.gz') # ~54GB of compressed data

 # the biggest individual text line is ~3MB
 parsed = session.map(lambda l: l.split("\t", 1)).map(lambda (y, s):
  (loads(y), loads(s)))
 parsed.persist(StorageLevel.MEMORY_AND_DISK)

 parsed.count()
 # will never finish: executor.Executor: Uncaught exception will FAIL
 all executors

 Incidentally the whole app appears to be killed, but this error is not
 propagated to the shell.

 Cluster:
 15 m2.xlarges (17GB memory, 17GB swap, spark.executor.memory=10GB)

 Exception:
 java.lang.OutOfMemoryError: Java heap space
   at 
 org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:132)
   at 
 org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:120)
   at 
 org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:113)
   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
   at 
 org.apache.spark.api.python.PythonRDD$$anon$1.foreach(PythonRDD.scala:113)
   at 
 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
   at 
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
   at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:94)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:220)
   at 
 org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)




Limiting number of reducers performance implications

2014-03-29 Thread Matthew Cheah
Hi everyone,

I'm using Spark on machines where I can't change the maximum number of open
files. As a result, I'm limiting the number of reducers to 500. I'm also
only using a single machine that has 32 cores and emulating a cluster by
running 4 worker daemons with 8 cores (maximum) each.

What I'm noticing is that as I allocate more cores to my Spark job, the job
is not completing much more quickly, and the CPUs aren't being
heavily utilized. When I attempt to use all 32 cores, monitoring via mpstat
shows a decent chunk of CPU idleness (70%-80% idle).

My job consists primarily of reduceByKey and groupByKey operations, with
flatMaps, maps, and filters in between.

I was wondering: *what exactly does the number of reducers do?* Clearly, if
there are 32 cores, setting the number of reducers to 500 should be a
workload that keeps all 32 cores busy. Or am I misunderstanding the concept
of what a reducer is?
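
For context on where that 500 comes in, the reducer count in these operations
is the numPartitions argument (a sketch; pairs is a stand-in pair RDD). Each
resulting shuffle partition becomes one reduce task, and tasks run concurrently
only up to the number of available cores:

import org.apache.spark.SparkContext._   // pair-RDD implicits on older Spark versions

val pairs   = sc.parallelize(1 to 100000).map(i => (i % 1000, i))
val counts  = pairs.reduceByKey(_ + _, 500)   // 500 reduce tasks
val grouped = pairs.groupByKey(500)           // likewise 500 reduce tasks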

Thanks,

-Matt Cheah


Cross validation is missing in machine learning examples

2014-03-29 Thread Aureliano Buendia
Hi,

I noticed that the Spark machine learning examples use training data to
validate regression models. For instance, in the linear regression example
(http://spark.apache.org/docs/0.9.0/mllib-guide.html):

// Evaluate model on training examples and compute training error
val valuesAndPreds = parsedData.map { point =>
  val prediction = model.predict(point.features)
  (point.label, prediction)
}
...


 Here the training data was used to validate a model that was created from
the very same training data. This gives a biased estimate, and cross
validation (http://en.wikipedia.org/wiki/Cross-validation_%28statistics%29)
is missing here. In order to cross-validate, we need to partition the data
into an in-sample set for training and an out-of-sample set for validation.

Please correct me if this does not apply to the ML algorithms implemented in
Spark.
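
A hedged sketch of the held-out evaluation being asked for, using the MLlib
linear regression API from the 0.9 guide. RDD.randomSplit is assumed here
(it is available in newer Spark releases; on builds without it, a
sample()-based split would be needed), and the iteration count is arbitrary:

import org.apache.spark.SparkContext._
import org.apache.spark.mllib.regression.LinearRegressionWithSGD

// Hold out 30% of the data instead of scoring on the training set.
val Array(training, test) = parsedData.randomSplit(Array(0.7, 0.3), seed = 42L)

val model = LinearRegressionWithSGD.train(training, 100)

// Out-of-sample error, mirroring the in-sample computation in the guide.
val testMSE = test.map { point =>
  val err = point.label - model.predict(point.features)
  err * err
}.mean()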


Re: pySpark memory usage

2014-03-29 Thread Jim Blomo
I think the problem I ran into in 0.9 is covered in
https://issues.apache.org/jira/browse/SPARK-1323

When I kill the python process, the stack trace I get indicates that
this happens at initialization.  It looks like the initial write to
the Python process does not go through, and then the iterator hangs
waiting for output.  I haven't had luck turning on debugging for the
executor process.  I'm still trying to learn the log4j properties that
need to be set.

No luck yet on tracking down the memory leak.

14/03/30 05:15:04 ERROR executor.Executor: Exception in task ID 11
org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
at 
org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:168)
at 
org.apache.spark.api.python.PythonRDD$$anon$1.init(PythonRDD.scala:174)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:113)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:231)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:222)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
at org.apache.spark.scheduler.Task.run(Task.scala:52)
at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:212)
at 
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:43)
at 
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
at 
org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:42)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
   at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)


On Sat, Mar 29, 2014 at 3:17 PM, Jim Blomo jim.bl...@gmail.com wrote:
 I've only tried 0.9, in which I ran into the `stdin writer to Python
 finished early` so frequently I wasn't able to load even a 1GB file.
 Let me know if I can provide any other info!

 On Thu, Mar 27, 2014 at 8:48 PM, Matei Zaharia matei.zaha...@gmail.com 
 wrote:
 I see, did this also fail with previous versions of Spark (0.9 or 0.8)? 
 We'll try to look into these, seems like a serious error.

 Matei

 On Mar 27, 2014, at 7:27 PM, Jim Blomo jim.bl...@gmail.com wrote:

 Thanks, Matei.  I am running Spark 1.0.0-SNAPSHOT built for Hadoop
 1.0.4 from GitHub on 2014-03-18.

 I tried batchSizes of 512, 10, and 1 and each got me further but none
 have succeeded.

 I can get this to work -- with manual interventions -- if I omit
 `parsed.persist(StorageLevel.MEMORY_AND_DISK)` and set batchSize=1.  5
 of the 175 executors hung, and I had to kill the python process to get
 things going again.  The only indication of this in the logs was `INFO
 python.PythonRDD: stdin writer to Python finished early`.

 With batchSize=1 and persist, a new memory error came up in several
 tasks, before the app was failed:

 14/03/28 01:51:15 ERROR executor.Executor: Uncaught exception in
 thread Thread[stdin writer for python,5,main]
 java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOfRange(Arrays.java:2694)
at java.lang.String.init(String.java:203)
at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
at java.nio.CharBuffer.toString(CharBuffer.java:1201)
at org.apache.hadoop.io.Text.decode(Text.java:350)
at org.apache.hadoop.io.Text.decode(Text.java:327)
at org.apache.hadoop.io.Text.toString(Text.java:254)
at 
 org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
at 
 org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$12.next(Iterator.scala:357)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
 org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
at 
 org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)

 There are other exceptions, but I think they all stem from the above,
 eg. org.apache.spark.SparkException: Error sending message to
 BlockManagerMaster

 Let me know if there are other settings I should try, or if I should
 try a newer snapshot.

 Thanks again!


 On Mon, Mar 24, 2014 at 9:35 AM, Matei Zaharia matei.zaha...@gmail.com 
 wrote:
 Hey Jim,

 In Spark 0.9 we added a batchSize parameter to PySpark that makes it 
 group multiple objects together before passing them between