Re: Size of RDD larger than Size of data on disk

2014-02-25 Thread Matei Zaharia
The problem is that Java objects can take more space than the underlying data, 
but there are options in Spark to store data in serialized form to get around 
this. Take a look at https://spark.incubator.apache.org/docs/latest/tuning.html.
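
As a rough illustration of what the tuning guide covers (the input path is a placeholder, and this assumes a Scala shell or program where sc already exists):

import org.apache.spark.storage.StorageLevel

// Cache the data as serialized byte buffers rather than deserialized Java objects.
// MEMORY_ONLY_SER is usually several times more compact, at the cost of some extra CPU.
val lines = sc.textFile("hdfs:///data/input")
lines.persist(StorageLevel.MEMORY_ONLY_SER)
lines.count()   // first action materializes the cache

Switching spark.serializer to Kryo, as described on that page, shrinks the serialized form further.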

Matei

On Feb 25, 2014, at 12:01 PM, Suraj Satishkumar Sheth suraj...@adobe.com 
wrote:

 Hi Mayur,
 Thanks for replying. Is it usually double the size of the data on disk?
 I have observed this many times. The Storage section of the Spark UI is telling me 
 that 100% of the RDD is cached, using 97 GB of RAM, while the data in HDFS is only 47 GB.
  
 Thanks and Regards,
 Suraj Sheth
  
 From: Mayur Rustagi [mailto:mayur.rust...@gmail.com] 
 Sent: Tuesday, February 25, 2014 11:19 PM
 To: user@spark.apache.org
 Cc: u...@spark.incubator.apache.org
 Subject: Re: Size of RDD larger than Size of data on disk
  
 Spark may take more RAM than required by the RDD. Can you look at the Storage section 
 of the Spark UI and see how much space the RDD is taking in memory? It may still take 
 more storage than on disk, as Java objects have some overhead. 
 Consider enabling compression for the RDD. 
 
 Mayur Rustagi
 Ph: +919632149971
 http://www.sigmoidanalytics.com
 https://twitter.com/mayur_rustagi
  
  
 
 On Tue, Feb 25, 2014 at 6:47 AM, Suraj Satishkumar Sheth suraj...@adobe.com 
 wrote:
 Hi All,
 I have a folder in HDFS whose files total 47 GB in size. I am loading this 
 in Spark as an RDD[String] and caching it. The total amount of RAM that Spark 
 uses to cache it is around 97 GB. I want to know why Spark is taking up so 
 much space for the RDD. Can we reduce the RDD size in Spark and make it 
 similar to its size on disk?
  
 Thanks and Regards,
 Suraj Sheth



Re: Implementing a custom Spark shell

2014-02-26 Thread Matei Zaharia
In Spark 0.9 and master, you can pass the -i argument to spark-shell to load a 
script containing commands before opening the prompt. This is also a feature of 
the Scala shell as a whole (try scala -help for details).

Also, once you’re in the shell, you can use :load file.scala to execute the 
content of file.scala as if you’d typed it into the shell.
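
For instance, a hypothetical init script (file name and contents are illustrative, and this assumes sc has already been created by the shell by the time the script runs) could define your extra variables once and be loaded either way:

// init.scala -- preload application-specific RDDs for the shell session
val events = sc.textFile("hdfs:///logs/events").cache()
val users = sc.textFile("hdfs:///data/users").cache()
println("Defined RDDs: events, users")

Then run bin/spark-shell -i init.scala, or type :load init.scala at the prompt.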

Matei

On Feb 25, 2014, at 11:44 PM, Sampo Niskanen sampo.niska...@wellmo.com wrote:

 Hi,
 
 I'd like to create a custom version of the Spark shell, which has 
 automatically defined some other variables / RDDs (in addition to 'sc') 
 specific to our application.  Is this possible?
 
 I took a look at the code that the spark-shell invokes, and it seems quite 
 complex.  Can this be reused from my code?
 
 
 I'm implementing a standalone application that uses the Spark libraries 
 (managed by SBT).  Ideally, I'd like to be able to launch the shell from that 
 application, instead of using the default Spark distribution.  Alternatively, 
 can some utility code be injected within the standard spark-shell?
 
 
 Thanks.
 
 Sampo Niskanen
 Lead developer / Wellmo
 
 



Re: Building spark with native library support

2014-03-06 Thread Matei Zaharia
Is it an error, or just a warning? In any case, you need to get those libraries 
from a build of Hadoop for your platform. Then add them to the 
SPARK_LIBRARY_PATH environment variable in conf/spark-env.sh, or to your 
-Djava.library.path if launching an application separately.
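
For example (the path is illustrative and depends on where your Hadoop build puts its native libraries), conf/spark-env.sh could contain a line like:

export SPARK_LIBRARY_PATH=/opt/hadoop/lib/native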

These libraries just speed up some compression codecs BTW, so it should be fine 
to run without them too.

Matei

On Mar 6, 2014, at 9:04 AM, Alan Burlison alan.burli...@oracle.com wrote:

 Hi,
 
 I've successfully built 0.9.0-incubating on Solaris using sbt, following the 
 instructions at http://spark.incubator.apache.org/docs/latest/ and it seems 
 to work OK. However, when I start it up I get an error about missing Hadoop 
 native libraries. I can't find any mention of how to build the native 
 components in the instructions; how is that done?
 
 Thanks,
 
 -- 
 Alan Burlison
 --



Re: major Spark performance problem

2014-03-09 Thread Matei Zaharia
Hi Dana,

It’s hard to tell exactly what is consuming time, but I’d suggest starting by 
profiling the single application first. Three things to look at there:

1) How many stages and how many tasks per stage is Spark launching (in the 
application web UI at http://driver:4040)? If you have hundreds of tasks for 
this small a file, just the task launching time might be a problem. You can use 
RDD.coalesce() to have fewer data partitions.

2) If you run a Java profiler (e.g. YourKit or hprof) on the workers while the 
application is executing, where is time being spent? Maybe some of your code is 
more expensive than it seems. One other thing you might find is that some code 
you use requires synchronization and is therefore not scaling properly to 
multiple cores (e.g. Java’s Math.random() actually does that).

3) Are there any RDDs that are used over and over but not cached? In that case 
they’ll be recomputed on each use.

Once you look into these it might be easier to improve the multiple-job case. 
In that case as others have pointed out, running the jobs in the same 
SparkContext and using the fair scheduler 
(http://spark.apache.org/docs/latest/job-scheduling.html) should work.
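
A rough sketch of points (1) and (3), with placeholder paths and an existing SparkContext sc assumed:

// (1) Collapse a small input into fewer partitions so task-launch overhead stays low
val records = sc.textFile("hdfs:///data/small-input").coalesce(8)

// (3) Cache anything reused across several actions so it is not recomputed each time
records.cache()
val total = records.count()
val sample = records.take(10)   // second action reuses the cached partitions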

Matei

On Mar 9, 2014, at 5:56 AM, Livni, Dana dana.li...@intel.com wrote:

 YARN also has this scheduling option.
 The problem is that all of our applications have the same flow, where the first 
 stage is the heaviest and the rest are very small.
 When several requests (applications) start to run at the same time, 
 the first stage of each is scheduled in parallel, and for some reason they 
 delay each other. A stage that alone takes around 13s can take up to 2m when 
 running in parallel with other identical stages (around 15 of them).
 
 
 
 -Original Message-
 From: elyast [mailto:lukasz.jastrzeb...@gmail.com] 
 Sent: Friday, March 07, 2014 20:01
 To: u...@spark.incubator.apache.org
 Subject: Re: major Spark performance problem
 
 Hi,
 
 There is also an option to run Spark applications on top of Mesos in fine-grained 
 mode. Then fair scheduling is possible (applications run in parallel and Mesos is 
 responsible for scheduling all tasks), so in a sense all applications progress in 
 parallel. In total it may not be faster, but the benefit is the fair scheduling 
 (small jobs will not be stuck behind the big ones).
 
 Best regards
 Lukasz Jastrzebski
 
 
 
 -
 Intel Electronics Ltd.
 
 This e-mail and any attachments may contain confidential material for
 the sole use of the intended recipient(s). Any review or distribution
 by others is strictly prohibited. If you are not the intended
 recipient, please contact the sender and delete all copies.
 



Re: NO SUCH METHOD EXCEPTION

2014-03-11 Thread Matei Zaharia
Since it’s from Scala, it might mean you’re running with a different version of 
Scala than you compiled Spark with. Spark 0.8 and earlier use Scala 2.9, while 
Spark 0.9 uses Scala 2.10.
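
A quick way to keep this consistent in an sbt build is to pin the Scala version to match the Spark artifact you depend on; a hypothetical build.sbt fragment (version numbers illustrative):

// Spark 0.9.x is built against Scala 2.10; Spark 0.8.x and earlier against Scala 2.9
scalaVersion := "2.10.3"
libraryDependencies += "org.apache.spark" %% "spark-core" % "0.9.0-incubating"

Mixing, say, a 2.9 project with a Spark build for 2.10 produces exactly this kind of NoSuchMethodError at runtime.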

Matei

On Mar 11, 2014, at 8:19 AM, Jeyaraj, Arockia R (Arockia) 
arockia.r.jeya...@verizon.com wrote:

 Hi,
  
 Can anyone help me to resolve this issue? Why am I getting NoSuchMethod 
 exception?
  
 14/03/11 09:56:11 ERROR executor.Executor: Exception in task ID 0
 java.lang.NoSuchMethodError: scala.Predef$.augmentString(Ljava/lang/String;)Lscala/collection/immutable/StringOps;
        at kafka.utils.VerifiableProperties.getIntInRange(VerifiableProperties.scala:75)
        at kafka.utils.VerifiableProperties.getInt(VerifiableProperties.scala:58)
        at kafka.utils.ZKConfig.<init>(ZkUtils.scala:837)
        at kafka.consumer.ConsumerConfig.<init>(ConsumerConfig.scala:73)
        at kafka.consumer.ConsumerConfig.<init>(ConsumerConfig.scala:77)
        at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:98)
        at org.apache.spark.streaming.dstream.NetworkReceiver.start(NetworkInputDStream.scala:126)
        at org.apache.spark.streaming.scheduler.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:173)
        at org.apache.spark.streaming.scheduler.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:169)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
        at org.apache.spark.scheduler.Task.run(Task.scala:53)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
        at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)

        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:619)
 14/03/11 09:56:11 WARN scheduler.TaskSetManager: Lost TID 0 (task 0.0:0)
 14/03/11 09:56:11 WARN scheduler.TaskSetManager: Loss was due to java.lang.NoSuchMethodError
 java.lang.NoSuchMethodError: scala.Predef$.augmentString(Ljava/lang/String;)Lscala/collection/immutable/StringOps;
        at kafka.utils.VerifiableProperties.getIntInRange(VerifiableProperties.scala:75)
        at kafka.utils.VerifiableProperties.getInt(VerifiableProperties.scala:58)
        at kafka.utils.ZKConfig.<init>(ZkUtils.scala:837)
        at kafka.consumer.ConsumerConfig.<init>(ConsumerConfig.scala:73)
        at kafka.consumer.ConsumerConfig.<init>(ConsumerConfig.scala:77)
        at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:98)
        at org.apache.spark.streaming.dstream.NetworkReceiver.start(NetworkInputDStream.scala:126)
        at org.apache.spark.streaming.scheduler.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:173)
        at org.apache.spark.streaming.scheduler.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:169)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
        at org.apache.spark.scheduler.Task.run(Task.scala:53)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
        at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)

        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:619)
 14/03/11 09:56:11 ERROR scheduler.TaskSetManager: Task 0.0:0 failed 1 times; aborting job
 14/03/11 09:56:11 INFO scheduler.TaskSchedulerImpl: Remove TaskSet 0.0 from pool
 14/03/11 09:56:11 INFO scheduler.DAGScheduler: Failed to run runJob at NetworkInputTracker.scala:182
 [error] (Thread-34) org.apache.spark.SparkException: Job aborted: Task 0.0:0 fai
 
  
  
 Thanks
 Arockia Raja



Re: Powered By Spark Page -- Companies Organizations

2014-03-11 Thread Matei Zaharia
Thanks, added you.

On Mar 11, 2014, at 2:47 AM, Christoph Böhm listenbru...@gmx.net wrote:

 Dear Spark team,
 
 thanks for the great work and congrats on becoming an Apache top-level 
 project!
 
 You could add us to your Powered-by-page, because we are using Spark (and 
 Shark) to perform interactive exploration of large datasets.
 Find us on: www.bakdata.com
 
 Best,
 Christoph
 
 -
 Christoph Böhm
 
 bakdata | bespoke data engineering 
 www.bakdata.com



Re: RDD.saveAs...

2014-03-11 Thread Matei Zaharia
I agree that we can’t keep adding these to the core API, partly because it will 
get unwieldy to maintain and partly just because each storage system will bring 
in lots of dependencies. We can simply have helper classes in different modules 
for each storage system. There’s some discussion on this at 
https://spark-project.atlassian.net/browse/SPARK-1127.
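
As a rough sketch of the "helper class per storage system" idea (this is not an existing Spark API, just an illustration of how such a helper could live outside core):

import org.apache.spark.rdd.RDD

// Hypothetical helper object for one output format, kept in its own build module
object CsvSaver {
  def save[T <: Product](rdd: RDD[T], path: String): Unit =
    rdd.map(_.productIterator.mkString(",")).saveAsTextFile(path)
}

Core only needs the generic RDD operations; the format- or system-specific dependencies stay in the helper's module.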

Matei

On Mar 11, 2014, at 9:06 AM, Koert Kuipers ko...@tresata.com wrote:

 I find the current design to write RDDs to disk (or a database, etc) kind of 
 ugly. It will lead to a proliferation of saveAs methods. A better abstraction 
 would be nice (perhaps a Sink trait to write to)
 



Re: possible bug in Spark's ALS implementation...

2014-03-16 Thread Matei Zaharia
On Mar 14, 2014, at 5:52 PM, Michael Allman m...@allman.ms wrote:

 I also found that the product and user RDDs were being rebuilt many times
 over in my tests, even for tiny data sets. By persisting the RDD returned
 from updateFeatures() I was able to avoid a raft of duplicate computations.
 Is there a reason not to do this?

This sounds like a good thing to add, though I’d like to understand why these 
are being recomputed (it seemed that the code would only use each one once). Do 
you have any sense why that is?
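
In general the pattern being suggested looks something like this (updateFeatures and the loop are placeholders standing in for the ALS internals):

// Persist each iteration's result so the next iteration does not re-run the whole lineage
var features = initialFeatures
for (i <- 1 to numIterations) {
  features = updateFeatures(features).persist()
}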

Matei

Re: How to kill a spark app ?

2014-03-16 Thread Matei Zaharia
If it’s a driver on the cluster, please open a JIRA issue about this — this 
kill command is indeed intended to work.

Matei

On Mar 16, 2014, at 2:35 PM, Mayur Rustagi mayur.rust...@gmail.com wrote:

 Are you embedding your driver inside the cluster?
 If not, then that command will not kill the driver. You can simply kill the 
 application by killing the Scala application. 
 So if it's the Spark shell, simply killing the shell will disconnect the 
 application from the cluster. 
 
 If the driver is embedded in the cluster then the above command will be 
 required. 
 
 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi
 
 
 
 On Sun, Mar 16, 2014 at 5:06 PM, Debasish Das debasish.da...@gmail.com 
 wrote:
 Thanks Mayur...
 
 I need both...but to start with even application killer will help a lot...
 
 Somehow that command did not work for meI will try it again from the 
 spark main folder..
 
 
 On Sun, Mar 16, 2014 at 1:43 PM, Mayur Rustagi mayur.rust...@gmail.com 
 wrote:
 This is meant to kill the whole driver hosted inside the Master (a new feature 
 as of 0.9.0). 
 I assume you are trying to kill a job/task/stage inside Spark rather than 
 the whole application. 
 Regards
 Mayur
 
 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi
 
 
 
 On Sun, Mar 16, 2014 at 4:36 PM, Debasish Das debasish.da...@gmail.com 
 wrote:
 From 
 http://spark.incubator.apache.org/docs/latest/spark-standalone.html#launching-applications-inside-the-cluster
 
 
 
 ./bin/spark-class org.apache.spark.deploy.Client kill <driverId>
 
 does not work / has bugs ?
 
 
 On Sun, Mar 16, 2014 at 1:17 PM, Mayur Rustagi mayur.rust...@gmail.com 
 wrote:
 There is no good way to kill jobs in Spark yet. The closest is cancelAllJobs 
 and cancelJobGroup in SparkContext. I have had bugs using both. I am trying to 
 test them out; typically you would start a different thread and call these 
 functions on it when you wish to cancel a job.
 Regards
 Mayur
 
 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi
 
 
 
 On Sun, Mar 16, 2014 at 2:59 PM, Debasish Das debasish.da...@gmail.com 
 wrote:
 Are these the right options:
 
 1. If there is a Spark script, just do a Ctrl-C from spark-shell and the job 
 will be killed properly.
 
 2. For a Spark application, Ctrl-C will also kill the job properly on the 
 cluster:
 
 Somehow the Ctrl-C option did not work for us...
 
 A similar option works fine for Scalding, for example, but we see a lot of dead 
 nodes if too many jobs are killed abruptly.
 
 3. Use the Client script...
 
 /bin/spark-class org.apache.spark.deploy.Client kill spark://myspark.com:7077 
 app-20140316142129-
 Runner java
 Classpath 
 :/home/debasish/sag_spark/conf:/home/debasish/sag_spark/assembly/target/scala-2.10/spark-assembly-1.0.0-incubating-SNAPSHOT-hadoop2.0.0-mr1-cdh4.5.0.jar
 Java opts  -Djava.library.path= -Xms512m -Xmx512m
 Options -Dspark.cores.max=16
 Sending kill command to spark://myspark.com:7077
 Driver app-20140316142129- has already finished or does not exist
 
 This option also did not kill the job. I can still see the job running on 
 spark webui...
 
 Thanks.
 Deb
 
 
 
 
 



Re: [Powered by] Yandex Islands powered by Spark

2014-03-16 Thread Matei Zaharia
Thanks, I’ve added you: 
https://cwiki.apache.org/confluence/display/SPARK/Powered+By+Spark. Let me know 
if you want to change any wording.

Matei

On Mar 16, 2014, at 6:48 AM, Egor Pahomov pahomov.e...@gmail.com wrote:

 Hi, the page https://cwiki.apache.org/confluence/display/SPARK/Powered+By+Spark 
 says I need to write here if I want my project to be added there.
 
 At Yandex (www.yandex.com) we are now using Spark for the Yandex Islands project 
 (http://www.searchenginejournal.com/yandex-islands-markup-issues-implementation/71891/).
  We process the islands that come from the search robot with Spark.
 
 -- 
 Sincerely yours
 Egor Pakhomov
 Scala Developer, Yandex



Re: example of non-line oriented input data?

2014-03-17 Thread Matei Zaharia
Hi Diana,

Non-text input formats are only supported in Java and Scala right now, where 
you can use sparkContext.hadoopFile or .hadoopDataset to load data with any 
InputFormat that Hadoop MapReduce supports. In Python, you unfortunately only 
have textFile, which gives you one record per line. For JSON, you’d have to fit 
the whole JSON object on one line as you said. Hopefully we’ll also have some 
other forms of input soon.

If your input is a collection of separate files (say many .xml files), you can 
also use mapPartitions on it to group together the lines because each input 
file will end up being a single dataset partition (or map task). This will let 
you concatenate the lines in each file and parse them as one XML object.
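
In Scala, the mapPartitions approach looks roughly like this (the directory path is a placeholder, and it relies on each small file mapping to its own partition; parseXml stands in for whatever XML parser you plug in):

val lines = sc.textFile("hdfs:///data/xml-dir")
// One output string per partition, i.e. per small input file
val docs = lines.mapPartitions(iter => Iterator(iter.mkString("\n")))
val parsed = docs.map(xml => parseXml(xml))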

Matei

On Mar 17, 2014, at 9:52 AM, Diana Carroll dcarr...@cloudera.com wrote:

 Thanks, Krakna, very helpful.  The way I read the code, it looks like you are 
 assuming that each line in foo.log contains a complete json object?  (That 
 is, that the data doesn't contain any records that are split into multiple 
 lines.)  If so, is that because you know that to be true of your data?  Or 
 did you do as Nicholas suggests and have some preprocessing on the text input 
 to flatten the data in that way?
 
 Thanks,
 Diana
 
 
 On Mon, Mar 17, 2014 at 12:09 PM, Krakna H shankark+...@gmail.com wrote:
 Katrina, 
 
 Not sure if this is what you had in mind, but here's some simple pyspark code 
 that I recently wrote to deal with JSON files.
 
 from pyspark import SparkContext, SparkConf
 from operator import add
 import json
 import random
 import numpy as np
 
 
 def concatenate_paragraphs(sentence_array):
     return ' '.join(sentence_array).split(' ')
 
 
 logFile = 'foo.json'
 conf = SparkConf()
 conf.setMaster("spark://cluster-master:7077").setAppName("example").set("spark.executor.memory", "1g")
 
 sc = SparkContext(conf=conf)
 
 logData = sc.textFile(logFile).cache()
 num_lines = logData.count()
 print 'Number of lines: %d' % num_lines
 
 # JSON object has the structure: {"key": {"paragraphs": ["sentence1", "sentence2", ...]}}
 tm = logData.map(lambda s: (json.loads(s)['key'],
                             len(concatenate_paragraphs(json.loads(s)['paragraphs']))))
 tm = tm.reduceByKey(lambda _, x: _ + x)
 
 op = tm.collect()
 for key, num_words in op:
     print 'key: %s, num_words: %d' % (key, num_words)
 
 On Mon, Mar 17, 2014 at 11:58 AM, Diana Carroll [via Apache Spark User List] 
 [hidden email] wrote:
 I don't actually have any data.  I'm writing a course that teaches students 
 how to do this sort of thing and am interested in looking at a variety of 
 real life examples of people doing things like that.  I'd love to see some 
 working code implementing the obvious work-around you mention...do you have 
 any to share?  It's an approach that makes a lot of sense, and as I said, I'd 
 love to not have to re-invent the wheel if someone else has already written 
 that code.  Thanks!
 
 Diana
 
 
 On Mon, Mar 17, 2014 at 11:35 AM, Nicholas Chammas [hidden email] wrote:
 There was a previous discussion about this here:
 
 http://apache-spark-user-list.1001560.n3.nabble.com/Having-Spark-read-a-JSON-file-td1963.html
 
 How big are the XML or JSON files you're looking to deal with? 
 
 It may not be practical to deserialize the entire document at once. In that 
 case an obvious work-around would be to have some kind of pre-processing step 
 that separates XML nodes/JSON objects with newlines so that you can analyze 
 the data with Spark in a line-oriented format. Your preprocessor wouldn't 
 have to parse/deserialize the massive document; it would just have to track 
 open/closed tags/braces to know when to insert a newline.
 
 Then you'd just open the line-delimited result and deserialize the individual 
 objects/nodes with map().
 
 Nick
 
 
 On Mon, Mar 17, 2014 at 11:18 AM, Diana Carroll [hidden email] wrote:
 Has anyone got a working example of a Spark application that analyzes data in 
 a non-line-oriented format, such as XML or JSON?  I'd like to do this without 
 re-inventing the wheel...anyone care to share?  Thanks!
 
 Diana
 
 
 
 
 



Re: example of non-line oriented input data?

2014-03-17 Thread Matei Zaharia
Here’s an example of getting together all lines in a file as one string:

$ cat dir/a.txt 
Hello
world!

$ cat dir/b.txt 
What's
up??

$ bin/pyspark
>>> files = sc.textFile("dir")

>>> files.collect()
[u'Hello', u'world!', u"What's", u'up??']   # one element per line, not what we want

>>> files.glom().collect()
[[u'Hello', u'world!'], [u"What's", u'up??']]   # one element per file, which is an array of lines

>>> files.glom().map(lambda a: "\n".join(a)).collect()
[u'Hello\nworld!', u"What's\nup??"]   # join back each file into a single string

The glom() method groups all the elements of each partition of an RDD into an 
array, giving you an RDD of arrays of objects. If your input is small files, 
you always have one partition per file.

There’s also mapPartitions, which gives you an iterator for each partition 
instead of an array. You can then return an iterator or list of objects to 
produce from that.

Matei


On Mar 17, 2014, at 10:46 AM, Diana Carroll dcarr...@cloudera.com wrote:

 Thanks Matei.  That makes sense.  I have here a dataset of many many smallish 
 XML files, so using mapPartitions that way would make sense.  I'd love to see 
 a code example though ...It's not as obvious to me how to do that as I 
 probably should be. 
 
 Thanks,
 Diana
 
 
 On Mon, Mar 17, 2014 at 1:02 PM, Matei Zaharia matei.zaha...@gmail.com 
 wrote:
 Hi Diana,
 
 Non-text input formats are only supported in Java and Scala right now, where 
 you can use sparkContext.hadoopFile or .hadoopDataset to load data with any 
 InputFormat that Hadoop MapReduce supports. In Python, you unfortunately only 
 have textFile, which gives you one record per line. For JSON, you’d have to 
 fit the whole JSON object on one line as you said. Hopefully we’ll also have 
 some other forms of input soon.
 
 If your input is a collection of separate files (say many .xml files), you 
 can also use mapPartitions on it to group together the lines because each 
 input file will end up being a single dataset partition (or map task). This 
 will let you concatenate the lines in each file and parse them as one XML 
 object.
 
 Matei
 
 On Mar 17, 2014, at 9:52 AM, Diana Carroll dcarr...@cloudera.com wrote:
 
 Thanks, Krakna, very helpful.  The way I read the code, it looks like you 
 are assuming that each line in foo.log contains a complete json object?  
 (That is, that the data doesn't contain any records that are split into 
 multiple lines.)  If so, is that because you know that to be true of your 
 data?  Or did you do as Nicholas suggests and have some preprocessing on the 
 text input to flatten the data in that way?
 
 Thanks,
 Diana
 
 
 On Mon, Mar 17, 2014 at 12:09 PM, Krakna H shankark+...@gmail.com wrote:
 Katrina, 
 
 Not sure if this is what you had in mind, but here's some simple pyspark 
 code that I recently wrote to deal with JSON files.
 
 from pyspark import SparkContext, SparkConf
 from operator import add
 import json
 import random
 import numpy as np
 
 
 def concatenate_paragraphs(sentence_array):
     return ' '.join(sentence_array).split(' ')
 
 
 logFile = 'foo.json'
 conf = SparkConf()
 conf.setMaster("spark://cluster-master:7077").setAppName("example").set("spark.executor.memory", "1g")
 
 sc = SparkContext(conf=conf)
 
 logData = sc.textFile(logFile).cache()
 num_lines = logData.count()
 print 'Number of lines: %d' % num_lines
 
 # JSON object has the structure: {"key": {"paragraphs": ["sentence1", "sentence2", ...]}}
 tm = logData.map(lambda s: (json.loads(s)['key'],
                             len(concatenate_paragraphs(json.loads(s)['paragraphs']))))
 tm = tm.reduceByKey(lambda _, x: _ + x)
 
 op = tm.collect()
 for key, num_words in op:
     print 'key: %s, num_words: %d' % (key, num_words)
 
 
 On Mon, Mar 17, 2014 at 11:58 AM, Diana Carroll [via Apache Spark User List] 
 [hidden email] wrote:
 I don't actually have any data.  I'm writing a course that teaches students 
 how to do this sort of thing and am interested in looking at a variety of 
 real life examples of people doing things like that.  I'd love to see some 
 working code implementing the obvious work-around you mention...do you 
 have any to share?  It's an approach that makes a lot of sense, and as I 
 said, I'd love to not have to re-invent the wheel if someone else has 
 already written that code.  Thanks!
 
 Diana
 
 
 On Mon, Mar 17, 2014 at 11:35 AM, Nicholas Chammas [hidden email] wrote:
 There was a previous discussion about this here:
 
 http://apache-spark-user-list.1001560.n3.nabble.com/Having-Spark-read-a-JSON-file-td1963.html
 
 How big are the XML or JSON files you're looking to deal with? 
 
 It may not be practical to deserialize the entire document at once. In that 
 case an obvious work-around would be to have some kind of pre-processing 
 step that separates XML nodes/JSON objects with newlines so that you can 
 analyze the data with Spark in a line-oriented

Re: is collect exactly-once?

2014-03-17 Thread Matei Zaharia
Yup, it only returns each value once.

Matei

On Mar 17, 2014, at 1:14 PM, Adrian Mocanu amoc...@verticalscope.com wrote:

 Hi
 Quick question here,
 I know that .foreach is not idempotent.  I am wondering if collect() is 
 idempotent? Meaning that once I’ve collect()-ed if spark node crashes I can’t 
 get the same values from the stream ever again.
  
 Thanks
 -Adrian



Re: example of non-line oriented input data?

2014-03-17 Thread Matei Zaharia
Oh, I see, the problem is that the function you pass to mapPartitions must 
itself return an iterator or a collection. This is used so that you can return 
multiple output records for each input record. You can implement most of the 
existing map-like operations in Spark, such as map, filter, flatMap, etc, with 
mapPartitions, as well as new ones that might do a sliding window over each 
partition for example, or accumulate data across elements (e.g. to compute a 
sum).

For example, if you have data = sc.parallelize([1, 2, 3, 4], 2), this will work:

>>> data.mapPartitions(lambda x: x).collect()
[1, 2, 3, 4]   # Just return the same iterator, doing nothing

>>> data.mapPartitions(lambda x: [list(x)]).collect()
[[1, 2], [3, 4]]   # Group together the elements of each partition in a single list (like glom)

>>> data.mapPartitions(lambda x: [sum(x)]).collect()
[3, 7]   # Sum each partition separately

However something like data.mapPartitions(lambda x: sum(x)).collect() will 
*not* work because sum returns a number, not an iterator. That’s why I put 
sum(x) inside a list above.

In practice mapPartitions is most useful if you want to share some data or work 
across the elements. For example maybe you want to load a lookup table once 
from an external file and then check each element in it, or sum up a bunch of 
elements without allocating a lot of vector objects.
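
A sketch of that "load something once per partition" pattern, shown in Scala for brevity (data stands for whatever RDD you are processing, and the lookup file and filter logic are illustrative):

val filtered = data.mapPartitions { iter =>
  // Load the lookup table once per partition rather than once per element
  val lookup = scala.io.Source.fromFile("/tmp/lookup.txt").getLines().toSet
  iter.filter(x => lookup.contains(x.toString))
}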

Matei


On Mar 17, 2014, at 11:25 AM, Diana Carroll dcarr...@cloudera.com wrote:

 There’s also mapPartitions, which gives you an iterator for each partition 
 instead of an array. You can then return an iterator or list of objects to 
 produce from that.
 
 I confess, I was hoping for an example of just that, because i've not yet 
 been able to figure out how to use mapPartitions.  No doubt this is because 
 i'm a rank newcomer to Python, and haven't fully wrapped my head around 
 iterators.  All I get so far in my attempts to use mapPartitions is the 
 darned suchnsuch is not an iterator error.   
 
 def myfunction(iterator): return [1,2,3]
 mydata.mapPartitions(lambda x: myfunction(x)).take(2)
 
 
 
 
 
 On Mon, Mar 17, 2014 at 1:57 PM, Matei Zaharia matei.zaha...@gmail.com 
 wrote:
 Here’s an example of getting together all lines in a file as one string:
 
 $ cat dir/a.txt
 Hello
 world!
 
 $ cat dir/b.txt
 What's
 up??
 
 $ bin/pyspark
 >>> files = sc.textFile("dir")
 
 >>> files.collect()
 [u'Hello', u'world!', u"What's", u'up??']   # one element per line, not what we want
 
 >>> files.glom().collect()
 [[u'Hello', u'world!'], [u"What's", u'up??']]   # one element per file, which is an array of lines
 
 >>> files.glom().map(lambda a: "\n".join(a)).collect()
 [u'Hello\nworld!', u"What's\nup??"]   # join back each file into a single string
 
 The glom() method groups all the elements of each partition of an RDD into an 
 array, giving you an RDD of arrays of objects. If your input is small files, 
 you always have one partition per file.
 
 There’s also mapPartitions, which gives you an iterator for each partition 
 instead of an array. You can then return an iterator or list of objects to 
 produce from that.
 
 Matei
 
 
 On Mar 17, 2014, at 10:46 AM, Diana Carroll dcarr...@cloudera.com wrote:
 
  Thanks Matei.  That makes sense.  I have here a dataset of many many 
  smallish XML files, so using mapPartitions that way would make sense.  I'd 
  love to see a code example though ...It's not as obvious to me how to do 
  that as I probably should be.
 
  Thanks,
  Diana
 
 
  On Mon, Mar 17, 2014 at 1:02 PM, Matei Zaharia matei.zaha...@gmail.com 
  wrote:
  Hi Diana,
 
  Non-text input formats are only supported in Java and Scala right now, 
  where you can use sparkContext.hadoopFile or .hadoopDataset to load data 
  with any InputFormat that Hadoop MapReduce supports. In Python, you 
  unfortunately only have textFile, which gives you one record per line. For 
  JSON, you’d have to fit the whole JSON object on one line as you said. 
  Hopefully we’ll also have some other forms of input soon.
 
  If your input is a collection of separate files (say many .xml files), you 
  can also use mapPartitions on it to group together the lines because each 
  input file will end up being a single dataset partition (or map task). This 
  will let you concatenate the lines in each file and parse them as one XML 
  object.
 
  Matei
 
  On Mar 17, 2014, at 9:52 AM, Diana Carroll dcarr...@cloudera.com wrote:
 
  Thanks, Krakna, very helpful.  The way I read the code, it looks like you 
  are assuming that each line in foo.log contains a complete json object?  
  (That is, that the data doesn't contain any records that are split into 
  multiple lines.)  If so, is that because you know that to be true of your 
  data?  Or did you do as Nicholas suggests and have some preprocessing on 
  the text input to flatten the data in that way?
 
  Thanks,
  Diana
 
 
  On Mon, Mar 17, 2014 at 12:09 PM, Krakna H shankark+...@gmail.com wrote:
  Katrina,
 
  Not sure if this is what you had

Re: links for the old versions are broken

2014-03-17 Thread Matei Zaharia
Thanks for reporting this, looking into it.

On Mar 17, 2014, at 2:44 PM, Walrus theCat walrusthe...@gmail.com wrote:

 ping
 
 
 On Thu, Mar 13, 2014 at 11:05 AM, Aaron Davidson ilike...@gmail.com wrote:
 Looks like everything from 0.8.0 and before errors similarly (though Spark 
 0.3 for Scala 2.9 has a malformed link as well).
 
 
 On Thu, Mar 13, 2014 at 10:52 AM, Walrus theCat walrusthe...@gmail.com 
 wrote:
 Sup,
 
 Where can I get Spark 0.7.3?  It's 404 here:
 
 http://spark.apache.org/downloads.html
 
 Thanks
 
 



Re: Incrementally add/remove vertices in GraphX

2014-03-18 Thread Matei Zaharia
I just meant that you call union() before creating the RDDs that you pass to 
new Graph(). If you call it after it will produce other RDDs.

The Graph() constructor actually shuffles and “indexes” the data to make graph 
operations efficient, so it’s not too easy to add elements after. You could 
access graph.vertices and graph.edges to build new RDDs, and then call Graph() 
again to make a new graph. I’ve CCed Joey and Ankur to see if they have further 
ideas on how to optimize this. It would be cool to support more efficient union 
and subtracting of graphs once they’ve been partitioned by GraphX.
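
A rough sketch of that rebuild step (the vertex/edge attribute types and the extraVerts / extraEdges RDDs are placeholders):

import org.apache.spark.graphx._

// Pull the existing RDDs out of the graph, union in the new elements, and build a new
// graph; the Graph() constructor re-partitions and re-indexes the combined data.
val newVertices = graph.vertices.union(extraVerts)
val newEdges = graph.edges.union(extraEdges)
val updatedGraph = Graph(newVertices, newEdges)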

Matei

On Mar 14, 2014, at 8:32 AM, alelulli alessandro.lu...@gmail.com wrote:

 Hi Matei,
 
 Could you please clarify why I must call union before creating the graph?
 
 What's the behavior if I call union / subtract after the creation? 
 Are the added / removed vertices processed?
 
 For example, if I'm implementing an iterative algorithm and at the 5th step I
 need to add some vertex / edge, can I call union / subtract on the
 VertexRDD, EdgeRDD and Triplets?
 
 Thanks
 Alessandro
 
 
 



Re: Pyspark worker memory

2014-03-19 Thread Matei Zaharia
Try checking spark-env.sh on the workers as well. Maybe code there is somehow 
overriding the spark.executor.memory setting.

Matei

On Mar 18, 2014, at 6:17 PM, Jim Blomo jim.bl...@gmail.com wrote:

 Hello, I'm using the Github snapshot of PySpark and having trouble setting 
 the worker memory correctly. I've set spark.executor.memory to 5g, but 
 somewhere along the way Xmx is getting capped to 512M. This was not occurring 
 with the same setup and 0.9.0. How many places do I need to configure the 
 memory? Thank you!
 



Re: What's the lifecycle of an rdd? Can I control it?

2014-03-19 Thread Matei Zaharia
Yes, Spark automatically removes old RDDs from the cache when you make new 
ones. Unpersist forces it to remove them right away. In both cases though, note 
that Java doesn’t garbage-collect the objects released until later.
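
A minimal sketch of managing the lifecycle explicitly (the path and transformations are placeholders):

val lengths = sc.textFile("hdfs:///data/input").map(_.length).cache()
lengths.count()        // first action computes and caches the partitions
lengths.reduce(_ + _)  // later actions reuse the cached data
lengths.unpersist()    // drop the blocks once no downstream stage needs them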

Matei

On Mar 19, 2014, at 7:22 PM, Nicholas Chammas nicholas.cham...@gmail.com 
wrote:

 Related question: 
 
 If I keep creating new RDDs and cache()-ing them, does Spark automatically 
 unpersist the least recently used RDD when it runs out of memory? Or is an 
 explicit unpersist the only way to get rid of an RDD (barring the PR 
 Tathagata mentioned)?
 
 Also, does unpersist()-ing an RDD immediately free up space, or just allow 
 that space to be reclaimed when needed?
 
 
 On Wed, Mar 19, 2014 at 7:01 PM, Tathagata Das tathagata.das1...@gmail.com 
 wrote:
 Just a heads-up, there is an active pull request that will automatically 
 unpersist RDDs that are no longer referenced / in scope in the application. 
 
 TD
 
 
 On Wed, Mar 19, 2014 at 6:58 PM, hequn cheng chenghe...@gmail.com wrote:
 persist and unpersist.
 unpersist: Mark the RDD as non-persistent, and remove all blocks for it from 
 memory and disk.
 
 
 2014-03-19 16:40 GMT+08:00 林武康 vboylin1...@gmail.com:
 
 Hi, can anyone tell me about the lifecycle of an RDD? I searched through the 
 official website and still can't figure it out. Can I use an RDD in some 
 stages and then destroy it to release memory, because no stages ahead will use 
 this RDD any more? Is that possible?
 
 Thanks!
 
 Sincerely 
 Lin wukang
 
 
 



Re: Pyspark worker memory

2014-03-20 Thread Matei Zaharia
Yeah, this is definitely confusing. The motivation for this was that different 
users of the same cluster may want to set different memory sizes for their 
apps, so we decided to put this setting in the driver. However, if you put 
SPARK_JAVA_OPTS in spark-env.sh, it also applies to executors, which is 
confusing (though in this case it wouldn’t overwrite spark.executor.memory 
AFAIK).

We want to clean a bunch of this stuff up for 1.0, or at least document it 
better. Thanks for the suggestions.

Matei

On Mar 19, 2014, at 12:53 AM, Jim Blomo jim.bl...@gmail.com wrote:

 To document this, it would be nice to clarify what environment
 variables should be used to set which Java system properties, and what
 type of process they affect.  I'd be happy to start a page if you can
 point me to the right place:
 
 SPARK_JAVA_OPTS:
  -Dspark.executor.memory can by set on the machine running the driver
 (typically the master host) and will affect the memory available to
 the Executor running on a slave node
  -D
 
 SPARK_DAEMON_OPTS:
  
 
 On Wed, Mar 19, 2014 at 12:48 AM, Jim Blomo jim.bl...@gmail.com wrote:
 Thanks for the suggestion, Matei.  I've tracked this down to a setting
 I had to make on the Driver.  It looks like spark-env.sh has no impact
 on the Executor, which confused me for a long while with settings like
 SPARK_EXECUTOR_MEMORY.  The only setting that mattered was setting the
 system property in the *driver* (in this case pyspark/shell.py) or
 using -Dspark.executor.memory in SPARK_JAVA_OPTS *on the master*.  I'm
 not sure how this varies from 0.9.0 release, but it seems to work on
 SNAPSHOT.
 
 On Tue, Mar 18, 2014 at 11:52 PM, Matei Zaharia matei.zaha...@gmail.com 
 wrote:
 Try checking spark-env.sh on the workers as well. Maybe code there is
 somehow overriding the spark.executor.memory setting.
 
 Matei
 
 On Mar 18, 2014, at 6:17 PM, Jim Blomo jim.bl...@gmail.com wrote:
 
 Hello, I'm using the Github snapshot of PySpark and having trouble setting
 the worker memory correctly. I've set spark.executor.memory to 5g, but
 somewhere along the way Xmx is getting capped to 512M. This was not
 occurring with the same setup and 0.9.0. How many places do I need to
 configure the memory? Thank you!
 
 



Re: DStream spark paper

2014-03-20 Thread Matei Zaharia
Hi Adrian,

On every timestep of execution, we receive new data, then report updated word 
counts for that new data plus the past 30 seconds. The latency here is about 
how quickly you get these updated counts once the new batch of data comes in. 
It’s true that the count reflects some data from 30 seconds ago as well, but it 
doesn’t mean the overall processing latency is 30 seconds.
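
For reference, the kind of windowed count being discussed looks roughly like this (a 1-second batch interval and a socket source are illustrative choices):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

val conf = new SparkConf().setAppName("WindowedWordCount")
val ssc = new StreamingContext(conf, Seconds(1))                // new results every second...
val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))
val counts = words.map(w => (w, 1))
  .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(1))  // ...each covering the last 30s
counts.print()
ssc.start()

Each batch's output appears shortly after that batch arrives, even though the window it reports on spans 30 seconds.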

Matei

On Mar 20, 2014, at 1:36 PM, Adrian Mocanu amoc...@verticalscope.com wrote:

 I looked over the specs on page 9 from 
 http://www.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS-2012-259.pdf
 The first paragraph mentions the window size is 30 seconds: "Word-Count, which 
 performs a sliding window count over 30s; and TopKCount, which finds the k most 
 frequent words over the past 30s."
  
 The second paragraph mentions subsecond latency.
  
 Putting these 2 together, is the paper saying that in the 30 sec window the 
 tuples are delayed at most 1 second?
  
 The paper explains “By “end-to-end latency,” we mean the time from when 
 records are sent to the system to when results incorporating them appear.” 
 This leads me to conclude that end-to-end latency for a 30 sec window should 
 be at least 30 seconds because results won’t be incorporated until the entire 
 window is completed ie: 30sec. At the same time the paper claims latency is 
 sub second so clearly I’m misunderstanding something.
  
 -Adrian



Re: How to save as a single file efficiently?

2014-03-21 Thread Matei Zaharia
Try passing the shuffle=true parameter to coalesce, then it will do the map in 
parallel but still pass all the data through one reduce node for writing it 
out. That’s probably the fastest it will get. No need to cache if you do that.
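
In code, that is just (transform and the output path are placeholders):

rdd.map(transform).coalesce(1, shuffle = true).saveAsTextFile("hdfs:///out/result-csv")

The map still runs with the original 1000 partitions; only the final write is funneled through a single task.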

Matei

On Mar 21, 2014, at 4:04 PM, Aureliano Buendia buendia...@gmail.com wrote:

 Hi,
 
 Our spark app reduces a few 100 GB of data to a few 100 KB of csv. We 
 found that a partition number of 1000 is a good number to speed the process 
 up. However, it does not make sense to have 1000 pieces of csv files, each 
 less than 1 KB.
 
 We used RDD.coalesce(1) to get only 1 csv file, but it's extremely slow, and 
 we are not properly using our resources this way. So this is very slow:
 
 rdd.map(...).coalesce(1).saveAsTextFile()
 
 How is it possible to use coalesce(1) simply for concatenating the 
 materialized output text files? Would something like this make sense?:
 
 rdd.map(...).coalesce(100).coalesce(1).saveAsTextFile()
 
 Or, would something like this achieve it?:
 
 rdd.map(...).cache().coalesce(1).saveAsTextFile()



Re: How to save as a single file efficiently?

2014-03-21 Thread Matei Zaharia
Ah, the reason is because coalesce is often used to deal with lots of small 
input files on HDFS. In that case you don’t want to reshuffle them all across 
the network, you just want each mapper to directly read multiple files (and you 
want fewer than one mapper per file).

Matei

On Mar 21, 2014, at 5:01 PM, Aureliano Buendia buendia...@gmail.com wrote:

 Good to know it's as simple as that! I wonder why shuffle=true is not the 
 default for coalesce().
 
 
 On Fri, Mar 21, 2014 at 11:37 PM, Matei Zaharia matei.zaha...@gmail.com 
 wrote:
 Try passing the shuffle=true parameter to coalesce, then it will do the map 
 in parallel but still pass all the data through one reduce node for writing 
 it out. That’s probably the fastest it will get. No need to cache if you do 
 that.
 
 Matei
 
 On Mar 21, 2014, at 4:04 PM, Aureliano Buendia buendia...@gmail.com wrote:
 
  Hi,
 
  Our spark app reduces a few 100 GB of data to a few 100 KB of csv. We 
  found that a partition number of 1000 is a good number to speed the process 
  up. However, it does not make sense to have 1000 pieces of csv files, each 
  less than 1 KB.
 
  We used RDD.coalesce(1) to get only 1 csv file, but it's extremely slow, 
  and we are not properly using our resources this way. So this is very slow:
 
  rdd.map(...).coalesce(1).saveAsTextFile()
 
  How is it possible to use coalesce(1) simply for concatenating the 
  materialized output text files? Would something like this make sense?:
 
  rdd.map(...).coalesce(100).coalesce(1).saveAsTextFile()
 
  Or, would something like this achieve it?:
 
  rdd.map(...).cache().coalesce(1).saveAsTextFile()
 
 



Re: error loading large files in PySpark 0.9.0

2014-03-23 Thread Matei Zaharia
Hey Jeremy, what happens if you pass batchSize=10 as an argument to your 
SparkContext? It tries to serialize that many objects together at a time, which 
might be too much. By default the batchSize is 1024.

Matei

On Mar 23, 2014, at 10:11 AM, Jeremy Freeman freeman.jer...@gmail.com wrote:

 Hi all,
 
 Hitting a mysterious error loading large text files, specific to PySpark
 0.9.0.
 
 In PySpark 0.8.1, this works:
 
 data = sc.textFile(path/to/myfile)
 data.count()
 
 But in 0.9.0, it stalls. There are indications of completion up to:
 
 14/03/17 16:54:24 INFO TaskSetManager: Finished TID 4 in 1699 ms on X.X.X.X
 (progress: 15/537)
 14/03/17 16:54:24 INFO DAGScheduler: Completed ResultTask(5, 4)
 
 And then this repeats indefinitely
 
 14/03/17 16:54:24 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_5,
 runningTasks: 144
 14/03/17 16:54:25 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_5,
 runningTasks: 144
 
 Always stalls at the same place. There's nothing in stderr on the workers,
 but in stdout there are several of these messages:
 
 INFO PythonRDD: stdin writer to Python finished early
 
 So perhaps the real error is being suppressed as in
 https://spark-project.atlassian.net/browse/SPARK-1025
 
 Data is just rows of space-separated numbers, ~20GB, with 300k rows and 50k
 characters per row. Running on a private cluster with 10 nodes, 100GB / 16
 cores each, Python v 2.7.6.
 
 I doubt the data is corrupted as it works fine in Scala in 0.8.1 and 0.9.0,
 and in PySpark in 0.8.1. Happy to post the file, but it should repro for
 anything with these dimensions. It *might* be specific to long strings: I
 don't see it with fewer characters (10k) per row, but I also don't see it
 with many fewer rows but the same number of characters per row.
 
 Happy to try and provide more info / help debug!
 
 -- Jeremy
 
 
 



Re: Announcing Spark SQL

2014-03-26 Thread Matei Zaharia
Congrats Michael & co for putting this together — this is probably the neatest 
piece of technology added to Spark in the past few months, and it will greatly 
change what users can do as more data sources are added.

Matei


On Mar 26, 2014, at 3:22 PM, Ognen Duzlevski og...@plainvanillagames.com 
wrote:

 Wow!
 Ognen
 
 On 3/26/14, 4:58 PM, Michael Armbrust wrote:
 Hey Everyone,
 
 This already went out to the dev list, but I wanted to put a pointer here as 
 well to a new feature we are pretty excited about for Spark 1.0.
 
 http://databricks.com/blog/2014/03/26/Spark-SQL-manipulating-structured-data-using-Spark.html
 
 Michael
 



Re: All pairs shortest paths?

2014-03-26 Thread Matei Zaharia
Yeah, if you’re just worried about statistics, maybe you can do sampling (do 
single-pair paths from 100 random nodes and you get an idea of what percentage 
of nodes have what distribution of neighbors in a given distance).

Matei

On Mar 26, 2014, at 5:55 PM, Ryan Compton compton.r...@gmail.com wrote:

 Much thanks, I suspected this would be difficult. I was hoping to
 generate some "4 degrees of separation"-like statistics. Looks like
 I'll just have to work with a subset of my graph.
 
 On Wed, Mar 26, 2014 at 5:20 PM, Matei Zaharia matei.zaha...@gmail.com 
 wrote:
 All-pairs distances is tricky for a large graph because you need O(V^2) 
 storage. Do you want to just quickly query the distance between two 
 vertices? In that case you can do single-source shortest paths, which I 
 believe exists in GraphX, or at least is very quick to implement on top of 
 its Pregel API. If your graph is small enough that storing all-pairs is 
 feasible, you can probably run this as an iterative algorithm: 
 http://en.wikipedia.org/wiki/Floyd–Warshall_algorithm, though I haven’t 
 tried it. It may be tough to do with GraphX.
 
 Matei
 
 On Mar 26, 2014, at 3:51 PM, Ryan Compton compton.r...@gmail.com wrote:
 
 To clarify: I don't need the actual paths, just the distances.
 
 On Wed, Mar 26, 2014 at 3:04 PM, Ryan Compton compton.r...@gmail.com 
 wrote:
 No idea how feasible this is. Has anyone done it?
 



Re: pySpark memory usage

2014-03-27 Thread Matei Zaharia
I see, did this also fail with previous versions of Spark (0.9 or 0.8)? We’ll 
try to look into these, seems like a serious error.

Matei

On Mar 27, 2014, at 7:27 PM, Jim Blomo jim.bl...@gmail.com wrote:

 Thanks, Matei.  I am running Spark 1.0.0-SNAPSHOT built for Hadoop
 1.0.4 from GitHub on 2014-03-18.
 
 I tried batchSizes of 512, 10, and 1 and each got me further but none
 have succeeded.
 
 I can get this to work -- with manual interventions -- if I omit
 `parsed.persist(StorageLevel.MEMORY_AND_DISK)` and set batchSize=1.  5
 of the 175 executors hung, and I had to kill the python process to get
 things going again.  The only indication of this in the logs was `INFO
 python.PythonRDD: stdin writer to Python finished early`.
 
 With batchSize=1 and persist, a new memory error came up in several
 tasks, before the app was failed:
 
 14/03/28 01:51:15 ERROR executor.Executor: Uncaught exception in
 thread Thread[stdin writer for python,5,main]
 java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOfRange(Arrays.java:2694)
        at java.lang.String.<init>(String.java:203)
at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
at java.nio.CharBuffer.toString(CharBuffer.java:1201)
at org.apache.hadoop.io.Text.decode(Text.java:350)
at org.apache.hadoop.io.Text.decode(Text.java:327)
at org.apache.hadoop.io.Text.toString(Text.java:254)
at 
 org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
at 
 org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$12.next(Iterator.scala:357)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
 org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
at 
 org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
 
 There are other exceptions, but I think they all stem from the above,
 eg. org.apache.spark.SparkException: Error sending message to
 BlockManagerMaster
 
 Let me know if there are other settings I should try, or if I should
 try a newer snapshot.
 
 Thanks again!
 
 
 On Mon, Mar 24, 2014 at 9:35 AM, Matei Zaharia matei.zaha...@gmail.com 
 wrote:
 Hey Jim,
 
 In Spark 0.9 we added a batchSize parameter to PySpark that makes it group 
 multiple objects together before passing them between Java and Python, but 
 this may be too high by default. Try passing batchSize=10 to your 
 SparkContext constructor to lower it (the default is 1024). Or even 
 batchSize=1 to match earlier versions.
 
 Matei
 
 On Mar 21, 2014, at 6:18 PM, Jim Blomo jim.bl...@gmail.com wrote:
 
 Hi all, I'm wondering if there's any settings I can use to reduce the
 memory needed by the PythonRDD when computing simple stats.  I am
 getting OutOfMemoryError exceptions while calculating count() on big,
 but not absurd, records.  It seems like PythonRDD is trying to keep
 too many of these records in memory, when all that is needed is to
 stream through them and count.  Any tips for getting through this
 workload?
 
 
 Code:
 session = sc.textFile('s3://...json.gz') # ~54GB of compressed data
 
 # the biggest individual text line is ~3MB
 parsed = session.map(lambda l: l.split("\t", 1)).map(lambda (y, s):
 (loads(y), loads(s)))
 parsed.persist(StorageLevel.MEMORY_AND_DISK)
 
 parsed.count()
 # will never finish: executor.Executor: Uncaught exception will FAIL
 all executors
 
 Incidentally the whole app appears to be killed, but this error is not
 propagated to the shell.
 
 Cluster:
 15 m2.xlarges (17GB memory, 17GB swap, spark.executor.memory=10GB)
 
 Exception:
 java.lang.OutOfMemoryError: Java heap space
   at 
 org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:132)
   at 
 org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:120)
   at 
 org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:113)
   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
   at 
 org.apache.spark.api.python.PythonRDD$$anon$1.foreach(PythonRDD.scala:113)
   at 
 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
   at 
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
   at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:94)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:220)
   at 
 org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
 



Re: Strange behavior of RDD.cartesian

2014-03-28 Thread Matei Zaharia
Weird, how exactly are you pulling out the sample? Do you have a small program 
that reproduces this?

Matei

On Mar 28, 2014, at 3:09 AM, Jaonary Rabarisoa jaon...@gmail.com wrote:

 I forgot to mention that I don't really use all of my data. Instead I use a 
 sample extracted with randomSample. 
 
 
 On Fri, Mar 28, 2014 at 10:58 AM, Jaonary Rabarisoa jaon...@gmail.com wrote:
 Hi all,
 
 I notice that RDD.cartesian has a strange behavior with cached and uncached 
 data. More precisely, I have a set of data that I load with objectFile
 
 val data: RDD[(Int, String, Array[Double])] = sc.objectFile("data")
 
 Then I split it into two sets depending on some criteria:
 
 
 val part1 = data.filter(_._2 matches "view1")
 val part2 = data.filter(_._2 matches "view2")
 
 
 Finally, I compute the cartesian product of part1 and part2
 
 val pair = part1.cartesian(part2)
 
 
 If everything goes well I should have 
 
 pair.count == part1.count * part2.count
 
 But this is not the case if I don't cache part1 and part2.
 
 What am I missing? Is caching data mandatory in Spark?
 
 Cheers,
 
 Jaonary
 
 
 
 



Re: [shark-users] SQL on Spark - Shark or SparkSQL

2014-03-30 Thread Matei Zaharia
Hi Manoj,

At the current time, for drop-in replacement of Hive, it will be best to stick 
with Shark. Over time, Shark will use the Spark SQL backend, but should remain 
deployable the way it is today (including launching the SharkServer, using the 
Hive CLI, etc). Spark SQL is better for accessing Hive data within a Spark 
program though, where its APIs are richer and easier to link to than the 
SharkContext.sql2rdd we had previously provided in Shark.

So in a nutshell, if you have a Shark deployment today, or need the HiveServer, 
then going with Shark will be fine and we will switch out the backend in a 
future release (we’ll probably create preview of this even before we’re ready 
to fully switch). If you just want to run SQL queries or load SQL data within a 
Spark program, try out Spark SQL.

Matei

On Mar 30, 2014, at 4:46 PM, Mayur Rustagi mayur.rust...@gmail.com wrote:

 +1. I have done a few installations of Shark with customers using Hive, and they 
 love it. It would be good to maintain compatibility with the Metastore & QL till we 
 have a substantial reason to break off (like BlinkDB). 
 
 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi
 
 
 
 On Sun, Mar 30, 2014 at 2:46 AM, Nicholas Chammas 
 nicholas.cham...@gmail.com wrote:
 This is a great question. We are in the same position, having not invested in 
 Hive yet and looking at various options for SQL-on-Hadoop.
 
 
 On Sat, Mar 29, 2014 at 9:48 PM, Manoj Samel manojsamelt...@gmail.com wrote:
 Hi,
 
 In context of the recent Spark SQL announcement 
 (http://databricks.com/blog/2014/03/26/Spark-SQL-manipulating-structured-data-using-Spark.html).
 
 If there is no existing investment in Hive/Shark, would it be worth starting 
 a new SQL work using SparkSQL rather than Shark ?
 
 * It seems Shark SQL core will use more and more of SparkSQL
 * From the blog, it seems Shark has baggage from Hive, that is not needed in 
 this case
 
 On the other hand, there seems to be two shortcomings of SparkSQL (from a 
 quick scan of blog and doc) 
 
 * SparkSQL will have less features than Shark/Hive QL, at least for now.
 * The standalone SharkServer feature will not be available in SparkSQL.
 
 Can someone from Databricks shed light on what is the long term roadmap? It 
 will help in avoiding investing in older/two technologies for work with no 
 Hive needs.
 
 Thanks,
 
 PS: Great work on SparkSQL
 
 
 
 
 -- 
 You received this message because you are subscribed to the Google Groups 
 shark-users group.
 To unsubscribe from this group and stop receiving emails from it, send an 
 email to shark-users+unsubscr...@googlegroups.com.
 To post to this group, send email to shark-us...@googlegroups.com.
 Visit this group at http://groups.google.com/group/shark-users.
 For more options, visit https://groups.google.com/d/optout.



Re: Mllib in pyspark for 0.8.1

2014-04-01 Thread Matei Zaharia
You could probably port it back, but it required some changes on the Java side 
as well (a new PythonMLUtils class). It might be easier to fix the Mesos issues 
with 0.9.

Matei

On Apr 1, 2014, at 8:53 AM, Ian Ferreira ianferre...@hotmail.com wrote:

 
 Hi there,
 
 For some reason the distribution and build for 0.8.1 does not include
 the MLLib libraries for pyspark i.e. import from mllib fails.
 
 Seems to be addressed in 0.9.0, but that has other issues running on
 mesos in standalone mode :)
 
 Any pointers?
 
 Cheers
 - Ian
 



Re: Spark 1.0.0 release plan

2014-04-03 Thread Matei Zaharia
Hey Bhaskar, this is still the plan, though QAing might take longer than 15 
days. Right now since we’ve passed April 1st, the only features considered for 
a merge are those that had pull requests in review before. (Some big ones are 
things like annotating the public APIs and simplifying configuration). Bug 
fixes and things like adding Python / Java APIs for new components will also 
still be considered.

Matei

On Apr 3, 2014, at 10:30 AM, Bhaskar Dutta bhas...@gmail.com wrote:

 Hi,
 
 Is there any change in the release plan for Spark 1.0.0-rc1 release date from 
 what is listed in the Proposal for Spark Release Strategy thread?
 == Tentative Release Window for 1.0.0 ==
 Feb 1st - April 1st: General development
 April 1st: Code freeze for new features
 April 15th: RC1
 Thanks,
 Bhaskar



Re: Optimal Server Design for Spark

2014-04-03 Thread Matei Zaharia
To run multiple workers with Spark’s standalone mode, set 
SPARK_WORKER_INSTANCES and SPARK_WORKER_CORES in conf/spark-env.sh. For 
example, if you have 16 cores and want 2 workers, you could add

export SPARK_WORKER_INSTANCES=2
export SPARK_WORKER_CORES=8

Matei

On Apr 3, 2014, at 12:38 PM, Mayur Rustagi mayur.rust...@gmail.com wrote:

 Are your workers not utilizing all the cores?
 One worker will utilize multiple cores depending on resource allocation.
 Regards
 Mayur
 
 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi
 
 
 
 On Wed, Apr 2, 2014 at 7:19 PM, Debasish Das debasish.da...@gmail.com wrote:
 Hi Matei,
 
 How can I run multiple Spark workers per node? I am running an 8-core, 10-node 
 cluster, but I do have 8 more cores on each node. So having 2 workers per 
 node will definitely help my use case.
 
 Thanks.
 Deb
 
 
 
 
 On Wed, Apr 2, 2014 at 3:58 PM, Matei Zaharia matei.zaha...@gmail.com wrote:
 Hey Steve,
 
 This configuration sounds pretty good. The one thing I would consider is 
 having more disks, for two reasons — Spark uses the disks for large shuffles 
 and out-of-core operations, and often it’s better to run HDFS or your storage 
 system on the same nodes. But whether this is valuable will depend on whether 
 you plan to do that in your deployment. You should determine that and go from 
 there.
 
 The amount of cores and RAM are both good — actually with a lot more of these 
 you would probably want to run multiple Spark workers per node, which is more 
 work to configure. Your numbers are in line with other deployments.
 
 There’s a provisioning overview with more details at 
 https://spark.apache.org/docs/latest/hardware-provisioning.html but what you 
 have sounds fine.
 
 Matei
 
 On Apr 2, 2014, at 2:58 PM, Stephen Watt sw...@redhat.com wrote:
 
  Hi Folks
 
  I'm looking to buy some gear to run Spark. I'm quite well versed in Hadoop 
  Server design but there does not seem to be much Spark related collateral 
  around infrastructure guidelines (or at least I haven't been able to find 
  them). My current thinking for server design is something along these lines.
 
  - 2 x 10Gbe NICs
  - 128 GB RAM
  - 6 x 1 TB Small Form Factor Disks (2 x RAID 1 Mirror for O/S and Runtimes, 
  4 x 1TB for Data Drives)
  - 1 Disk Controller
  - 2 x 2.6 GHz 6 core processors
 
  If I stick with 1u servers then I lose disk capacity per rack but I get a 
  lot more memory and CPU capacity per rack. This increases my total cluster 
  memory footprint and it doesn't seem to make sense to have super dense 
  storage servers because I can't fit all that data on disk in memory 
  anyways. So at present, my thinking is to go with 1u servers instead of 2u 
  Servers. Is 128GB RAM per server normal? Do you guys use more or less than 
  that?
 
  Any feedback would be appreciated
 
  Regards
  Steve Watt
 
 
 



Re: How are exceptions in map functions handled in Spark?

2014-04-04 Thread Matei Zaharia
Exceptions should be sent back to the driver program and logged there (with a 
SparkException thrown if a task fails more than 4 times), but there were some 
bugs before where this did not happen for non-Serializable exceptions. We 
changed it to pass back the stack traces only (as text), which should always 
work. I’d recommend trying a newer Spark version, 0.8 should be easy to upgrade 
to from 0.7.
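
If the goal is just to keep one bad record from failing (and endlessly retrying) the whole task, one common pattern is to catch per-record failures inside the map itself. A minimal sketch, with a made-up file name and an integer parse standing in for your real logic:

    val parsed = sc.textFile("input.txt").flatMap { line =>
      try {
        Some(line.trim.toInt)                     // your real per-record work goes here
      } catch {
        case e: NumberFormatException => None     // drop (or log/count) bad records
      }
    }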

Matei

On Apr 4, 2014, at 10:40 AM, John Salvatier jsalvat...@gmail.com wrote:

 I'm trying to get a clear idea about how exceptions are handled in Spark? Is 
 there somewhere where I can read about this? I'm on spark .7
 
 For some reason I was under the impression that such exceptions are swallowed 
 and the value that produced them ignored but the exception is logged. 
 However, right now we're seeing the task just re-tried over and over again in 
 an infinite loop because there's a value that always generates an exception.
 
 John



Re: example of non-line oriented input data?

2014-04-04 Thread Matei Zaharia
FYI, one thing we’ve added now is support for reading multiple text files from 
a directory as separate records: https://github.com/apache/spark/pull/327. This 
should remove the need for mapPartitions discussed here.
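
A quick sketch of how that looks with the new API (assuming the wholeTextFiles name from that patch; the HDFS path is made up). Each element is a (fileName, fileContents) pair, so every file arrives as a single record:

    val files = sc.wholeTextFiles("hdfs://namenode/data/xml")
    val sizes = files.map { case (fileName, contents) => (fileName, contents.length) }
    sizes.collect().foreach { case (f, n) => println(f + ": " + n + " bytes") }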

Avro and SequenceFiles look like they may not make it for 1.0, but there’s a 
chance that Parquet support with Spark SQL will, which should let you store 
binary data a bit better.

Matei

On Mar 19, 2014, at 3:12 PM, Jeremy Freeman freeman.jer...@gmail.com wrote:

 Another vote on this, support for simple SequenceFiles and/or Avro would be 
 terrific, as using plain text can be very space-inefficient, especially for 
 numerical data.
 
 -- Jeremy
 
 On Mar 19, 2014, at 5:24 PM, Nicholas Chammas nicholas.cham...@gmail.com 
 wrote:
 
 I'd second the request for Avro support in Python first, followed by Parquet.
 
 
 On Wed, Mar 19, 2014 at 2:14 PM, Evgeny Shishkin itparan...@gmail.com 
 wrote:
 
 On 19 Mar 2014, at 19:54, Diana Carroll dcarr...@cloudera.com wrote:
 
 Actually, thinking more on this question, Matei: I'd definitely say support 
 for Avro.  There's a lot of interest in this!!
 
 
 Agree, and parquet as default Cloudera Impala format.
 
 
 
 
 On Tue, Mar 18, 2014 at 8:14 PM, Matei Zaharia matei.zaha...@gmail.com 
 wrote:
 BTW one other thing — in your experience, Diana, which non-text 
 InputFormats would be most useful to support in Python first? Would it be 
 Parquet or Avro, simple SequenceFiles with the Hadoop Writable types, or 
 something else? I think a per-file text input format that does the stuff we 
 did here would also be good.
 
 Matei
 
 
 On Mar 18, 2014, at 3:27 PM, Matei Zaharia matei.zaha...@gmail.com wrote:
 
 Hi Diana,
 
 This seems to work without the iter() in front if you just return 
 treeiterator. What happened when you didn’t include that? Treeiterator 
 should return an iterator.
 
 Anyway, this is a good example of mapPartitions. It’s one where you want 
 to view the whole file as one object (one XML here), so you couldn’t 
 implement this using a flatMap, but you still want to return multiple 
 values. The MLlib example you saw needs Python 2.7 because unfortunately 
 that is a requirement for our Python MLlib support (see 
 http://spark.incubator.apache.org/docs/0.9.0/python-programming-guide.html#libraries).
  We’d like to relax this later but we’re using some newer features of 
 NumPy and Python. The rest of PySpark works on 2.6.
 
 In terms of the size in memory, here both the string s and the XML tree 
 constructed from it need to fit in, so you can’t work on very large 
 individual XML files. You may be able to use a streaming XML parser 
 instead to extract elements from the data in a streaming fashion, without 
 ever materializing the whole tree. 
 http://docs.python.org/2/library/xml.sax.reader.html#module-xml.sax.xmlreader
  is one example.
 
 Matei
 
 On Mar 18, 2014, at 7:49 AM, Diana Carroll dcarr...@cloudera.com wrote:
 
 Well, if anyone is still following this, I've gotten the following code 
 working which in theory should allow me to parse whole XML files: (the 
 problem was that I can't return the tree iterator directly.  I have to 
 call iter().  Why?)
 
 import xml.etree.ElementTree as ET
 
 # two source files, format: <data><country name="...">...</country></data>
 mydata = sc.textFile("file:/home/training/countries*.xml") 
 
 def parsefile(iterator):
     s = ''
     for i in iterator: s = s + str(i)
     tree = ET.fromstring(s)
     treeiterator = tree.getiterator("country")
     # why do I have to convert an iterator to an iterator?  not sure but required
     return iter(treeiterator)
 
 mydata.mapPartitions(lambda x: parsefile(x)).map(lambda element: 
 element.attrib).collect()
 
 The output is what I expect:
 [{'name': 'Liechtenstein'}, {'name': 'Singapore'}, {'name': 'Panama'}]
 
 BUT I'm a bit concerned about the construction of the string s.  How 
 big can my file be before converting it to a string becomes problematic?
 
 
 
 On Tue, Mar 18, 2014 at 9:41 AM, Diana Carroll dcarr...@cloudera.com 
 wrote:
 Thanks, Matei.
 
 In the context of this discussion, it would seem mapParitions is 
 essential, because it's the only way I'm going to be able to process each 
 file as a whole, in our example of a large number of small XML files 
 which need to be parsed as a whole file because records are not required 
 to be on a single line.
 
 The theory makes sense but I'm still utterly lost as to how to implement 
 it.  Unfortunately there's only a single example of the use of 
 mapPartitions in any of the Python example programs, which is the log 
 regression example, which I can't run because it requires Python 2.7 and 
 I'm on Python 2.6.  (aside: I couldn't find any statement that Python 2.6 
 is unsupported...is it?)
 
 I'd really really love to see a real life example of a Python use of 
 mapPartitions.  I do appreciate the very simple examples you provided, 
 but (perhaps because of my novice status on Python) I can't figure out

Re: Having spark-ec2 join new slaves to existing cluster

2014-04-04 Thread Matei Zaharia
This can’t be done through the script right now, but you can do it manually as 
long as the cluster is stopped. If the cluster is stopped, just go into the AWS 
Console, right click a slave and choose “launch more of these” to add more. Or 
select multiple slaves and delete them. When you run spark-ec2 start the next 
time to start your cluster, it will set it up on all the machines it finds in 
the mycluster-slaves security group.

This is pretty hacky so it would definitely be good to add this feature; feel 
free to open a JIRA about it.

Matei

On Apr 4, 2014, at 12:16 PM, Nicholas Chammas nicholas.cham...@gmail.com 
wrote:

 I would like to be able to use spark-ec2 to launch new slaves and add them to 
 an existing, running cluster. Similarly, I would also like to remove slaves 
 from an existing cluster.
 
 Use cases include:
 Oh snap, I sized my cluster incorrectly. Let me add/remove some slaves.
 During scheduled batch processing, I want to add some new slaves, perhaps on 
 spot instances. When that processing is done, I want to kill them. (Cruel, I 
 know.)
 I gather this is not possible at the moment. spark-ec2 appears to be able to 
 launch new slaves for an existing cluster only if the master is stopped. I 
 also do not see any ability to remove slaves from a cluster.
 
 Is that correct? Are there plans to add such functionality to spark-ec2 in 
 the future?
 
 Nick
 
 
 View this message in context: Having spark-ec2 join new slaves to existing 
 cluster
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Spark on other parallel filesystems

2014-04-04 Thread Matei Zaharia
As long as the filesystem is mounted at the same path on every node, you should 
be able to just run Spark and use a file:// URL for your files.
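
For example, assuming the filesystem is mounted at /lustre (a made-up path) on the driver and every worker:

    val events = sc.textFile("file:///lustre/datasets/events/*.log")
    println(events.count())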

The only downside with running it this way is that Lustre won’t expose data 
locality info to Spark, the way HDFS does. That may not matter if it’s a 
network-mounted file system though.

Matei

On Apr 4, 2014, at 4:56 PM, Venkat Krishnamurthy ven...@yarcdata.com wrote:

 All
 
 Are there any drawbacks or technical challenges (or any information, really) 
 related to using Spark directly on a global parallel filesystem  like 
 Lustre/GPFS? 
 
 Any idea of what would be involved in doing a minimal proof of concept? Is it 
 just possible to run Spark unmodified (without the HDFS substrate) for a 
 start, or will that not work at all? I do know that it’s possible to 
 implement Tachyon on Lustre and get the HDFS interface – just looking at 
 other options.
 
 Venkat



Re: Spark 0.9.1 released

2014-04-09 Thread Matei Zaharia
Thanks TD for managing this release, and thanks to everyone who contributed!

Matei

On Apr 9, 2014, at 2:59 PM, Tathagata Das tathagata.das1...@gmail.com wrote:

 A small additional note: Please use the direct download links in the Spark 
 Downloads page. The Apache mirrors take a day or so to sync from the main 
 repo, so may not work immediately.
 
 TD
 
 
 On Wed, Apr 9, 2014 at 2:54 PM, Tathagata Das tathagata.das1...@gmail.com 
 wrote:
 Hi everyone,
 
 We have just posted Spark 0.9.1, which is a maintenance release with
 bug fixes, performance improvements, better stability with YARN and
 improved parity of the Scala and Python API. We recommend all 0.9.0
 users to upgrade to this stable release.
 
 This is the first release since Spark graduated as a top level Apache
 project. Contributions to this release came from 37 developers.
 
 The full release notes are at:
 http://spark.apache.org/releases/spark-release-0-9-1.html
 
 You can download the release at:
 http://spark.apache.org/downloads.html
 
 Thanks all the developers who contributed to this release:
 Aaron Davidson, Aaron Kimball, Andrew Ash, Andrew Or, Andrew Tulloch,
 Bijay Bisht, Bouke van der Bijl, Bryn Keller, Chen Chao,
 Christian Lundgren, Diana Carroll, Emtiaz Ahmed, Frank Dai,
 Henry Saputra, jianghan, Josh Rosen, Jyotiska NK, Kay Ousterhout,
 Kousuke Saruta, Mark Grover, Matei Zaharia, Nan Zhu, Nick Lanham,
 Patrick Wendell, Prabin Banka, Prashant Sharma, Qiuzhuang,
 Raymond Liu, Reynold Xin, Sandy Ryza, Sean Owen, Shixiong Zhu,
 shiyun.wxm, Stevo Slavić, Tathagata Das, Tom Graves, Xiangrui Meng
 
 TD
 



Re: pySpark memory usage

2014-04-09 Thread Matei Zaharia
Okay, thanks. Do you have any info on how large your records and data file are? 
I’d like to reproduce and fix this.

Matei

On Apr 9, 2014, at 3:52 PM, Jim Blomo jim.bl...@gmail.com wrote:

 Hi Matei, thanks for working with me to find these issues.
 
 To summarize, the issues I've seen are:
 0.9.0:
 - https://issues.apache.org/jira/browse/SPARK-1323
 
 SNAPSHOT 2014-03-18:
 - When persist() used and batchSize=1, java.lang.OutOfMemoryError:
 Java heap space.  To me this indicates a memory leak since Spark
 should simply be counting records of size < 3MB
 - Without persist(), stdin writer to Python finished early hangs the
 application, unknown root cause
 
 I've recently rebuilt another SNAPSHOT, git commit 16b8308 with
 debugging turned on.  This gives me the stacktrace on the new stdin
 problem:
 
 14/04/09 22:22:45 DEBUG PythonRDD: stdin writer to Python finished early
 java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:196)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
at sun.security.ssl.InputRecord.read(InputRecord.java:509)
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
at 
 sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
at 
 org.apache.commons.httpclient.WireLogInputStream.read(WireLogInputStream.java:69)
at 
 org.apache.commons.httpclient.ContentLengthInputStream.read(ContentLengthInputStream.java:170)
at java.io.FilterInputStream.read(FilterInputStream.java:133)
at 
 org.apache.commons.httpclient.AutoCloseInputStream.read(AutoCloseInputStream.java:108)
at 
 org.jets3t.service.io.InterruptableInputStream.read(InterruptableInputStream.java:76)
at 
 org.jets3t.service.impl.rest.httpclient.HttpMethodReleaseInputStream.read(HttpMethodReleaseInputStream.java:136)
at 
 org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsInputStream.read(NativeS3FileSystem.java:98)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
at java.io.DataInputStream.read(DataInputStream.java:100)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:134)
at 
 org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:133)
at 
 org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:38)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:192)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:175)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at 
 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:27)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
 org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
at 
 org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
 
 
 On Thu, Apr 3, 2014 at 8:37 PM, Matei Zaharia matei.zaha...@gmail.com wrote:
 Cool, thanks for the update. Have you tried running a branch with this fix 
 (e.g. branch-0.9, or the 0.9.1 release candidate?) Also, what memory leak 
 issue are you referring to, is it separate from this? (Couldn't find it 
 earlier in the thread.)
 
 To turn on debug logging, copy conf/log4j.properties.template to 
 conf/log4j.properties and change the line log4j.rootCategory=INFO, console 
 to log4j.rootCategory=DEBUG, console. Then make sure this file is present in 
 conf on all workers.
 
 BTW I've managed to run PySpark with this fix on some reasonably large S3 
 data (multiple GB) and it was fine. It might happen only if records are 
 large, or something like that. How much heap are you giving to your 
 executors, and does it show that much in the web UI?
 
 Matei
 
 On Mar 29, 2014, at 10:44 PM, Jim Blomo jim.bl...@gmail.com wrote:
 
 I think the problem I ran into in 0.9 is covered in
 https://issues.apache.org/jira/browse/SPARK-1323
 
 When I kill the python process, the stacktrace I gets indicates that
 this happens at initialization.  It looks like the initial write to
 the Python process does not go through, and then the iterator hangs
 waiting for output.  I haven't had luck turning on debugging for the
 executor process.  Still trying

Re: NPE using saveAsTextFile

2014-04-10 Thread Matei Zaharia
I haven’t seen this but it may be a bug in Typesafe Config, since this is 
serializing a Config object. We don’t actually use Typesafe Config ourselves.

Do you have any nulls in the data itself by any chance? And do you know how 
that Config object is getting there?

Matei

On Apr 9, 2014, at 11:38 PM, Nick Pentreath nick.pentre...@gmail.com wrote:

 Anyone have a chance to look at this?
 
 Am I just doing something silly somewhere?
 
 If it makes any difference, I am using the elasticsearch-hadoop plugin for 
 ESInputFormat. But as I say, I can parse the data (count, first() etc). I 
 just can't save it as text file.
 
 
 
 
 On Tue, Apr 8, 2014 at 4:50 PM, Nick Pentreath nick.pentre...@gmail.com 
 wrote:
 Hi
 
 I'm using Spark 0.9.0.
 
 When calling saveAsTextFile on a custom hadoop inputformat (loaded with 
 newAPIHadoopRDD), I get the following error below.
 
 If I call count, I get the correct count of number of records, so the 
 inputformat is being read correctly... the issue only appears when trying to 
 use saveAsTextFile.
 
 If I call first() I get the correct output, also. So it doesn't appear to be 
 anything with the data or inputformat.
 
 Any idea what the actual problem is, since this stack trace is not obvious 
 (though it seems to be in ResultTask which ultimately causes this).
 
 Is this a known issue at all?
 
 
 ==
 
 14/04/08 16:00:46 ERROR OneForOneStrategy: 
 java.lang.NullPointerException
   at 
 com.typesafe.config.impl.SerializedConfigValue.writeOrigin(SerializedConfigValue.java:202)
   at 
 com.typesafe.config.impl.ConfigImplUtil.writeOrigin(ConfigImplUtil.java:228)
   at 
 com.typesafe.config.ConfigException.writeObject(ConfigException.java:58)
   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at 
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
   at 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:601)
   at 
 java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:975)
   at 
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1480)
   at 
 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
   at 
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
   at 
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
   at 
 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
   at 
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
   at 
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
   at 
 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
   at 
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
   at 
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
   at 
 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
   at 
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
   at 
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
   at 
 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
   at 
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
   at 
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
   at 
 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
   at 
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
   at 
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
   at 
 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:346)
   at scala.collection.immutable.$colon$colon.writeObject(List.scala:379)
   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at 
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
   at 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:601)
   at 
 java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:975)
   at 
 

Re: Spark - ready for prime time?

2014-04-10 Thread Matei Zaharia
To add onto the discussion about memory working space, 0.9 introduced the 
ability to spill data within a task to disk, and in 1.0 we’re also changing the 
interface to allow spilling data within the same *group* to disk (e.g. when you 
do groupBy and get a key with lots of values). The main reason these weren’t 
there was that for a lot of workloads (everything except the same key having 
lots of values), simply launching more reduce tasks was also a good solution, 
because it results in an external sort across the cluster similar to what would 
happen within a task.

Overall, expect to see more work to both explain how things execute 
(http://spark.incubator.apache.org/docs/latest/tuning.html is one example, the 
monitoring UI is another) and try to make things require no configuration out 
of the box. We’re doing a lot of this based on user feedback, so that’s 
definitely appreciated.

Matei

On Apr 10, 2014, at 10:33 AM, Dmitriy Lyubimov dlie...@gmail.com wrote:

 On Thu, Apr 10, 2014 at 9:24 AM, Andrew Ash and...@andrewash.com wrote:
 The biggest issue I've come across is that the cluster is somewhat unstable 
 when under memory pressure.  Meaning that if you attempt to persist an RDD 
 that's too big for memory, even with MEMORY_AND_DISK, you'll often still get 
 OOMs.  I had to carefully modify some of the space tuning parameters and GC 
 settings to get some jobs to even finish.
 
 The other issue I've observed is if you group on a key that is highly skewed, 
 with a few massively-common keys and a long tail of rare keys, the one 
 massive key can be too big for a single machine and again cause OOMs.
 
 My take on it -- Spark doesn't believe in sort-and-spill things to enable 
 super long groups, and IMO for a good reason. Here are my thoughts:
 
 (1) in my work i don't need sort in 99% of the cases, i only need group 
 which absolutely doesn't need the spill which makes things slow down to a 
 crawl. 
 (2) if that's an aggregate (such as group count), use combine(), not 
 groupByKey -- this will do tons of good on memory use.
 (3) if you really need groups that don't fit into memory, that is always 
 because you want to do something that is other than aggregation, with them. 
 E.g. build an index of that grouped data. We actually had a case just like 
 that. In this case your friend is really not groupBy, but rather PartitionBy. 
 I.e. what happens there you build a quick count sketch, perhaps on 
 downsampled data, to figure which keys have sufficiently big count -- and 
 then you build a partitioner that redirects large groups to a dedicated 
 map(). assuming this map doesn't try to load things in memory but rather do 
 something like streaming BTree build, that should be fine. In certain 
 situations such processing may require splitting a super large group even into 
 smaller sub groups (e.g. partitioned BTree structure), at which point you 
 should be fine even from uniform load point of view. It takes a little of 
 jiu-jitsu to do it all, but it is not Spark's fault here, it did not promise 
 do this all for you in the groupBy contract.
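 
 For illustration, a minimal sketch of point (2) above (not from the original thread):
 
     import org.apache.spark.SparkContext._   // pair-RDD operations
 
     val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))
 
     // groupByKey ships every value for a key to a single task before you touch it:
     val counts1 = pairs.groupByKey().map { case (k, vs) => (k, vs.sum) }
 
     // a combine (reduceByKey) merges values map-side first, so far less data moves:
     val counts2 = pairs.reduceByKey(_ + _)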
 
  
 
 I'm hopeful that off-heap caching (Tachyon) could fix some of these issues.
 
 Just my personal experience, but I've observed significant improvements in 
 stability since even the 0.7.x days, so I'm confident that things will 
 continue to get better as long as people report what they're seeing so it can 
 get fixed.
 
 Andrew
 
 
 On Thu, Apr 10, 2014 at 4:08 PM, Alex Boisvert alex.boisv...@gmail.com 
 wrote:
 I'll provide answers from our own experience at Bizo.  We've been using Spark 
 for 1+ year now and have found it generally better than previous approaches 
 (Hadoop + Hive mostly).
 
 
 
 On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth 
 andras.nem...@lynxanalytics.com wrote:
 I. Is it too much magic? Lots of things just work right in Spark and it's 
 extremely convenient and efficient when it indeed works. But should we be 
 worried that customization is hard if the built in behavior is not quite 
 right for us? Are we to expect hard to track down issues originating from the 
 black box behind the magic?
 
 I think it goes back to understanding Spark's architecture, its design 
 constraints and the problems it explicitly set out to address.   If the 
 solution to your problems can be easily formulated in terms of the map/reduce 
 model, then it's a good choice.  You'll want your customizations to go with 
 (not against) the grain of the architecture.
  
 II. Is it mature enough? E.g. we've created a pull request which fixes a 
 problem that we were very surprised no one ever stumbled upon before. So 
 that's why I'm asking: is Spark being already used in professional settings? 
 Can one already trust it being reasonably bug free and reliable?
 
 There are lots of ways to use Spark; and not all of the features are 
 necessarily at the same level of maturity.   For instance, we put all the 
 jars on the main classpath so we've never run into the 

Re: Spark 0.9.1 PySpark ImportError

2014-04-10 Thread Matei Zaharia
Kind of strange because we haven’t updated CloudPickle AFAIK. Is this a package 
you added on the PYTHONPATH? How did you set the path, was it in 
conf/spark-env.sh?

Matei

On Apr 10, 2014, at 7:39 AM, aazout albert.az...@velos.io wrote:

 I am getting a python ImportError on Spark standalone cluster. I have set the
 PYTHONPATH on both worker and slave and the package imports properly when I
 run PySpark command line on both machines. This only happens with Master -
 Slave communication. Here is the error below: 
 
 14/04/10 13:40:19 INFO scheduler.TaskSetManager: Loss was due to
 org.apache.spark.api.python.PythonException: Traceback (most recent call
 last): 
  File /root/spark/python/pyspark/worker.py, line 73, in main 
command = pickleSer._read_with_length(infile) 
  File /root/spark/python/pyspark/serializers.py, line 137, in
 _read_with_length 
return self.loads(obj) 
  File /root/spark/python/pyspark/cloudpickle.py, line 810, in subimport 
__import__(name) 
 ImportError: ('No module named volatility.atm_impl_vol', <function subimport
 at 0xa36050>, ('volatility.atm_impl_vol',)) 
 
 Any ideas?
 
 
 
 -
 CEO / Velos (velos.io)
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-0-9-1-PySpark-ImportError-tp4068.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Spark - ready for prime time?

2014-04-11 Thread Matei Zaharia
It’s not a new API, it just happens underneath the current one if you have 
spark.shuffle.spill set to true (which it is by default). Take a look at the 
config settings that mention “spill” in 
http://spark.incubator.apache.org/docs/latest/configuration.html.

Matei

On Apr 11, 2014, at 7:02 AM, Surendranauth Hiraman suren.hira...@velos.io 
wrote:

 Matei,
 
 Where is the functionality in 0.9 to spill data within a task (separately 
 from persist)? My apologies if this is something obvious but I don't see it 
 in the api docs.
 
 -Suren
 
 
 
 On Thu, Apr 10, 2014 at 3:59 PM, Matei Zaharia matei.zaha...@gmail.com 
 wrote:
 To add onto the discussion about memory working space, 0.9 introduced the 
 ability to spill data within a task to disk, and in 1.0 we’re also changing 
 the interface to allow spilling data within the same *group* to disk (e.g. 
 when you do groupBy and get a key with lots of values). The main reason these 
 weren’t there was that for a lot of workloads (everything except the same key 
 having lots of values), simply launching more reduce tasks was also a good 
 solution, because it results in an external sort across the cluster similar 
 to what would happen within a task.
 
 Overall, expect to see more work to both explain how things execute 
 (http://spark.incubator.apache.org/docs/latest/tuning.html is one example, 
 the monitoring UI is another) and try to make things require no configuration 
 out of the box. We’re doing a lot of this based on user feedback, so that’s 
 definitely appreciated.
 
 Matei
 
 On Apr 10, 2014, at 10:33 AM, Dmitriy Lyubimov dlie...@gmail.com wrote:
 
 On Thu, Apr 10, 2014 at 9:24 AM, Andrew Ash and...@andrewash.com wrote:
 The biggest issue I've come across is that the cluster is somewhat unstable 
 when under memory pressure.  Meaning that if you attempt to persist an RDD 
 that's too big for memory, even with MEMORY_AND_DISK, you'll often still get 
 OOMs.  I had to carefully modify some of the space tuning parameters and GC 
 settings to get some jobs to even finish.
 
 The other issue I've observed is if you group on a key that is highly 
 skewed, with a few massively-common keys and a long tail of rare keys, the 
 one massive key can be too big for a single machine and again cause OOMs.
 
 My take on it -- Spark doesn't believe in sort-and-spill things to enable 
 super long groups, and IMO for a good reason. Here are my thoughts:
 
 (1) in my work i don't need sort in 99% of the cases, i only need group 
 which absolutely doesn't need the spill which makes things slow down to a 
 crawl. 
 (2) if that's an aggregate (such as group count), use combine(), not 
 groupByKey -- this will do tons of good on memory use.
 (3) if you really need groups that don't fit into memory, that is always 
 because you want to do something that is other than aggregation, with them. 
  E.g. build an index of that grouped data. We actually had a case just like 
 that. In this case your friend is really not groupBy, but rather 
 PartitionBy. I.e. what happens there you build a quick count sketch, perhaps 
 on downsampled data, to figure which keys have sufficiently big count -- 
 and then you build a partitioner that redirects large groups to a dedicated 
 map(). assuming this map doesn't try to load things in memory but rather do 
 something like streaming BTree build, that should be fine. In certain 
  situations such processing may require splitting a super large group even into 
 smaller sub groups (e.g. partitioned BTree structure), at which point you 
 should be fine even from uniform load point of view. It takes a little of 
 jiu-jitsu to do it all, but it is not Spark's fault here, it did not promise 
 do this all for you in the groupBy contract.
 
  
 
 I'm hopeful that off-heap caching (Tachyon) could fix some of these issues.
 
 Just my personal experience, but I've observed significant improvements in 
 stability since even the 0.7.x days, so I'm confident that things will 
 continue to get better as long as people report what they're seeing so it 
 can get fixed.
 
 Andrew
 
 
 On Thu, Apr 10, 2014 at 4:08 PM, Alex Boisvert alex.boisv...@gmail.com 
 wrote:
 I'll provide answers from our own experience at Bizo.  We've been using 
 Spark for 1+ year now and have found it generally better than previous 
 approaches (Hadoop + Hive mostly).
 
 
 
 On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth 
 andras.nem...@lynxanalytics.com wrote:
 I. Is it too much magic? Lots of things just work right in Spark and it's 
 extremely convenient and efficient when it indeed works. But should we be 
 worried that customization is hard if the built in behavior is not quite 
 right for us? Are we to expect hard to track down issues originating from 
 the black box behind the magic?
 
  I think it goes back to understanding Spark's architecture, its design 
 constraints and the problems it explicitly set out to address.   If the 
 solution to your problems can be easily formulated

Re: RDD.tail()

2014-04-14 Thread Matei Zaharia
You can use mapPartitionsWithIndex and look at the partition index (0 will be 
the first partition) to decide whether to skip the first line.
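
A minimal sketch (file name made up; this assumes the header sits in the first partition, which is the normal case for a plain textFile):

    val lines = sc.textFile("data.csv")
    val noHeader = lines.mapPartitionsWithIndex { (idx, iter) =>
      if (idx == 0) iter.drop(1) else iter   // drop the header line from partition 0
    }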

Matei

On Apr 14, 2014, at 8:50 AM, Ethan Jewett esjew...@gmail.com wrote:

 We have similar needs but IIRC, I came to the conclusion that this would only 
 work on ordered RDDs, and then you would still have to figure out which 
 partition is the first one. I ended up deciding it would be best to just drop 
 the header lines from a Scala iterator before creating an RDD based on it. 
 Not sure if this was the right thing to do, but would that work for you?
 
 Regards,
 Ethan
 
 
 On Mon, Apr 14, 2014 at 10:24 AM, Philip Ogren philip.og...@oracle.com 
 wrote:
 Has there been any thought to adding a tail() method to RDD?  It would be 
 really handy to skip over the first item in an RDD when it contains header 
 information.  Even better would be a drop(int) function that would allow you 
 to skip over several lines of header information.  Our attempts to do 
 something equivalent with a filter() call seem a bit contorted.  Any thoughts?
 
 Thanks,
 Philip
 



Re: process_local vs node_local

2014-04-14 Thread Matei Zaharia
Spark can actually launch multiple executors on the same node if you configure 
it that way, but if you haven’t done that, this might mean that some tasks are 
reading data from the cache, and some from HDFS. (In the HDFS case Spark will 
only report it as NODE_LOCAL since HDFS isn’t tied to a particular executor 
process). For example, maybe you cached some data but not all the partitions of 
the RDD are in memory. Are you using caching here?

There’s a locality wait setting in Spark (spark.locality.wait) that determines 
how long it will wait to go to the next locality level when it can’t launch 
stuff at its preferred one (e.g. to go from process to node). You can try 
increasing that too, by default it’s only 3000 ms. It might be that the whole 
RDD is cached but garbage collection causes it to give up waiting on some nodes 
and launch stuff on other nodes instead, which might be HDFS-local (due to data 
replication) but not cache-local.
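
For example, to raise it to 10 seconds (the value is in milliseconds; the rest of the conf is whatever you already use):

    val conf = new org.apache.spark.SparkConf().set("spark.locality.wait", "10000")
    val sc = new org.apache.spark.SparkContext(conf)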

Matei

On Apr 14, 2014, at 8:37 AM, dachuan hdc1...@gmail.com wrote:

 I am confused about the process local and node local, too.
 
 In my current understanding of Spark, one application typically only has one 
 executor in one node. However, node local means your data is in the same 
 host, but in a different executor.
 
 This further means node local is the same as process local unless one node 
 has two executors, which could only happen when one node has two Workers.
 
 Waiting for further discussion ..
 
 
 On Mon, Apr 14, 2014 at 10:13 AM, Nathan Kronenfeld 
 nkronenf...@oculusinfo.com wrote:
 I've a fairly large job (5E9 records, ~1600 partitions), wherein on a given 
 stage, it looks like for the first half of the tasks, everything runs in 
 process_local mode in ~10s/partition.  Then, from halfway through, everything 
 starts running in node_local mode, and takes 10x as long or more.
 
 I read somewhere that the difference between the two had to do with the data 
 being local to the running jvm, or another jvm on the same machine.  If 
 that's the case, shouldn't the distribution of the two modes be more random?  
 If not, what exactly is the difference between the two modes?  Given how much 
 longer it takes in node_local mode, it seems like the whole thing would 
 probably run much faster just by waiting for the right jvm to be free.  Is 
 there any way of forcing this?
 
 
 Thanks, 
   -Nathan
 
 
 -- 
 Nathan Kronenfeld
 Senior Visualization Developer
 Oculus Info Inc
 2 Berkeley Street, Suite 600,
 Toronto, Ontario M5A 4J5
 Phone:  +1-416-203-3003 x 238
 Email:  nkronenf...@oculusinfo.com
 
 
 
 -- 
 Dachuan Huang
 Cellphone: 614-390-7234
 2015 Neil Avenue
 Ohio State University
 Columbus, Ohio
 U.S.A.
 43210



Re: using Kryo with pyspark?

2014-04-14 Thread Matei Zaharia
Kryo won’t make a major impact on PySpark because it just stores data as byte[] 
objects, which are fast to serialize even with Java. But it may be worth a try 
— you would just set spark.serializer and not try to register any classes. What 
might make more impact is storing data as MEMORY_ONLY_SER and turning on 
spark.rdd.compress, which will compress them. In Java this can add some CPU 
overhead but Python runs quite a bit slower so it might not matter, and it 
might speed stuff up by reducing GC or letting you cache more data.

Matei

On Apr 14, 2014, at 12:24 PM, Diana Carroll dcarr...@cloudera.com wrote:

 I'm looking at the Tuning Guide suggestion to use Kryo instead of default 
 serialization.  My questions:
 
 Does pyspark use Java serialization by default, as Scala spark does?  If so, 
 then...
 can I use Kryo with pyspark instead?  The instructions say I should register 
 my classes with the Kryo Serialization, but that's in Java/Scala.  If I 
 simply set the spark.serializer variable for my SparkContext, will it at 
 least use Kryo for Spark's own classes, even if I can't register any of my 
 own classes?
 
 Thanks,
 Diana



Re: partitioning of small data sets

2014-04-15 Thread Matei Zaharia
Yup, one reason it’s 2 actually is to give people a similar experience to 
working with large files, in case their code doesn’t deal well with the file 
being partitioned.
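
For reference, a quick sketch of the minSplits approach Aaron describes below (file name made up):

    val tiny = sc.textFile("file:/home/training/smallfile.txt", 1)   // minSplits = 1
    println(tiny.partitions.size)                                    // stays a single partition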

Matei

On Apr 15, 2014, at 9:53 AM, Aaron Davidson ilike...@gmail.com wrote:

 Take a look at the minSplits argument for SparkContext#textFile [1] -- the 
 default value is 2. You can simply set this to 1 if you'd prefer not to split 
 your data.
 
 [1] 
 http://spark.apache.org/docs/latest/api/core/index.html#org.apache.spark.SparkContext
 
 
 On Tue, Apr 15, 2014 at 8:44 AM, Diana Carroll dcarr...@cloudera.com wrote:
 I loaded a very tiny file into Spark -- 23 lines of text, 2.6kb
 
 Given the size, and that it is a single file, I assumed it would only be in a 
 single partition.  But when I cache it,  I can see in the Spark App UI that 
 it actually splits it into two partitions:
 
 sparkdev_2014-04-11.png
 
 Is this correct behavior?  How does Spark decide how big a partition should 
 be, or how many partitions to create for an RDD.
 
 If it matters, I have only a single worker in my cluster, so both 
 partitions are stored on the same worker.
 
 The file was on HDFS and was only a single block.
 
 Thanks for any insight.
 
 Diana
 
 
 



Re: Multi-tenant?

2014-04-15 Thread Matei Zaharia
Yes, both things can happen. Take a look at 
http://spark.apache.org/docs/latest/job-scheduling.html, which includes 
scheduling concurrent jobs within the same driver.

Matei

On Apr 15, 2014, at 4:08 PM, Ian Ferreira ianferre...@hotmail.com wrote:

 What is the support for multi-tenancy in Spark?
 
 I assume more than one driver can share the same cluster, but can a driver 
 run two jobs in parallel?
 



Re: PySpark still reading only text?

2014-04-16 Thread Matei Zaharia
Hi Bertrand,

We should probably add a SparkContext.pickleFile and RDD.saveAsPickleFile that 
will allow saving pickled objects. Unfortunately this is not in yet, but there 
is an issue up to track it: https://issues.apache.org/jira/browse/SPARK-1161.

In 1.0, one feature we do have now is the ability to load binary data from Hive 
using Spark SQL’s Python API. Later we will also be able to save to Hive.

Matei

On Apr 16, 2014, at 4:27 AM, Bertrand Dechoux decho...@gmail.com wrote:

 Hi,
 
 I have browsed the online documentation and it is stated that PySpark only 
 reads text files as sources. Is that still the case?
 
 From what I understand, the RDD can after this first step be any serialized 
 python structure if the class definitions are well distributed.
 
 Is it not possible to read back those RDDs? That is create a flow to parse 
 everything and then, e.g. the next week, start from the binary, structured 
 data?
 
 Technically, what is the difficulty? I would assume the code reading a binary 
 python RDD or a binary python file to be quite similar. Where can I know more 
 about this subject?
 
 Thanks in advance
 
 Bertrand



Re: extremely slow k-means version

2014-04-19 Thread Matei Zaharia
The problem is that groupByKey means “bring all the points with this same key 
to the same JVM”. Your input is a Seq[Point], so you have to have all the 
points there. This means that a) all points will be sent across the network in 
a cluster, which is slow (and Spark goes through this sending code path even in 
local mode so it serializes the data), and b) you’ll get out of memory errors 
if that Seq is too big. In large-scale data processing, data movement is often 
the biggest cost, so you have to carefully choose which operations to use.

Matei

On Apr 19, 2014, at 4:04 AM, ticup tim.coppiete...@gmail.com wrote:

 Hi,
 
 I was playing around with other k-means implementations in Scala/Spark in
 order to test performance and get a better grasp on Spark.
 
 Now, I made one similar to the one from the examples
 (https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala),
 except that it's a bit less clever. Nevertheless, I expect a non-expert
 scala/spark programmer to write similar code instead of that from the
 example.
 
 Here is how they compare: in the step of calculating the new centroids (this
 is done by taking the average of all points belonging to the current
 centroids - the main workhorse of the algo), where the *example algorithm*
 adds the points of the same cluster and keeps track of the number of points
 in each cluster in 1 step (by using reduceByKey and keeping a counter in the
 reduce value):
 
 val closest = data.map (p => (closestPoint(p, kPoints), (p, 1)))
 
 val pointStats = closest.reduceByKey{case ((x1, y1), (x2, y2)) => (x1 + x2,
 y1 + y2)}
 
 and then proceeds by dividing the sum of all points of a cluster by the
 counted number of points in the cluster:
 
 val newPoints = pointStats.map {pair => (pair._1, pair._2._1 /
 pair._2._2)}.collectAsMap()
 
 Afterwards the change of the new centroids is calculated in order to know
 when to stop iterating:
 
 tempDist = 0.0
 
 for (i <- 0 until K) {
 tempDist += kPoints(i).squaredDist(newPoints(i))
 }
 
 
 
 *my algorithm *
 (https://github.com/ticup/k-means-spark/blob/master/src/main/scala/k-means.scala)
 is less clever, but more straightforward: it just groups all the points of
 each cluster and then proceeds to calculate the average on those points and
 adds the difference with the previous centroid to an accumulator:
 
 // accumulator for differences new centroids
 dist = sc.accumulator(0.0)
 
 // calculate new centroids + add difference to old centroids
 centroids = closest.groupByKey().map{case(i, points) =>
val newCentroid = average(points)
dist += centroids(i).squaredDist(newCentroid)
newCentroid
 }.collect()
 
 with:
 
 def average(points: Seq[Vector]) : Vector = {
points.reduce(_+_) / points.length
 }
 
 So, the big differences are:
 
 1. Use of accumulator
 2. Do excessive work by not cleverly calculating the average
 3. Accesses the centroids val from within the map
 
 
 Now, why I'm here for, this version runs EXTREMELY slow and gets
 outOfHeapMemory exceptions for data input that the original algorithm easily
 solves in ~5seconds. I'm trying to pinpoint what exactly is causing this
 huge difference. The use of an accumulator shouldn't really affect the
 performance and it doesn't, because I tried it without the accumulator and
 it stays as slow. Further, I expect the excessive work to slow down the
 algorithm with a factor of 2 or something, but this is really a decrease in
 factors of 10 or more.
 
 Even with 1 worker and 1 core (thus no parallelism) the difference in speed
 stays the same, so it's not because the averaging is not parallelised
 properly, there must be something going on that is much more important.
 
 Could someone give me pointers on what exactly is happening here? It can't
 be because I'm just accessing the centroids value from within the closure?
 
 Speed comparison:
 
 The *slow algorithm*: 44 seconds to perform the map
 14/04/19 13:03:15 INFO scheduler.DAGScheduler: Stage 3 (map at
 k-means.scala:114) finished in 43.909 s
 
 
 The *fast algorithm*: more or less the same operations (in 2 steps instead
 of 1) in 2.2 seconds
 
 14/04/19 12:52:29 INFO scheduler.DAGScheduler: Stage 3 (reduceByKey at
 k-means.scala:84) finished in 2.090 s
 
 14/04/19 12:52:29 INFO scheduler.DAGScheduler: Stage 2 (collectAsMap at
 k-means.scala:86) finished in 0.117 s
 
 
 Thanks in advance,
 Tim.
 
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/extremely-slow-k-means-version-tp4489.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Spark Streaming source from Amazon Kinesis

2014-04-21 Thread Matei Zaharia
There was a patch posted a few weeks ago 
(https://github.com/apache/spark/pull/223), but it needs a few changes in 
packaging because it uses a license that isn’t fully compatible with Apache. 
I’d like to get this merged when the changes are made though — it would be a 
good input source to support.

Matei


On Apr 21, 2014, at 1:00 PM, Nicholas Chammas nicholas.cham...@gmail.com 
wrote:

 I'm looking to start experimenting with Spark Streaming, and I'd like to use 
 Amazon Kinesis as my data source. Looking at the list of supported Spark 
 Streaming sources, I don't see any mention of Kinesis.
 
 Is it possible to use Spark Streaming with Amazon Kinesis? If not, are there 
 plans to add such support in the future?
 
 Nick
 
 
 View this message in context: Spark Streaming source from Amazon Kinesis
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: error in mllib lr example code

2014-04-23 Thread Matei Zaharia
See http://people.csail.mit.edu/matei/spark-unified-docs/ for a more recent 
build of the docs; if you spot any problems in those, let us know.

Matei

On Apr 23, 2014, at 9:49 AM, Xiangrui Meng men...@gmail.com wrote:

 The doc is for 0.9.1. You are running a later snapshot, which added
 sparse vectors. Try LabeledPoint(parts(0).toDouble,
 Vectors.dense(parts(1).split(' ').map(x = x.toDouble)). The examples
 are updated in the master branch. You can also check the examples
 there. -Xiangrui
 
 On Wed, Apr 23, 2014 at 9:34 AM, Mohit Jaggi mohitja...@gmail.com wrote:
 
 sorry...added a subject now
 
 On Wed, Apr 23, 2014 at 9:32 AM, Mohit Jaggi mohitja...@gmail.com wrote:
 
 I am trying to run the example linear regression code from
 
 http://spark.apache.org/docs/latest/mllib-guide.html
 
 But I am getting the following error...am I missing an import?
 
 code
 
 import org.apache.spark._
 
 import org.apache.spark.mllib.regression.LinearRegressionWithSGD
 
 import org.apache.spark.mllib.regression.LabeledPoint
 
 
 object ModelLR {
 
  def main(args: Array[String]) {
 
    val sc = new SparkContext(args(0), "SparkLR",
 
   System.getenv("SPARK_HOME"),
 SparkContext.jarOfClass(this.getClass).toSeq)
 
 // Load and parse the data
 
  val data = sc.textFile("mllib/data/ridge-data/lpsa.data")
 
  val parsedData = data.map { line =>
 
  val parts = line.split(',')
 
   LabeledPoint(parts(0).toDouble, parts(1).split(' ').map(x =>
 x.toDouble).toArray)
 
 }
 
 ...snip...
 
 }
 
 error
 
 - polymorphic expression cannot be instantiated to expected type; found :
 [U : Double]Array[U] required:
 
 org.apache.spark.mllib.linalg.Vector
 
 - polymorphic expression cannot be instantiated to expected type; found :
 [U : Double]Array[U] required:
 
 org.apache.spark.mllib.linalg.Vector
 
 



Re: How do I access the SPARK SQL

2014-04-23 Thread Matei Zaharia
It’s currently in the master branch, on https://github.com/apache/spark. You 
can check that out from git, build it with sbt/sbt assembly, and then try it 
out. We’re also going to post some release candidates soon that will be 
pre-built.

Matei

On Apr 23, 2014, at 1:30 PM, diplomatic Guru diplomaticg...@gmail.com wrote:

 Hello Team,
 
 I'm new to SPARK and just came across SPARK SQL, which appears to be 
 interesting but not sure how I could get it.
 
 I know it's an Alpha version but not sure if its available for community yet.
 
 Many thanks.
 
 Raj.



Re: Deploying a python code on a spark EC2 cluster

2014-04-24 Thread Matei Zaharia
Did you launch this using our EC2 scripts 
(http://spark.apache.org/docs/latest/ec2-scripts.html) or did you manually set 
up the daemons? My guess is that their hostnames are not being resolved 
properly on all nodes, so executor processes can’t connect back to your driver 
app. This error message indicates that:

14/04/24 09:00:49 WARN util.Utils: Your hostname, spark-node resolves to a
loopback address: 127.0.0.1; using 10.74.149.251 instead (on interface eth0)
14/04/24 09:00:49 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to
another address

If you launch with your EC2 scripts, or don’t manually change the hostnames, 
this should not happen.

Matei

On Apr 24, 2014, at 11:36 AM, John King usedforprinting...@gmail.com wrote:

 Same problem.
 
 
 On Thu, Apr 24, 2014 at 10:54 AM, Shubhabrata mail2shu...@gmail.com wrote:
 Moreover it seems all the workers are registered and have sufficient memory
 (2.7GB whereas I have asked for 512 MB). The UI also shows the jobs are
 running on the slaves. But on the terminal it is still the same error:
 Initial job has not accepted any resources; check your cluster UI to ensure
 that workers are registered and have sufficient memory
 
 Please see the screenshot. Thanks
 
 http://apache-spark-user-list.1001560.n3.nabble.com/file/n4761/33.png
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Deploying-a-python-code-on-a-spark-EC2-cluster-tp4758p4761.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 



Re: SparkPi performance-3 cluster standalone mode

2014-04-24 Thread Matei Zaharia
The problem is that SparkPi uses Math.random(), which is a synchronized method, 
so it can’t scale to multiple cores. In fact it will be slower on multiple 
cores due to lock contention. Try another example and you’ll see better 
scaling. I think we’ll have to update SparkPi to create a new Random in each 
task to avoid this.
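
A rough sketch of that change, giving each task its own generator instead of the shared, synchronized Math.random() (the slice and sample counts here are arbitrary):

    import scala.util.Random

    val slices = 12
    val n = 100000 * slices
    val count = sc.parallelize(1 to n, slices).mapPartitions { iter =>
      val rand = new Random()              // one Random per task, no shared lock
      iter.map { _ =>
        val x = rand.nextDouble() * 2 - 1
        val y = rand.nextDouble() * 2 - 1
        if (x * x + y * y < 1) 1 else 0
      }
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / n)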

Matei

On Apr 24, 2014, at 4:43 AM, Adnan nsyaq...@gmail.com wrote:

 Hi,
 
 Relatively new on spark and have tried running SparkPi example on a
 standalone 12 core three machine cluster. What I'm failing to understand is,
 that running this example with a single slice gives better performance as
 compared to using 12 slices. Same was the case when I was using parallelize
 function. The time is scaling almost linearly with adding each slice. Please
 let me know if I'm doing anything wrong. The code snippet is given below:
 
 
 
 Regards,
 
 Ahsan Ijaz
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/SparkPi-performance-3-cluster-standalone-mode-tp4530p4751.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Finding bad data

2014-04-24 Thread Matei Zaharia
Hey Jim, this is unfortunately harder than I’d like right now, but here’s how 
to do it. Look at the stderr file of the executor on that machine, and you’ll 
see lines like this:

14/04/24 19:17:24 INFO HadoopRDD: Input split: 
file:/Users/matei/workspace/apache-spark/README.md:0+2000

This says what file it was reading, as well as what byte offset (that’s the 
0+2000 part). Unfortunately, because the executor is running multiple tasks at 
the same time, this message will be hard to associate with a particular task 
unless you only configure one core per executor. But it may help you spot the 
file.

The other way you might do it is a map() on the data before you process it that 
checks for error conditions. In that one you could print out the original input 
line.

I realize that neither of these is ideal. I’ve opened 
https://issues.apache.org/jira/browse/SPARK-1622 to try to expose this 
information somewhere else, ideally in the UI. The reason it wasn’t done so far 
is because some tasks in Spark can be reading from multiple Hadoop InputSplits 
(e.g. if you use coalesce(), or zip(), or similar), so it’s tough to do it in a 
super general setting.

Matei

On Apr 24, 2014, at 6:15 PM, Jim Blomo jim.bl...@gmail.com wrote:

 I'm using PySpark to load some data and getting an error while
 parsing it.  Is it possible to find the source file and line of the bad
 data?  I imagine that this would be extremely tricky when dealing with
 multiple derived RDDs, so an answer with the caveat of this only
 works when running .map() on an textFile() RDD is totally fine.
 Perhaps if the line number and file was available in pyspark I could
 catch the exception and output it with the context?
 
 Anyway to narrow down the problem input would be great. Thanks!



Re: parallelize for a large Seq is extreamly slow.

2014-04-24 Thread Matei Zaharia
Try setting the serializer to org.apache.spark.serializer.KryoSerializer (see 
http://spark.apache.org/docs/0.9.1/tuning.html), it should be considerably 
faster.
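
For example, in 0.9.x this can be set as a system property before the SparkContext is created (or equivalently through SparkConf):

    System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")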

Matei

On Apr 24, 2014, at 8:01 PM, Earthson Lu earthson...@gmail.com wrote:

 spark.parallelize(word_mapping.value.toSeq).saveAsTextFile(hdfs://ns1/nlp/word_mapping)
 
 this line is too slow. There are about 2 million elements in word_mapping.
 
 Is there a good style for writing a large collection to hdfs?
 
 import org.apache.spark._
 import SparkContext._
 import scala.io.Source
 
 object WFilter {
   def main(args: Array[String]) {
     val spark = new SparkContext("yarn-standalone", "word filter",
       System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
     val stopset = Source.fromURL(this.getClass.getResource("stoplist.txt"))
       .getLines.map(_.trim).toSet
     val file = spark.textFile("hdfs://ns1/nlp/wiki.splited")
     val tf_map = spark broadcast file.flatMap(_.split("\t")).map((_, 1)).countByKey
     val df_map = spark broadcast file.map(x => Set(x.split("\t"): _*)).flatMap(_.map(_ -> 1)).countByKey
     val word_mapping = spark broadcast Map(df_map.value.keys.zipWithIndex.toBuffer: _*)
     def w_filter(w: String) =
       if (tf_map.value(w) < 8 || df_map.value(w) < 4 || (stopset contains w)) false else true
     val mapped = file.map(_.split("\t").filter(w_filter).map(w => word_mapping.value(w)).mkString("\t"))
 
     spark.parallelize(word_mapping.value.toSeq).saveAsTextFile("hdfs://ns1/nlp/word_mapping")
     mapped.saveAsTextFile("hdfs://ns1/nlp/lda/wiki.docs")
     spark.stop()
   }
 }
 
 many thx:) 
 
 -- 
 
 ~
 Perfection is achieved 
 not when there is nothing more to add
  but when there is nothing left to take away



Re: Spark on Yarn or Mesos?

2014-04-27 Thread Matei Zaharia
From my point of view, both are supported equally. The YARN support is newer 
and that’s why there’s been a lot more action there in recent months.

Matei

On Apr 27, 2014, at 12:08 PM, Andrew Ash and...@andrewash.com wrote:

 That thread was mostly about benchmarking YARN vs standalone, and the results 
 are what I'd expect -- spinning up a Spark cluster on demand through YARN has 
 higher startup latency than using a standalone cluster, where the JVMs are 
 already initialized and ready.
 
 Given that there's a lot more commit activity around YARN as compared to 
 Mesos, does that mean that YARN integration is just earlier in the maturity 
 curve, or does it mean that YARN is the future and Mesos is in 
 maintenance-only mode?
 
 That may be more a question for the Databricks team though: will YARN and 
 Mesos be supported equally, or will one become the preferred method of doing 
 cluster management under Spark?
 
 Andrew
 
 
 On Thu, Apr 17, 2014 at 1:27 PM, Arpit Tak arpi...@sigmoidanalytics.com 
 wrote:
 Hi Wel,
 
 Take a look at this post...
 http://apache-spark-user-list.1001560.n3.nabble.com/Job-initialization-performance-of-Spark-standalone-mode-vs-YARN-td2016.html
 
 Regards,
 Arpit Tak
 
 
 On Thu, Apr 17, 2014 at 3:42 PM, Wei Wang xwd0...@gmail.com wrote:
 Hi, there
 
 I would like to know is there any differences between Spark on Yarn and Spark 
 on Mesos. Is there any comparision between them? What are the advantages and 
 disadvantages for each of them. Is there any criterion for choosing between 
 Yarn and Mesos? 
 
 BTW, we need MPI in our framework, and I saw MPICH2 is included in Mesos. 
 Should it be the reason for choosing Mesos? 
 
 Thanks a lot!
 
 
 Weida
 
 



Re: Running a spark-submit compatible app in spark-shell

2014-04-27 Thread Matei Zaharia
Hi Roger,

You should be able to use the --jars argument of spark-shell to add JARs onto 
the classpath and then work with those classes in the shell. (A recent patch, 
https://github.com/apache/spark/pull/542, made spark-shell use the same 
command-line arguments as spark-submit). But this is a great question, we 
should test it out and see whether anything else would make development easier.

SBT also has an interactive shell where you can run classes in your project, but 
unfortunately Spark can't deal with closures typed directly into it the right way. 
However, if you write your Spark logic in a method and just call that method from 
the SBT shell, that should work.
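
For example, a minimal sketch of that pattern (the object, method and file names here
are made up):

import org.apache.spark.SparkContext

object DevJob {
  // Keep the Spark logic in a named method so it can be called from the sbt console
  def run(sc: SparkContext, path: String): Long =
    sc.textFile(path).filter(_.contains("ERROR")).count()
}

// Then from `sbt console`:
//   val sc = new SparkContext("local[2]", "dev")
//   DevJob.run(sc, "README.md")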

Matei

On Apr 27, 2014, at 3:14 PM, Roger Hoover roger.hoo...@gmail.com wrote:

 Hi,
 
 From the meetup talk about the 1.0 release, I saw that spark-submit will be 
 the preferred way to launch apps going forward.
 
 How do you recommend launching such jobs in a development cycle?  For 
 example, how can I load an app that's expecting to be given to spark-submit 
 into spark-shell?
 
 Also, can anyone recommend other tricks for rapid development?  I'm new to 
 Scala, sbt, etc.  I think sbt can watch for changes in source files and 
 compile them automatically.
 
 I want to be able to make code changes and quickly get into a spark-shell to 
 play around with them.
 
 I appreciate any advice.  Thanks,
 
 Roger



Re: Running out of memory Naive Bayes

2014-04-28 Thread Matei Zaharia
Not sure if this is always ideal for Naive Bayes, but you could also hash the 
features into a lower-dimensional space (e.g. reduce it to 50,000 features). 
For each feature, simply take MurmurHash3(featureID) % 50000, for example.
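
A rough sketch of that hashing trick in plain Scala (this isn't MLlib code, and the
bucket count is just an example):

import scala.util.hashing.MurmurHash3

val numBuckets = 50000

// Map an original feature ID into one of numBuckets hashed feature indices
def bucket(featureId: String): Int = {
  val h = MurmurHash3.stringHash(featureId) % numBuckets
  if (h < 0) h + numBuckets else h
}

// Turn one example's (featureId, value) pairs into a hashed sparse representation,
// summing the values of features that collide in the same bucket
def hashFeatures(feats: Seq[(String, Double)]): Map[Int, Double] =
  feats.groupBy { case (id, _) => bucket(id) }
       .mapValues(_.map(_._2).sum)
       .toMap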

Matei

On Apr 27, 2014, at 11:24 PM, DB Tsai dbt...@stanford.edu wrote:

 Our customer asked us to implement Naive Bayes which should be able to at 
 least train news20 one year ago, and we implemented for them in Hadoop using 
 distributed cache to store the model.
 
 
 Sincerely,
 
 DB Tsai
 ---
 My Blog: https://www.dbtsai.com
 LinkedIn: https://www.linkedin.com/in/dbtsai
 
 
 On Sun, Apr 27, 2014 at 11:03 PM, Xiangrui Meng men...@gmail.com wrote:
 How big is your problem and how many labels? -Xiangrui
 
 On Sun, Apr 27, 2014 at 10:28 PM, DB Tsai dbt...@stanford.edu wrote:
  Hi Xiangrui,
 
  We also run into this issue at Alpine Data Labs. We ended up using LRU cache
  to store the counts, and splitting those least used counts to distributed
  cache in HDFS.
 
 
  Sincerely,
 
  DB Tsai
  ---
  My Blog: https://www.dbtsai.com
  LinkedIn: https://www.linkedin.com/in/dbtsai
 
 
  On Sun, Apr 27, 2014 at 7:34 PM, Xiangrui Meng men...@gmail.com wrote:
 
  Even the features are sparse, the conditional probabilities are stored
  in a dense matrix. With 200 labels and 2 million features, you need to
  store at least 4e8 doubles on the driver node. With multiple
  partitions, you may need more memory on the driver. Could you try
  reducing the number of partitions and giving driver more ram and see
  whether it can help? -Xiangrui
 
  On Sun, Apr 27, 2014 at 3:33 PM, John King usedforprinting...@gmail.com
  wrote:
   I'm already using the SparseVector class.
  
   ~200 labels
  
  
   On Sun, Apr 27, 2014 at 12:26 AM, Xiangrui Meng men...@gmail.com
   wrote:
  
   How many labels does your dataset have? -Xiangrui
  
   On Sat, Apr 26, 2014 at 6:03 PM, DB Tsai dbt...@stanford.edu wrote:
Which version of mllib are you using? For Spark 1.0, mllib will
support sparse feature vector which will improve performance a lot
when computing the distance between points and centroid.
   
Sincerely,
   
DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai
   
   
On Sat, Apr 26, 2014 at 5:49 AM, John King
usedforprinting...@gmail.com wrote:
I'm just wondering are the SparkVector calculations really taking
into
account the sparsity or just converting to dense?
   
   
On Fri, Apr 25, 2014 at 10:06 PM, John King
usedforprinting...@gmail.com
wrote:
   
I've been trying to use the Naive Bayes classifier. Each example in
the
dataset is about 2 million features, only about 20-50 of which are
non-zero,
so the vectors are very sparse. I keep running out of memory
though,
even
for about 1000 examples on 30gb RAM while the entire dataset is 4
million
examples. And I would also like to note that I'm using the sparse
vector
class.
   
   
  
  
 
 
 



Re: K-means with large K

2014-04-28 Thread Matei Zaharia
Try turning on the Kryo serializer as described at 
http://spark.apache.org/docs/latest/tuning.html. Also, are there any exceptions 
in the driver program’s log before this happens?

Matei

On Apr 28, 2014, at 9:19 AM, Buttler, David buttl...@llnl.gov wrote:

 Hi,
 I am trying to run the K-means code in mllib, and it works very nicely with 
 small K (less than 1000).  However, when I try for a larger K (I am looking 
 for 2000-4000 clusters), it seems like the code gets part way through 
 (perhaps just the initialization step) and freezes.  The compute nodes stop 
 doing any CPU / network / IO and nothing happens for hours.  I had done 
 something similar back in the days of Spark 0.6, and I didn’t have any 
 trouble going up to 4000 clusters with similar data.
  
 This happens with both a standalone cluster, and in local multi-core mode 
 (with the node given 200GB of heap), but eventually completes in local 
 single-core mode.
  
 Data statistics:
 Rows: 166248
 Columns: 108
  
 This is a test run before trying it out on much larger data
  
 Any ideas on what might be the cause of this?
  
 Thanks,
 Dave



Re: processing s3n:// files in parallel

2014-04-28 Thread Matei Zaharia
Actually wildcards work too, e.g. s3n://bucket/file1*, and I believe so do 
comma-separated lists (e.g. s3n://file1,s3n://file2). These are all inherited 
from FileInputFormat in Hadoop.
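
For example (the bucket and file names are made up):

val byGlob = sc.textFile("s3n://bucket/file1*")                      // wildcard
val byList = sc.textFile("s3n://bucket/a.txt,s3n://bucket/b.txt")    // comma-separated list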

Matei

On Apr 28, 2014, at 6:05 PM, Andrew Ash and...@andrewash.com wrote:

 This is already possible with the sc.textFile(/path/to/filedir/*) call. 
 Does that work for you?
 
 Sent from my mobile phone
 
 On Apr 29, 2014 2:46 AM, Nicholas Chammas nicholas.cham...@gmail.com 
 wrote:
 It would be useful to have some way to open multiple files at once into a 
 single RDD (e.g. sc.textFile(iterable_over_uris)). Logically, it would be 
 equivalent to opening a single file which is made by concatenating the 
 various files together. This would only be useful, of course, if the source 
 files were all in the same format.
 
 Nick
 
 
 
 On Mon, Apr 28, 2014 at 7:40 PM, Andrew Ash and...@andrewash.com wrote:
 The way you've written it there, I would expect that to be serial runs.  The 
 reason is, you're looping over matches with a driver-level map, which is 
 serialized.  Then calling foreachWith on the RDDs executes the action in a 
 blocking way, so you don't get a result back until the cluster finishes.
 
 You can have multiple jobs running at the same time though by sharing a 
 SparkContext among threads.  Rather than run all the foreachWith()s in serial 
 on a single thread in the driver, try running each in its own thread.
 
 
 
 
 On Tue, Apr 29, 2014 at 1:35 AM, Art Peel found...@gmail.com wrote:
 
 I’m trying to process 3 S3 files in parallel, but they always get processed 
 serially.
 Am I going about this the wrong way?  Full details below.
 
 Regards,
 Art
 
 
 
 I’m trying to do the following on a Spark cluster with 3 slave nodes.
 
 Given a list of N URLS for files in S3 (with s3n:// urls),
 
 For each file:
   1. search for some items
 2. record the findings from step 1 in an external data store.
 
 I’d like to process the 3 URLs in parallel on 3 different slave nodes, but 
 instead, they are getting processed serially on one node.
 
 I’ve tried various versions of my code to no avail. I’m also creating the 
 SparkContext with spark.scheduler.mode set to "FAIR".
 
 Have I made a fundamental mistake? 
 
 I’m running Spark 0.9.1 and my code looks roughly like this:
 
 def processFiles(sc: SparkContext) {
 
   val urls = List("s3n://foo/file.txt", "s3n://bar/file.txt", "s3n://baz/file.txt")
 
   val hadoopFiles = urls.map(url => {
     sc.hadoopFile(url, classOf[FooInputFormat], classOf[LongWritable],
       classOf[WritableFooRecord])
   })
 
   val matches = hadoopFiles.par.map((hadoopFile) => {
     findMatches(hadoopFile)
   })
 
   matches.map((matchRDD) => {
     recordMatches(matchRDD)
   })
 }
 
 def findMatches(hadoopFile: RDD): RDD = {
   hadoopFile.map(record => caseClassResultFromSearch(record))
 }
 
 def recordMatches(matchRDD: RDD) {
 
   matchRDD.foreachWith(_ => {
     makeRestClient(config)
   // I get 3 jobs referring to the line number of the next line, but the jobs
   // run serially on one node.
   })((matchRecord, client) => {
     client.storeResult(matchRecord)
   })
 }
 
 
 
 



Re: Python Spark on YARN

2014-04-29 Thread Matei Zaharia
This will be possible in 1.0 after this pull request: 
https://github.com/apache/spark/pull/30

Matei

On Apr 29, 2014, at 9:51 AM, Guanhua Yan gh...@lanl.gov wrote:

 Hi all:
 
 Is it possible to develop Spark programs in Python and run them on YARN? From 
 the Python SparkContext class, it doesn't seem to have such an option.
 
 Thank you,
 - Guanhua
 
 ===
 Guanhua Yan, Ph.D.
 Information Sciences Group (CCS-3)
 Los Alamos National Laboratory
 Tel: +1-505-667-0176
 Email: gh...@lanl.gov
 Web: http://ghyan.weebly.com/
 ===



Re: performance improvement on second operation...without caching?

2014-05-03 Thread Matei Zaharia
Hi Diana,

Apart from these reasons, in a multi-stage job, Spark saves the map output 
files from map stages to the filesystem, so it only needs to rerun the last 
reduce stage. This is why you only saw one stage executing. These files are 
saved for fault recovery but they speed up subsequent runs.

Matei

On May 3, 2014, at 5:21 PM, Patrick Wendell pwend...@gmail.com wrote:

 Ethan,
 
 What you said is actually not true, Spark won't cache RDD's unless you ask it 
 to.
 
 The observation here - that running the same job can speed up substantially 
 even without caching - is common. This is because other components in the 
 stack are performing caching and optimizations. Two that can make a huge 
 difference are:
 
 1. The OS buffer cache. Which will keep recently read disk blocks in memory.
 2. The Java just-in-time compiler (JIT) which will use runtime profiling to 
 significantly speed up execution speed.
 
 These can make a huge difference if you are running the same job 
 over-and-over. And there are other things like the OS network stack 
 increasing TCP windows and so forth. These will all improve response time as 
 a spark program executes.
 
 
 On Fri, May 2, 2014 at 9:27 AM, Ethan Jewett esjew...@gmail.com wrote:
 I believe Spark caches RDDs it has memory for regardless of whether you 
 actually call the 'cache' method on the RDD. The 'cache' method just tips off 
 Spark that the RDD should have higher priority. At least, that is my 
 experience and it seems to correspond with your experience and with my 
 recollection of other discussions on this topic on the list. However, going 
 back and looking at the programming guide, this is not the way the 
 cache/persist behavior is described. Does the guide need to be updated?
 
 
 On Fri, May 2, 2014 at 9:04 AM, Diana Carroll dcarr...@cloudera.com wrote:
 I'm just Posty McPostalot this week, sorry folks! :-)
 
 Anyway, another question today:
 I have a bit of code that is pretty time consuming (pasted at the end of the 
 message):
 It reads in a bunch of XML files, parses them, extracts some data in a map, 
 counts (using reduce), and then sorts.   All stages are executed when I do a 
 final operation (take).  The first stage is the most expensive: on first run 
 it takes 30s to a minute.
 
 I'm not caching anything.
 
 When I re-execute that take at the end, I expected it to re-execute all the 
 same stages, and take approximately the same amount of time, but it didn't.  
 The second take executes only a single stage which collectively run very 
 fast: the whole operation takes less than 1 second (down from 5 minutes!)
 
 While this is awesome (!) I don't understand it.  If I'm not caching data, 
 why would I see such a marked performance improvement on subsequent execution?
 
 (or is this related to the known .9.1 bug about sortByKey executing an action 
 when it shouldn't?)
 
 Thanks,
 Diana
 
 # load XML files containing device activation records.
 # Find the most common device models activated
 import xml.etree.ElementTree as ElementTree
 
 # Given a partition containing multi-line XML, parse the contents. 
 # Return an iterator of activation Elements contained in the partition
 def getactivations(fileiterator):
     s = ''
     for i in fileiterator: s = s + str(i)
     filetree = ElementTree.fromstring(s)
     return filetree.getiterator('activation')
 
 # Get the model name from a device activation record
 def getmodel(activation):
     return activation.find('model').text
 
 filename = "hdfs://localhost/user/training/activations/*.xml"
 
 # parse each partition as a file into an activation XML record
 activations = sc.textFile(filename)
 activationTrees = activations.mapPartitions(lambda xml: getactivations(xml))
 models = activationTrees.map(lambda activation: getmodel(activation))
 
 # count and sort activations by model
 topmodels = models.map(lambda model: (model,1))\
     .reduceByKey(lambda v1,v2: v1+v2)\
     .map(lambda (model,count): (count,model))\
     .sortByKey(ascending=False)
 
 # display the top 10 models
 for (count,model) in topmodels.take(10):
     print "Model %s (%s)" % (model,count)
 
 # repeat!
 for (count,model) in topmodels.take(10):
     print "Model %s (%s)" % (model,count)
 
 
 



Re: performance improvement on second operation...without caching?

2014-05-03 Thread Matei Zaharia
Yes, this happens as long as you use the same RDD. For example say you do the 
following:

data1 = sc.textFile(…).map(…).reduceByKey(…)
data1.count()
data1.filter(…).count()

The first count() causes outputs of the map/reduce pair in there to be written 
out to shuffle files. Next time you do a count, on either this RDD or a child 
(e.g. after the filter), we notice that output files were already generated for 
this shuffle so we don’t rerun the map stage. Note that the output does get 
read again over the network, which is kind of wasteful (if you really wanted to 
reuse this as quickly as possible you’d use cache()).

Matei

On May 3, 2014, at 8:44 PM, Koert Kuipers ko...@tresata.com wrote:

 Hey Matei,
 Not sure i understand that. These are 2 separate jobs. So the second job 
 takes advantage of the fact that there is map output left somewhere on disk 
 from the first job, and re-uses that?
 
 
 On Sat, May 3, 2014 at 8:29 PM, Matei Zaharia matei.zaha...@gmail.com wrote:
 Hi Diana,
 
 Apart from these reasons, in a multi-stage job, Spark saves the map output 
 files from map stages to the filesystem, so it only needs to rerun the last 
 reduce stage. This is why you only saw one stage executing. These files are 
 saved for fault recovery but they speed up subsequent runs.
 
 Matei
 
 On May 3, 2014, at 5:21 PM, Patrick Wendell pwend...@gmail.com wrote:
 
 Ethan,
 
 What you said is actually not true, Spark won't cache RDD's unless you ask 
 it to.
 
 The observation here - that running the same job can speed up substantially 
 even without caching - is common. This is because other components in the 
 stack are performing caching and optimizations. Two that can make a huge 
 difference are:
 
 1. The OS buffer cache. Which will keep recently read disk blocks in memory.
 2. The Java just-in-time compiler (JIT) which will use runtime profiling to 
 significantly speed up execution speed.
 
 These can make a huge difference if you are running the same job 
 over-and-over. And there are other things like the OS network stack 
 increasing TCP windows and so forth. These will all improve response time 
 as a spark program executes.
 
 
 On Fri, May 2, 2014 at 9:27 AM, Ethan Jewett esjew...@gmail.com wrote:
 I believe Spark caches RDDs it has memory for regardless of whether you 
 actually call the 'cache' method on the RDD. The 'cache' method just tips 
 off Spark that the RDD should have higher priority. At least, that is my 
 experience and it seems to correspond with your experience and with my 
 recollection of other discussions on this topic on the list. However, going 
 back and looking at the programming guide, this is not the way the 
 cache/persist behavior is described. Does the guide need to be updated?
 
 
 On Fri, May 2, 2014 at 9:04 AM, Diana Carroll dcarr...@cloudera.com wrote:
 I'm just Posty McPostalot this week, sorry folks! :-)
 
 Anyway, another question today:
 I have a bit of code that is pretty time consuming (pasted at the end of the 
 message):
 It reads in a bunch of XML files, parses them, extracts some data in a map, 
 counts (using reduce), and then sorts.   All stages are executed when I do a 
 final operation (take).  The first stage is the most expensive: on first run 
 it takes 30s to a minute.
 
 I'm not caching anything.
 
 When I re-execute that take at the end, I expected it to re-execute all the 
 same stages, and take approximately the same amount of time, but it didn't.  
 The second take executes only a single stage which collectively run very 
 fast: the whole operation takes less than 1 second (down from 5 minutes!)
 
 While this is awesome (!) I don't understand it.  If I'm not caching data, 
 why would I see such a marked performance improvement on subsequent 
 execution?
 
 (or is this related to the known .9.1 bug about sortByKey executing an 
 action when it shouldn't?)
 
 Thanks,
 Diana
 
 # load XML files containing device activation records.
 # Find the most common device models activated
 import xml.etree.ElementTree as ElementTree
 
 # Given a partition containing multi-line XML, parse the contents. 
 # Return an iterator of activation Elements contained in the partition
 def getactivations(fileiterator):
 s = ''
 for i in fileiterator: s = s + str(i)
 filetree = ElementTree.fromstring(s)
 return filetree.getiterator('activation')
 
 # Get the model name from a device activation record
 def getmodel(activation):
 return activation.find('model').text 
 
 filename=hdfs://localhost/user/training/activations/*.xml
 
 # parse each partition as a file into an activation XML record
 activations = sc.textFile(filename)
 activationTrees = activations.mapPartitions(lambda xml: getactivations(xml))
 models = activationTrees.map(lambda activation: getmodel(activation))
 
 # count and sort activations by model
 topmodels = models.map(lambda model: (model,1))\
 .reduceByKey(lambda v1,v2: v1+v2)\
 .map

Re: Spark GCE Script

2014-05-05 Thread Matei Zaharia
Very cool! Have you thought about sending this as a pull request? We’d be happy 
to maintain it inside Spark, though it might be interesting to find a single 
Python package that can manage clusters across both EC2 and GCE.

Matei

On May 5, 2014, at 7:18 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

 Hi Sparkers,
 
 We have created a quick spark_gce script which can launch a spark cluster in 
 the Google Cloud. I'm sharing it because it might be helpful for someone 
 using the Google Cloud for deployment rather than AWS.
 
 Here's the link to the script
 
 https://github.com/sigmoidanalytics/spark_gce
 
 Feel free to use it and suggest any feedback around it.
 
 In short here's what it does:
 
 Just like the spark_ec2 script, this one also reads certain command-line 
 arguments (See the github page for more details) like the cluster name and 
 all, then starts the machines in the google cloud, sets up the network, adds 
 a 500GB empty disk to all machines, generate the ssh keys on master and 
 transfer it to all slaves and install java and downloads and configures 
 Spark/Shark/Hadoop. Also it starts the shark server automatically. Currently 
 the version is 0.9.1 but I'm happy to add/support more versions if anyone is 
 interested.
 
 
 Cheers.
 
 
 Thanks
 Best Regards



Re: Increase Stack Size Workers

2014-05-06 Thread Matei Zaharia
Add export SPARK_JAVA_OPTS=“-Xss16m” to conf/spark-env.sh. Then it should apply 
to the executor.

Matei


On May 5, 2014, at 2:20 PM, Andrea Esposito and1...@gmail.com wrote:

 Hi there,
 
 I'm running an iterative algorithm and sometimes end up with a 
 StackOverflowError, whether or not I use checkpoints.
 
 I still don't understand why this is happening, but I figured out that 
 increasing the stack size is a workaround.
 
 When developing with local[n] (local mode) I can set the stack size 
 through the -Xss parameter. How can I do the same for standalone mode, for 
 each worker? Setting it as java -Xss16m Worker seems useless because the 
 actual computation is done in the CoarseGrainExecutor..
 
 Best,
 EA



Re: Spark and Java 8

2014-05-06 Thread Matei Zaharia
Java 8 support is a feature in Spark, but vendors need to decide for themselves 
when they’d like to support Java 8 commercially. You can still run Spark on Java 7 
or 6 without taking advantage of the new features (indeed our builds are always 
against Java 6).

Matei

On May 6, 2014, at 8:59 AM, Ian O'Connell i...@ianoconnell.com wrote:

 I think the distinction there might be they never said they ran that code 
 under CDH5, just that spark supports it and spark runs under CDH5. Not that 
 you can use these features while running under CDH5.
 
 They could use mesos or the standalone scheduler to run them
 
 
 On Tue, May 6, 2014 at 6:16 AM, Kristoffer Sjögren sto...@gmail.com wrote:
 Hi
 
 I just read an article [1] about Spark, CDH5 and Java 8 but did not get 
 exactly how Spark can run Java 8 on a YARN cluster at runtime. Is Spark using 
 a separate JVM that run on data nodes or is it reusing the YARN JVM runtime 
 somehow, like hadoop1?
 
 CDH5 only supports Java 7 [2] as far as I know?
 
 Cheers,
 -Kristoffer
 
 
 [1] 
 http://blog.cloudera.com/blog/2014/04/making-apache-spark-easier-to-use-in-java-with-java-8/
 [2] 
 http://www.cloudera.com/content/cloudera-content/cloudera-docs/CDH5/latest/CDH5-Requirements-and-Supported-Versions/CDH5-Requirements-and-Supported-Versions.html
 
 
 
 
 



Re: Spark to utilize HDFS's mmap caching

2014-05-12 Thread Matei Zaharia
Yes, Spark goes through the standard HDFS client and will automatically benefit 
from this.

Matei

On May 8, 2014, at 4:43 AM, Chanwit Kaewkasi chan...@gmail.com wrote:

 Hi all,
 
 Can Spark (0.9.x) utilize the caching feature in HDFS 2.3 via
 sc.textFile() and other HDFS-related APIs?
 
 http://hadoop.apache.org/docs/r2.3.0/hadoop-project-dist/hadoop-hdfs/CentralizedCacheManagement.html
 
 Best regards,
 
 -chanwit
 
 --
 Chanwit Kaewkasi
 linkedin.com/in/chanwit



Re: Is their a way to Create SparkContext object?

2014-05-12 Thread Matei Zaharia
You can just pass it around as a parameter.
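
For example, a trivial sketch (the method name is made up):

import org.apache.spark.SparkContext

def buildRdd(sc: SparkContext, items: Seq[Int]) =
  sc.makeRDD(items)   // sc is whatever SparkContext the caller passes in

// from the main class that owns the context:
//   val rdd = buildRdd(sc, List(1, 2, 3))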

On May 12, 2014, at 12:37 PM, yh18190 yh18...@gmail.com wrote:

 Hi,
 
 Could anyone suggest how we can use the SparkContext object in other
 classes or functions where we need to convert a Scala collection to an RDD
 using the sc object (like sc.makeRDD(list)), instead of only using the main
 class's SparkContext object?
 Is there a way to pass the sc object as a parameter to functions in other
 classes?
 Please let me know
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Is-their-a-way-to-Create-SparkContext-object-tp5612.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: pySpark memory usage

2014-05-12 Thread Matei Zaharia
Hey Jim, unfortunately external spilling is not implemented in Python right 
now. While it would be possible to update combineByKey to do smarter stuff 
here, one simple workaround you can try is to launch more map tasks (or more 
reduce tasks). To set the minimum number of map tasks, you can pass it as a 
second argument to textFile and such (e.g. sc.textFile(“s3n://foo.txt”, 1000)).

Matei

On May 12, 2014, at 5:47 PM, Jim Blomo jim.bl...@gmail.com wrote:

 Thanks, Aaron, this looks like a good solution!  Will be trying it out 
 shortly.
 
 I noticed that the S3 exception seem to occur more frequently when the
 box is swapping.  Why is the box swapping?  combineByKey seems to make
 the assumption that it can fit an entire partition in memory when
 doing the combineLocally step.  I'm going to try to break this apart
 but will need some sort of heuristic; options include looking at memory
 usage via the resource module and trying to keep below
 'spark.executor.memory', or using batchSize to limit the number of
 entries in the dictionary.  Let me know if you have any opinions.
 
 On Sun, May 4, 2014 at 8:02 PM, Aaron Davidson ilike...@gmail.com wrote:
 I'd just like to update this thread by pointing to the PR based on our
 initial design: https://github.com/apache/spark/pull/640
 
 This solution is a little more general and avoids catching IOException
 altogether. Long live exception propagation!
 
 
 On Mon, Apr 28, 2014 at 1:28 PM, Patrick Wendell pwend...@gmail.com wrote:
 
 Hey Jim,
 
 This IOException thing is a general issue that we need to fix and your
 observation is spot-on. There is actually a JIRA for it here that I created a few
 days ago:
 https://issues.apache.org/jira/browse/SPARK-1579
 
 Aaron is assigned on that one but not actively working on it, so we'd
 welcome a PR from you on this if you are interested.
 
 The first thought we had was to set a volatile flag when the reader sees
 an exception (indicating there was a failure in the task) and avoid
 swallowing the IOException in the writer if this happens. But I think there
 is a race here where the writer sees the error first before the reader knows
 what is going on.
 
 Anyways maybe if you have a simpler solution you could sketch it out in
 the JIRA and we could talk over there. The current proposal in the JIRA is
 somewhat complicated...
 
 - Patrick
 
 
 
 
 
 
 On Mon, Apr 28, 2014 at 1:01 PM, Jim Blomo jim.bl...@gmail.com wrote:
 
 FYI, it looks like this "stdin writer to Python finished early" error was
 caused by a break in the connection to S3, from which the data was being
 pulled.  A recent commit to PythonRDD noted that the current exception
 catching can potentially mask an exception for the data source, and that is
 indeed what I see happening.  The underlying libraries (jets3t and
 httpclient) do have retry capabilities, but I don't see a great way of
 setting them through Spark code.  Instead I added the patch below which
 kills the worker on the exception.  This allows me to completely load the
 data source after a few worker retries.
 
 Unfortunately, java.net.SocketException is the same error that is
 sometimes expected from the client when using methods like take().  One
 approach around this conflation is to create a new locally scoped exception
 class, eg. WriterException, catch java.net.SocketException during output
 writing, then re-throw the new exception.  The worker thread could then
 distinguish between the reasons java.net.SocketException might be thrown.
 Perhaps there is a more elegant way to do this in Scala, though?
 
 Let me know if I should open a ticket or discuss this on the developers
 list instead.  Best,
 
 Jim
 
 diff --git
 a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
 b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
 index 0d71fdb..f31158c 100644
 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
 +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
 @@ -95,6 +95,12 @@ private[spark] class PythonRDD[T: ClassTag](
 readerException = e
 Try(worker.shutdownOutput()) // kill Python worker process
 
 +  case e: java.net.SocketException =>
 +   // This can happen if a connection to the datasource, eg S3,
 resets
 +   // or is otherwise broken
 +readerException = e
 +Try(worker.shutdownOutput()) // kill Python worker process
 +
   case e: IOException =>
 // This can happen for legitimate reasons if the Python code
 stops returning data
 // before we are done passing elements through, e.g., for
 take(). Just log a message to
 
 
 On Wed, Apr 9, 2014 at 7:04 PM, Jim Blomo jim.bl...@gmail.com wrote:
 
 This dataset is uncompressed text at ~54GB. stats() returns (count:
 56757667, mean: 1001.68740583, stdev: 601.775217822, max: 8965, min:
 343)
 
 On Wed, Apr 9, 2014 at 6:59 PM, Matei Zaharia matei.zaha...@gmail.com
 wrote:
 Okay, thanks

Test

2014-05-15 Thread Matei Zaharia


Re: pySpark memory usage

2014-05-15 Thread Matei Zaharia
Cool, that’s good to hear. We’d also like to add spilling in Python itself, or 
at least make it exit with a good message if it can’t do it.

Matei

On May 14, 2014, at 10:47 AM, Jim Blomo jim.bl...@gmail.com wrote:

 That worked amazingly well, thank you Matei!  Numbers that worked for
 me were 400 for the textFile()s, 1500 for the join()s.
 
 On Mon, May 12, 2014 at 7:58 PM, Matei Zaharia matei.zaha...@gmail.com 
 wrote:
 Hey Jim, unfortunately external spilling is not implemented in Python right 
 now. While it would be possible to update combineByKey to do smarter stuff 
 here, one simple workaround you can try is to launch more map tasks (or more 
 reduce tasks). To set the minimum number of map tasks, you can pass it as a 
 second argument to textFile and such (e.g. sc.textFile(“s3n://foo.txt”, 
 1000)).
 
 Matei
 
 On May 12, 2014, at 5:47 PM, Jim Blomo jim.bl...@gmail.com wrote:
 
 Thanks, Aaron, this looks like a good solution!  Will be trying it out 
 shortly.
 
 I noticed that the S3 exception seem to occur more frequently when the
 box is swapping.  Why is the box swapping?  combineByKey seems to make
 the assumption that it can fit an entire partition in memory when
 doing the combineLocally step.  I'm going to try to break this apart
 but will need some sort of heuristic; options include looking at memory
 usage via the resource module and trying to keep below
 'spark.executor.memory', or using batchSize to limit the number of
 entries in the dictionary.  Let me know if you have any opinions.
 
 On Sun, May 4, 2014 at 8:02 PM, Aaron Davidson ilike...@gmail.com wrote:
 I'd just like to update this thread by pointing to the PR based on our
 initial design: https://github.com/apache/spark/pull/640
 
 This solution is a little more general and avoids catching IOException
 altogether. Long live exception propagation!
 
 
 On Mon, Apr 28, 2014 at 1:28 PM, Patrick Wendell pwend...@gmail.com 
 wrote:
 
 Hey Jim,
 
 This IOException thing is a general issue that we need to fix and your
 observation is spot-on. There is actually a JIRA for it here that I created a 
 few
 days ago:
 https://issues.apache.org/jira/browse/SPARK-1579
 
 Aaron is assigned on that one but not actively working on it, so we'd
 welcome a PR from you on this if you are interested.
 
 The first thought we had was to set a volatile flag when the reader sees
 an exception (indicating there was a failure in the task) and avoid
 swallowing the IOException in the writer if this happens. But I think 
 there
 is a race here where the writer sees the error first before the reader 
 knows
 what is going on.
 
 Anyways maybe if you have a simpler solution you could sketch it out in
 the JIRA and we could talk over there. The current proposal in the JIRA is
 somewhat complicated...
 
 - Patrick
 
 
 
 
 
 
 On Mon, Apr 28, 2014 at 1:01 PM, Jim Blomo jim.bl...@gmail.com wrote:
 
 FYI, it looks like this "stdin writer to Python finished early" error was
 caused by a break in the connection to S3, from which the data was being
 pulled.  A recent commit to PythonRDD noted that the current exception
 catching can potentially mask an exception for the data source, and that 
 is
 indeed what I see happening.  The underlying libraries (jets3t and
 httpclient) do have retry capabilities, but I don't see a great way of
 setting them through Spark code.  Instead I added the patch below which
 kills the worker on the exception.  This allows me to completely load the
 data source after a few worker retries.
 
 Unfortunately, java.net.SocketException is the same error that is
 sometimes expected from the client when using methods like take().  One
 approach around this conflation is to create a new locally scoped 
 exception
 class, eg. WriterException, catch java.net.SocketException during output
 writing, then re-throw the new exception.  The worker thread could then
 distinguish between the reasons java.net.SocketException might be thrown.
 Perhaps there is a more elegant way to do this in Scala, though?
 
 Let me know if I should open a ticket or discuss this on the developers
 list instead.  Best,
 
 Jim
 
 diff --git
 a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
 b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
 index 0d71fdb..f31158c 100644
 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
 +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
 @@ -95,6 +95,12 @@ private[spark] class PythonRDD[T: ClassTag](
readerException = e
Try(worker.shutdownOutput()) // kill Python worker process
 
 +  case e: java.net.SocketException =
 +   // This can happen if a connection to the datasource, eg S3,
 resets
 +   // or is otherwise broken
 +readerException = e
 +Try(worker.shutdownOutput()) // kill Python worker process
 +
  case e: IOException =
// This can happen for legitimate reasons if the Python

Re: persist @ disk-only failing

2014-05-19 Thread Matei Zaharia
This is the patch for it: https://github.com/apache/spark/pull/50/. It might be 
possible to backport it to 0.8.

Matei

On May 19, 2014, at 2:04 AM, Sai Prasanna ansaiprasa...@gmail.com wrote:

 Matei, I am using 0.8.1 !!
 
 But is there a way without moving to 0.9.1 to bypass cache ?
 
 
 On Mon, May 19, 2014 at 1:31 PM, Matei Zaharia matei.zaha...@gmail.com 
 wrote:
 What version is this with? We used to build each partition first before 
 writing it out, but this was fixed a while back (0.9.1, but it may also be in 
 0.9.0).
 
 Matei
 
 On May 19, 2014, at 12:41 AM, Sai Prasanna ansaiprasa...@gmail.com wrote:
 
  Hi all,
 
  When i gave the persist level as DISK_ONLY, still Spark tries to use memory 
  and caches.
  Any reason ?
  Do i need to override some parameter elsewhere ?
 
  Thanks !
 
 



Re: How to compile the examples directory?

2014-05-19 Thread Matei Zaharia
If you’d like to work on just this code for your own changes, it might be best 
to copy it to a separate project. Look at 
http://spark.apache.org/docs/latest/quick-start.html for how to set up a 
standalone job.
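
A minimal build.sbt along those lines (the versions shown are just an example for the
0.9.x line) would be something like:

name := "ExamplePatched"

version := "0.1"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "0.9.1"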

Matei

On May 19, 2014, at 4:53 AM, Hao Wang wh.s...@gmail.com wrote:

 Hi,
 
 I am running some examples of Spark on a cluster. Because I need to modify 
 some source code, I have to re-compile the whole Spark using `sbt/sbt 
 assembly`, which takes a long time.
 
 I have tried `mvn package` under the example directory, it failed because of 
 some dependencies problem. Any way to avoid to compile the whole Spark 
 project?
 
 
 Regards,
 Wang Hao(王灏)
 
 CloudTeam | School of Software Engineering
 Shanghai Jiao Tong University
 Address:800 Dongchuan Road, Minhang District, Shanghai, 200240
 Email:wh.s...@gmail.com



Re: advice on maintaining a production spark cluster?

2014-05-19 Thread Matei Zaharia
Which version is this with? I haven’t seen standalone masters lose workers. Is 
there other stuff on the machines that’s killing them, or what errors do you 
see?

Matei

On May 16, 2014, at 9:53 AM, Josh Marcus jmar...@meetup.com wrote:

 Hey folks,
 
 I'm wondering what strategies other folks are using for maintaining and 
 monitoring the stability of stand-alone spark clusters.
 
 Our master very regularly loses workers, and they (as expected) never rejoin 
 the cluster.  This is the same behavior I've seen
 using akka cluster (if that's what spark is using in stand-alone mode) -- are 
 there configuration options we could be setting
 to make the cluster more robust?
 
 We have a custom script which monitors the number of workers (through the web 
 interface) and restarts the cluster when
 necessary, as well as resolving other issues we face (like spark shells left 
 open permanently claiming resources), and it
 works, but it's nowhere close to a great solution.
 
 What are other folks doing?  Is this something that other folks observe as 
 well?  I suspect that the loss of workers is tied to 
 jobs that run out of memory on the client side or our use of very large 
 broadcast variables, but I don't have an isolated test case.
 I'm open to general answers here: for example, perhaps we should simply be 
 using mesos or yarn instead of stand-alone mode.
 
 --j
 



Re: life if an executor

2014-05-19 Thread Matei Zaharia
They’re tied to the SparkContext (application) that launched them.

Matei

On May 19, 2014, at 8:44 PM, Koert Kuipers ko...@tresata.com wrote:

 from looking at the source code i see executors run in their own jvm 
 subprocesses.
 
 how long to they live for? as long as the worker/slave? or are they tied to 
 the sparkcontext and life/die with it?
 
 thx



Re: advice on maintaining a production spark cluster?

2014-05-20 Thread Matei Zaharia
Are you guys both using Cloudera Manager? Maybe there’s also an issue with the 
integration with that.

Matei

On May 20, 2014, at 11:44 AM, Aaron Davidson ilike...@gmail.com wrote:

 I'd just like to point out that, along with Matei, I have not seen workers 
 drop even under the most exotic job failures. We're running pretty close to 
 master, though; perhaps it is related to an uncaught exception in the Worker 
 from a prior version of Spark.
 
 
 On Tue, May 20, 2014 at 11:36 AM, Arun Ahuja aahuj...@gmail.com wrote:
 Hi Matei,
 
 Unfortunately, I don't have more detailed information, but we have seen the 
 loss of workers in standalone mode as well.  If a job is killed through 
 CTRL-C we will often see in the Spark Master page the number of workers and 
 cores decrease.  They are still alive and well in the Cloudera Manager page, 
 but not visible on the Spark master, simply restarting the workers usually 
 resolves this, but we often seen workers disappear after a failed or killed 
 job.
 
 If we see this occur again, I'll try and provide some logs.
 
 
 
 
 On Mon, May 19, 2014 at 10:51 PM, Matei Zaharia matei.zaha...@gmail.com 
 wrote:
 Which version is this with? I haven’t seen standalone masters lose workers. 
 Is there other stuff on the machines that’s killing them, or what errors do 
 you see?
 
 Matei
 
 On May 16, 2014, at 9:53 AM, Josh Marcus jmar...@meetup.com wrote:
 
  Hey folks,
 
  I'm wondering what strategies other folks are using for maintaining and 
  monitoring the stability of stand-alone spark clusters.
 
  Our master very regularly loses workers, and they (as expected) never 
  rejoin the cluster.  This is the same behavior I've seen
  using akka cluster (if that's what spark is using in stand-alone mode) -- 
  are there configuration options we could be setting
  to make the cluster more robust?
 
  We have a custom script which monitors the number of workers (through the 
  web interface) and restarts the cluster when
  necessary, as well as resolving other issues we face (like spark shells 
  left open permanently claiming resources), and it
  works, but it's nowhere close to a great solution.
 
  What are other folks doing?  Is this something that other folks observe as 
  well?  I suspect that the loss of workers is tied to
  jobs that run out of memory on the client side or our use of very large 
  broadcast variables, but I don't have an isolated test case.
  I'm open to general answers here: for example, perhaps we should simply be 
  using mesos or yarn instead of stand-alone mode.
 
  --j
 
 
 
 



Re: Python, Spark and HBase

2014-05-20 Thread Matei Zaharia
Unfortunately this is not yet possible. There’s a patch in progress posted here 
though: https://github.com/apache/spark/pull/455 — it would be great to get 
your feedback on it.

Matei

On May 20, 2014, at 4:21 PM, twizansk twiza...@gmail.com wrote:

 Hello,
 
 This seems like a basic question but I have been unable to find an answer in
 the archives or other online sources.
 
 I would like to know if there is any way to load an RDD from HBase in Python. 
 In Java/Scala I can do this by initializing a NewAPIHadoopRDD with a
 TableInputFormat class.  Is there any equivalent in Python?
 
 Thanks
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Python-Spark-and-HBase-tp6142.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Python, Spark and HBase

2014-05-28 Thread Matei Zaharia
It sounds like you made a typo in the code — perhaps you’re trying to call 
self._jvm.PythonRDDnewAPIHadoopFile instead of  
self._jvm.PythonRDD.newAPIHadoopFile? There should be a dot before the new.


Matei

On May 28, 2014, at 5:25 PM, twizansk twiza...@gmail.com wrote:

 Hi Nick,
 
 I finally got around to downloading and building the patch.  
 
 I pulled the code from
 https://github.com/MLnick/spark-1/tree/pyspark-inputformats
 
 I am running on a CDH5 node.  While the code in the CDH branch is different
 from spark master, I do believe that I have resolved any inconsistencies.
 
 When attempting to connect to an HBase table using
 SparkContext.newAPIHadoopFile  I receive the following error:
 
Py4JError: org.apache.spark.api.python.PythonRDDnewAPIHadoopFile
 does not exist in the JVM
 
 I have searched the pyspark-inputformats branch and cannot find any
 reference to the class org.apache.spark.api.python.PythonRDDnewAPIHadoopFile
 
 Any ideas?
 
 Also, do you have a working example of HBase access with the new code?
 
 Thanks
 
 Tommer  
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Python-Spark-and-HBase-tp6142p6502.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Checking spark cache percentage programatically. And how to clear cache.

2014-05-28 Thread Matei Zaharia
You can remove cached RDDs by calling unpersist() on them.

You can also use SparkContext.getRDDStorageInfo to get info on cache usage, 
though this is a developer API so it may change in future versions. We will add 
a standard API eventually but this is just very closely tied to framework 
internals.
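
For example, a quick sketch (the path is a placeholder, and the exact RDDInfo fields
may differ slightly between versions):

val data = sc.textFile("hdfs:///some/path").cache()
data.count()   // force the RDD to actually be cached

sc.getRDDStorageInfo.foreach { info =>
  println(info.name + ": " + info.numCachedPartitions + " of " +
    info.numPartitions + " partitions cached")
}

data.unpersist()   // drop it from the cache when you're done with it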

Matei

On May 28, 2014, at 5:32 PM, Sung Hwan Chung coded...@cs.stanford.edu wrote:

 Hi,
 
 Is there a programmatic way of checking whether RDD has been 100% cached or 
 not? I'd like to do this to have two different path ways.
 
 Additionally, how do you clear cache (e.g. if you want to cache different 
 RDDs, and you'd like to clear an existing cached RDD).
 
 Thanks!



Re: Spark hook to create external process

2014-05-29 Thread Matei Zaharia
Hi Anand,

This is probably already handled by the RDD.pipe() operation. It will spawn a 
process and let you feed data to it through its stdin and read data through 
stdout.
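
For example, a tiny sketch of pipe() (any command available on the worker machines
would do):

val input = sc.parallelize(Seq("apple", "banana", "cherry"))
val piped = input.pipe("grep an")    // each element is written to the process's stdin
piped.collect().foreach(println)     // its stdout lines come back as the new RDD: banana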

Matei

On May 29, 2014, at 9:39 AM, ansriniv ansri...@gmail.com wrote:

 I have a requirement where for every Spark executor threadpool thread, I need
 to launch an associated external process.
 
 My job will consist of some processing in the Spark executor thread and some
 processing by its associated external process with the 2 communicating via
 some IPC mechanism.
 
 Is there a hook in Spark where I can put in my code to create / destroy
 these external processes corresponding to the creation / destruction of
 executor thread pool threads.
 
 Thanks
 Anand
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-hook-to-create-external-process-tp6526.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Driver OOM while using reduceByKey

2014-05-29 Thread Matei Zaharia
That hash map is just a list of where each task ran, it’s not the actual data. 
How many map and reduce tasks do you have? Maybe you need to give the driver a 
bit more memory, or use fewer tasks (e.g. do reduceByKey(_ + _, 100) to use 
only 100 tasks).

Matei

On May 29, 2014, at 2:03 AM, haitao .yao yao.e...@gmail.com wrote:

 Hi,
 
  I used 1g memory for the driver java process and got OOM error on driver 
 side before reduceByKey. After analyzed the heap dump, the biggest object is 
 org.apache.spark.MapStatus, which occupied over 900MB memory. 
 
 Here's my question: 
 
 
 1. Is there any optimization switches that I can tune to avoid this? I have 
 used the compression on output with spark.io.compression.codec.  
 
 2. Why the workers send all the data back to driver to run reduceByKey? With 
 the current implementation, if I use reduceByKey on TBs of data, that will be 
 a disaster for driver. Maybe I'm wrong about the assumption of the spark 
 implementation.
 
 
 And here's my code snippet:
 
 
 ```
 
 val cntNew = spark.accumulator(0)
 
 val cntOld = spark.accumulator(0)
 
 val cntErr = spark.accumulator(0)
 
 
 val sequenceFileUrl = args(0)
 
 val seq = spark.sequenceFile[Text, BytesWritable](sequenceFileUrl)
 
 val stat = seq.map(pair => convertData(
 
   pair._2, cntNew, cntOld, cntErr
 
 )).reduceByKey(_ + _)
 
 stat.saveAsSequenceFile(args(1))
 
 ```
 
 
 Thanks.
 
 
 -- 
 
 haitao.yao@China



Re: Why Scala?

2014-05-29 Thread Matei Zaharia
Quite a few people ask this question and the answer is pretty simple. When we 
started Spark, we had two goals — we wanted to work with the Hadoop ecosystem, 
which is JVM-based, and we wanted a concise programming interface similar to 
Microsoft’s DryadLINQ (the first language-integrated big data framework I know 
of, that begat things like FlumeJava and Crunch). On the JVM, the only language 
that would offer that kind of API was Scala, due to its ability to capture 
functions and ship them across the network. Scala’s static typing also made it 
much easier to control performance compared to, say, Jython or Groovy.

In terms of usage, however, we see substantial usage of our other languages 
(Java and Python), and we’re continuing to invest in both. In a user survey we 
did last fall, about 25% of users used Java and 30% used Python, and I imagine 
these numbers are growing. With lambda expressions now added to Java 8 
(http://databricks.com/blog/2014/04/14/Spark-with-Java-8.html), I think we’ll 
see a lot more Java. And at Databricks I’ve seen a lot of interest in Python, 
which is very exciting to us in terms of ease of use.

Matei

On May 29, 2014, at 1:57 PM, Benjamin Black b...@b3k.us wrote:

 HN is a cesspool safely ignored.
 
 
 On Thu, May 29, 2014 at 1:55 PM, Nick Chammas nicholas.cham...@gmail.com 
 wrote:
 I recently discovered Hacker News and started reading through older posts 
 about Scala. It looks like the language is fairly controversial on there, and 
 it got me thinking.
 
 Scala appears to be the preferred language to work with in Spark, and Spark 
 itself is written in Scala, right?
 
 I know that often times a successful project evolves gradually out of 
 something small, and that the choice of programming language may not always 
 have been made consciously at the outset.
 
 But pretending that it was, why is Scala the preferred language of Spark?
 
 Nick
 
 
 View this message in context: Why Scala?
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 



Re: Shuffle file consolidation

2014-05-29 Thread Matei Zaharia
It can be set in an individual application.

Consolidation had some issues on ext3 as mentioned there, though we might 
enable it by default in the future because other optimizations now made it 
perform on par with the non-consolidation version. It also had some bugs in 
0.9.0 so I’d suggest at least 0.9.1.
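
Setting it per application looks roughly like this (a 0.9.x-style sketch; the master
and app name are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("local[4]")
  .setAppName("sort job")
  .set("spark.shuffle.consolidateFiles", "true")
val sc = new SparkContext(conf)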

Matei

On May 29, 2014, at 2:21 PM, Nathan Kronenfeld nkronenf...@oculusinfo.com 
wrote:

 Thanks, I missed that.
 
 One thing that's still unclear to me, even looking at that, is - does this 
 parameter have to be set when starting up the cluster, on each of the 
 workers, or can it be set by an individual client job?
 
 
 On Fri, May 23, 2014 at 10:13 AM, Han JU ju.han.fe...@gmail.com wrote:
 Hi Nathan,
 
 There's some explanation in the spark configuration section:
 
 ```
 If set to true, consolidates intermediate files created during a shuffle. 
 Creating fewer files can improve filesystem performance for shuffles with 
 large numbers of reduce tasks. It is recommended to set this to true when 
 using ext4 or xfs filesystems. On ext3, this option might degrade performance 
 on machines with many (>8) cores due to filesystem limitations.
 ```
 
 
 2014-05-23 16:00 GMT+02:00 Nathan Kronenfeld nkronenf...@oculusinfo.com:
 
 In trying to sort some largish datasets, we came across the 
 spark.shuffle.consolidateFiles property, and I found in the source code that 
 it is set, by default, to false, with a note to default it to true when the 
 feature is stable.
 
 Does anyone know what is unstable about this? If we set it true, what 
 problems should we anticipate?
 
 Thanks,
 -Nathan Kronenfeld
 
 
 -- 
 Nathan Kronenfeld
 Senior Visualization Developer
 Oculus Info Inc
 2 Berkeley Street, Suite 600,
 Toronto, Ontario M5A 4J5
 Phone:  +1-416-203-3003 x 238
 Email:  nkronenf...@oculusinfo.com
 
 
 
 -- 
 JU Han
 
 Data Engineer @ Botify.com
 
 +33 061960
 
 
 
 -- 
 Nathan Kronenfeld
 Senior Visualization Developer
 Oculus Info Inc
 2 Berkeley Street, Suite 600,
 Toronto, Ontario M5A 4J5
 Phone:  +1-416-203-3003 x 238
 Email:  nkronenf...@oculusinfo.com



Re: Trouble with EC2

2014-05-31 Thread Matei Zaharia
What instance types did you launch on?

Sometimes you also get a bad individual machine from EC2. It might help to 
remove the node it’s complaining about from the conf/slaves file.

Matei

On May 30, 2014, at 11:18 AM, PJ$ p...@chickenandwaffl.es wrote:

 Hey Folks, 
 
 I'm really having quite a bit of trouble getting spark running on ec2. I'm 
 not using the scripts at https://github.com/apache/spark/tree/master/ec2 because 
 I'd like to know how everything works. But I'm going a little crazy. I think 
 that something about the networking configuration must be messed up, but I'm 
 at a loss. Shortly after starting the cluster, I get a lot of this: 
 
 14/05/30 18:03:22 INFO master.Master: Registering worker 
 ip-10-100-184-45.ec2.internal:7078 with 2 cores, 6.3 GB RAM
 14/05/30 18:03:22 INFO master.Master: Registering worker 
 ip-10-100-184-45.ec2.internal:7078 with 2 cores, 6.3 GB RAM
 14/05/30 18:03:23 INFO master.Master: Registering worker 
 ip-10-100-184-45.ec2.internal:7078 with 2 cores, 6.3 GB RAM
 14/05/30 18:03:23 INFO master.Master: Registering worker 
 ip-10-100-184-45.ec2.internal:7078 with 2 cores, 6.3 GB RAM
 14/05/30 18:05:54 INFO master.Master: 
 akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485 got disassociated, 
 removing it.
 14/05/30 18:05:54 INFO actor.LocalActorRef: Message 
 [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from 
 Actor[akka://sparkMaster/deadLetters] to 
 Actor[akka://sparkMaster/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkMaster%4010.100.75.70%3A36725-25#847210246]
  was not delivered. [5] dead letters encountered. This logging can be turned 
 off or adjusted with configuration settings 'akka.log-dead-letters' and 
 'akka.log-dead-letters-during-shutdown'.
 14/05/30 18:05:54 INFO master.Master: 
 akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485 got disassociated, 
 removing it.
 14/05/30 18:05:54 INFO master.Master: 
 akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485 got disassociated, 
 removing it.
 14/05/30 18:05:54 ERROR remote.EndpointWriter: AssociationError 
 [akka.tcp://sparkMaster@ip-10-100-184-45.ec2.internal:7077] - 
 [akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485]: Error [Association 
 failed with [akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485]] [
 akka.remote.EndpointAssociationException: Association failed with 
 [akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485]
 Caused by: 
 akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: 
 Connection refused: ip-10-100-75-70.ec2.internal/10.100.75.70:38485
 ]
 14/05/30 18:05:54 ERROR remote.EndpointWriter: AssociationError 
 [akka.tcp://sparkMaster@ip-10-100-184-45.ec2.internal:7077] - 
 [akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485]: Error [Association 
 failed with [akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485]] [
 akka.remote.EndpointAssociationException: Association failed with 
 [akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485]
 Caused by: 
 akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: 
 Connection refused: ip-10-100-75-70.ec2.internal/10.100.75.70:38485
 ]
 14/05/30 18:05:54 INFO master.Master: 
 akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485 got disassociated, 
 removing it.
 14/05/30 18:05:54 INFO master.Master: 
 akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485 got disassociated, 
 removing it.
 14/05/30 18:05:54 ERROR remote.EndpointWriter: AssociationError 
 [akka.tcp://sparkMaster@ip-10-100-184-45.ec2.internal:7077] - 
 [akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485]: Error [Association 
 failed with [akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485]] [
 akka.remote.EndpointAssociationException: Association failed with 
 [akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485]
 Caused by: 
 akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: 
 Connection refused: ip-10-100-75-70.ec2.internal/10.100.75.70:38485



Re: Yay for 1.0.0! EC2 Still has problems.

2014-06-01 Thread Matei Zaharia
More specifically with the -a flag, you *can* set your own AMI, but you’ll need 
to base it off ours. This is because spark-ec2 assumes that some packages (e.g. 
java, Python 2.6) are already available on the AMI.
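For reference, a typical invocation looks something like the following (a sketch only — the key pair, cluster name and AMI ID are placeholders):

    # Let spark-ec2 pick the right AMI for the instance type (recommended)
    ./spark-ec2 -k my-keypair -i ~/my-keypair.pem -s 2 -t m1.large launch my-cluster

    # Advanced: pass a custom AMI with -a; it must be built on top of the Spark AMI
    ./spark-ec2 -k my-keypair -i ~/my-keypair.pem -s 2 -t m1.large -a ami-xxxxxxxx launch my-cluster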

Matei

On Jun 1, 2014, at 11:01 AM, Patrick Wendell pwend...@gmail.com wrote:

 Hey just to clarify this - my understanding is that the poster
 (Jeremey) was using a custom AMI to *launch* spark-ec2. I normally
 launch spark-ec2 from my laptop. And he was looking for an AMI that
 had a high enough version of python.
 
 Spark-ec2 itself has a flag -a that allows you to give a specific
 AMI. This flag is just an internal tool that we use for testing when
 we spin new AMI's. Users can't set that to an arbitrary AMI because we
 tightly control things like the Java and OS versions, libraries, etc.
 
 
 On Sun, Jun 1, 2014 at 12:51 AM, Jeremy Lee
 unorthodox.engine...@gmail.com wrote:
 *sigh* OK, I figured it out. (Thank you Nick, for the hint)
 
 m1.large works, (I swear I tested that earlier and had similar issues... )
 
 It was my obsession with starting r3.*large instances. Clearly I hadn't
 patched the script in all the places.. which I think caused it to default to
 the Amazon AMI. I'll have to take a closer look at the code and see if I
 can't fix it correctly, because I really, really do want nodes with 2x the
 CPU and 4x the memory for the same low spot price. :-)
 
 I've got a cluster up now, at least. Time for the fun stuff...
 
 Thanks everyone for the help!
 
 
 
 On Sun, Jun 1, 2014 at 5:19 PM, Nicholas Chammas
 nicholas.cham...@gmail.com wrote:
 
 If you are explicitly specifying the AMI in your invocation of spark-ec2,
 may I suggest simply removing any explicit mention of AMI from your
 invocation? spark-ec2 automatically selects an appropriate AMI based on the
 specified instance type.
 
  On Sunday, June 1, 2014, Nicholas Chammas nicholas.cham...@gmail.com wrote:
 
 Could you post how exactly you are invoking spark-ec2? And are you having
 trouble just with r3 instances, or with any instance type?
 
  On Sunday, June 1, 2014, Jeremy Lee unorthodox.engine...@gmail.com wrote:
 
 It's been another day of spinning up dead clusters...
 
 I thought I'd finally worked out what everyone else knew - don't use the
 default AMI - but I've now run through all of the official quick-start
 linux releases and I'm none the wiser:
 
 Amazon Linux AMI 2014.03.1 - ami-7aba833f (64-bit)
 Provisions servers, connects, installs, but the webserver on the master
 will not start
 
 Red Hat Enterprise Linux 6.5 (HVM) - ami-5cdce419
 Spot instance requests are not supported for this AMI.
 
 SuSE Linux Enterprise Server 11 sp3 (HVM) - ami-1a88bb5f
 Not tested - costs 10x more for spot instances, not economically viable.
 
 Ubuntu Server 14.04 LTS (HVM) - ami-f64f77b3
 Provisions servers, but git is not pre-installed, so the cluster setup
 fails.
 
 Amazon Linux AMI (HVM) 2014.03.1 - ami-5aba831f
 Provisions servers, but git is not pre-installed, so the cluster setup
 fails.
 
 
 
 
 --
 Jeremy Lee  BCompSci(Hons)
  The Unorthodox Engineers



Re: Trouble with EC2

2014-06-01 Thread Matei Zaharia
So to run spark-ec2, you should use the default AMI that it launches with if 
you don’t pass -a. Those are based on Amazon Linux, not Debian. Passing your 
own AMI is an advanced option but people need to install some stuff on their 
AMI in advance for it to work with our scripts.

Matei


On Jun 1, 2014, at 3:11 PM, PJ$ p...@chickenandwaffl.es wrote:

 Running on a few m3.larges with the ami-848a6eec image (debian 7). Haven't 
 gotten any further. No clue what's wrong. I'd really appreciate any guidance 
 y'all could offer. 
 
 Best, 
 PJ$
 
 
 On Sat, May 31, 2014 at 1:40 PM, Matei Zaharia matei.zaha...@gmail.com 
 wrote:
 What instance types did you launch on?
 
 Sometimes you also get a bad individual machine from EC2. It might help to 
 remove the node it’s complaining about from the conf/slaves file.
 
 Matei
 
 On May 30, 2014, at 11:18 AM, PJ$ p...@chickenandwaffl.es wrote:
 
 Hey Folks, 
 
 I'm really having quite a bit of trouble getting spark running on ec2. I'm 
  not using the scripts at https://github.com/apache/spark/tree/master/ec2 
 because I'd like to know how everything works. But I'm going a little crazy. 
 I think that something about the networking configuration must be messed up, 
 but I'm at a loss. Shortly after starting the cluster, I get a lot of this: 
 
 14/05/30 18:03:22 INFO master.Master: Registering worker 
 ip-10-100-184-45.ec2.internal:7078 with 2 cores, 6.3 GB RAM
 14/05/30 18:03:22 INFO master.Master: Registering worker 
 ip-10-100-184-45.ec2.internal:7078 with 2 cores, 6.3 GB RAM
 14/05/30 18:03:23 INFO master.Master: Registering worker 
 ip-10-100-184-45.ec2.internal:7078 with 2 cores, 6.3 GB RAM
 14/05/30 18:03:23 INFO master.Master: Registering worker 
 ip-10-100-184-45.ec2.internal:7078 with 2 cores, 6.3 GB RAM
 14/05/30 18:05:54 INFO master.Master: 
 akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485 got disassociated, 
 removing it.
 14/05/30 18:05:54 INFO actor.LocalActorRef: Message 
 [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from 
 Actor[akka://sparkMaster/deadLetters] to 
 Actor[akka://sparkMaster/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkMaster%4010.100.75.70%3A36725-25#847210246]
  was not delivered. [5] dead letters encountered. This logging can be turned 
 off or adjusted with configuration settings 'akka.log-dead-letters' and 
 'akka.log-dead-letters-during-shutdown'.
 14/05/30 18:05:54 INFO master.Master: 
 akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485 got disassociated, 
 removing it.
 14/05/30 18:05:54 INFO master.Master: 
 akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485 got disassociated, 
 removing it.
 14/05/30 18:05:54 ERROR remote.EndpointWriter: AssociationError 
 [akka.tcp://sparkMaster@ip-10-100-184-45.ec2.internal:7077] - 
 [akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485]: Error [Association 
 failed with [akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485]] [
 akka.remote.EndpointAssociationException: Association failed with 
 [akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485]
 Caused by: 
 akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: 
 Connection refused: ip-10-100-75-70.ec2.internal/10.100.75.70:38485
 ]
 14/05/30 18:05:54 ERROR remote.EndpointWriter: AssociationError 
 [akka.tcp://sparkMaster@ip-10-100-184-45.ec2.internal:7077] - 
 [akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485]: Error [Association 
 failed with [akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485]] [
 akka.remote.EndpointAssociationException: Association failed with 
 [akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485]
 Caused by: 
 akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: 
 Connection refused: ip-10-100-75-70.ec2.internal/10.100.75.70:38485
 ]
 14/05/30 18:05:54 INFO master.Master: 
 akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485 got disassociated, 
 removing it.
 14/05/30 18:05:54 INFO master.Master: 
 akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485 got disassociated, 
 removing it.
 14/05/30 18:05:54 ERROR remote.EndpointWriter: AssociationError 
 [akka.tcp://sparkMaster@ip-10-100-184-45.ec2.internal:7077] - 
 [akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485]: Error [Association 
 failed with [akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485]] [
 akka.remote.EndpointAssociationException: Association failed with 
 [akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485]
 Caused by: 
 akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: 
 Connection refused: ip-10-100-75-70.ec2.internal/10.100.75.70:38485
 
 



Re: Yay for 1.0.0! EC2 Still has problems.

2014-06-01 Thread Matei Zaharia
FYI, I opened https://issues.apache.org/jira/browse/SPARK-1990 to track this.

Matei

On Jun 1, 2014, at 6:14 PM, Jeremy Lee unorthodox.engine...@gmail.com wrote:

 Sort of.. there were two separate issues, but both related to AWS..
 
 I've sorted the confusion about the Master/Worker AMI ... use the version 
 chosen by the scripts. (and use the right instance type so the script can 
 choose wisely)
 
 But yes, one also needs a launch machine to kick off the cluster, and for 
 that I _also_ was using an Amazon instance... (made sense.. I have a team 
  that will need to do things as well, not just me) and I was just pointing 
  out that if you use the AMI most recommended by Amazon (for your free micro 
 instance, for example) you get python 2.6 and the ec2 scripts fail.
 
 That merely needs a line in the documentation saying use Ubuntu for your 
 cluster controller, not Amazon Linux or somesuch. But yeah, for a newbie, it 
 was hard working out when to use default or custom AMIs for various parts 
 of the setup.
 
 
 On Mon, Jun 2, 2014 at 4:01 AM, Patrick Wendell pwend...@gmail.com wrote:
 Hey just to clarify this - my understanding is that the poster
 (Jeremey) was using a custom AMI to *launch* spark-ec2. I normally
 launch spark-ec2 from my laptop. And he was looking for an AMI that
 had a high enough version of python.
 
 Spark-ec2 itself has a flag -a that allows you to give a specific
 AMI. This flag is just an internal tool that we use for testing when
 we spin new AMI's. Users can't set that to an arbitrary AMI because we
 tightly control things like the Java and OS versions, libraries, etc.
 
 
 On Sun, Jun 1, 2014 at 12:51 AM, Jeremy Lee
 unorthodox.engine...@gmail.com wrote:
  *sigh* OK, I figured it out. (Thank you Nick, for the hint)
 
  m1.large works, (I swear I tested that earlier and had similar issues... )
 
  It was my obsession with starting r3.*large instances. Clearly I hadn't
  patched the script in all the places.. which I think caused it to default to
  the Amazon AMI. I'll have to take a closer look at the code and see if I
  can't fix it correctly, because I really, really do want nodes with 2x the
  CPU and 4x the memory for the same low spot price. :-)
 
  I've got a cluster up now, at least. Time for the fun stuff...
 
  Thanks everyone for the help!
 
 
 
  On Sun, Jun 1, 2014 at 5:19 PM, Nicholas Chammas
  nicholas.cham...@gmail.com wrote:
 
  If you are explicitly specifying the AMI in your invocation of spark-ec2,
  may I suggest simply removing any explicit mention of AMI from your
  invocation? spark-ec2 automatically selects an appropriate AMI based on the
  specified instance type.
 
   On Sunday, June 1, 2014, Nicholas Chammas nicholas.cham...@gmail.com wrote:
 
  Could you post how exactly you are invoking spark-ec2? And are you having
  trouble just with r3 instances, or with any instance type?
 
   On Sunday, June 1, 2014, Jeremy Lee unorthodox.engine...@gmail.com wrote:
 
  It's been another day of spinning up dead clusters...
 
  I thought I'd finally worked out what everyone else knew - don't use the
  default AMI - but I've now run through all of the official quick-start
  linux releases and I'm none the wiser:
 
  Amazon Linux AMI 2014.03.1 - ami-7aba833f (64-bit)
  Provisions servers, connects, installs, but the webserver on the master
  will not start
 
  Red Hat Enterprise Linux 6.5 (HVM) - ami-5cdce419
  Spot instance requests are not supported for this AMI.
 
  SuSE Linux Enterprise Server 11 sp3 (HVM) - ami-1a88bb5f
  Not tested - costs 10x more for spot instances, not economically viable.
 
  Ubuntu Server 14.04 LTS (HVM) - ami-f64f77b3
  Provisions servers, but git is not pre-installed, so the cluster setup
  fails.
 
  Amazon Linux AMI (HVM) 2014.03.1 - ami-5aba831f
  Provisions servers, but git is not pre-installed, so the cluster setup
  fails.
 
 
 
 
  --
  Jeremy Lee  BCompSci(Hons)
The Unorthodox Engineers
 
 
 
 -- 
 Jeremy Lee  BCompSci(Hons)
   The Unorthodox Engineers



Re: SecurityException when running tests with Spark 1.0.0

2014-06-02 Thread Matei Zaharia
You can just use the Maven build for now, even for Spark 1.0.0.

Matei

On Jun 2, 2014, at 5:30 PM, Mohit Nayak wiza...@gmail.com wrote:

 Hey,
 Yup that fixed it. Thanks so much!
  
 Is this the only solution, or could this be resolved in future versions of 
  Spark?
 
 
 On Mon, Jun 2, 2014 at 5:14 PM, Sean Owen so...@cloudera.com wrote:
 If it's the SBT build, I suspect you are hitting
 https://issues.apache.org/jira/browse/SPARK-1949
 
 Can you try to apply the excludes you see at
 https://github.com/apache/spark/pull/906/files to your build to see if
 it resolves it?
 
 If so I think this could be helpful to commit.
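  As a sketch of what that looks like in an sbt build (the coordinates below are an
  assumption about where the duplicate servlet API comes from — the exact exclusions
  to apply are in the PR above):
  
    val sparkCore = "org.apache.spark" % "spark-core_2.10" % V.spark excludeAll(
      ExclusionRule(organization = "org.mortbay.jetty", name = "servlet-api"),
      ExclusionRule(organization = "javax.servlet")
    )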
 
 On Tue, Jun 3, 2014 at 1:01 AM, Mohit Nayak wiza...@gmail.com wrote:
  Hey,
  Thanks for the reply.
 
   I am using SBT. Here is a list of my dependencies:
   val sparkCore     = "org.apache.spark" % "spark-core_2.10" % V.spark
   val hadoopCore    = "org.apache.hadoop" % "hadoop-core" % V.hadoop % "provided"
   val jodaTime      = "com.github.nscala-time" %% "nscala-time" % "0.8.0"
   val scalaUtil     = "com.twitter" %% "util-collection" % V.util
   val logback       = "ch.qos.logback" % "logback-classic" % "1.0.6" % "runtime"
   var openCsv       = "net.sf.opencsv" % "opencsv" % "2.1"
   var scalaTest     = "org.scalatest" % "scalatest_2.10" % "2.1.0" % "test"
   var scalaIOCore   = "com.github.scala-incubator.io" %% "scala-io-core" % V.scalaIO
   var scalaIOFile   = "com.github.scala-incubator.io" %% "scala-io-file" % V.scalaIO
   var kryo          = "com.esotericsoftware.kryo" % "kryo" % "2.16"
   var spray         = "io.spray" %% "spray-json" % "1.2.5"
   var scala_reflect = "org.scala-lang" % "scala-reflect" % "2.10.3"
 
 
 
  On Mon, Jun 2, 2014 at 4:23 PM, Sean Owen so...@cloudera.com wrote:
 
  This ultimately means you have a couple copies of the servlet APIs in
  the build. What is your build like (SBT? Maven?) and what exactly are
  you depending on?
 
  On Tue, Jun 3, 2014 at 12:21 AM, Mohit Nayak wiza...@gmail.com wrote:
   Hi,
   I've upgraded to Spark 1.0.0. I'm not able to run any tests. They throw
   a
  
   java.lang.SecurityException: class javax.servlet.FilterRegistration's
   signer information does not match signer information of other classes in
   the
   same package
  
  
   I'm using Hadoop-core 1.0.4 and running this locally.
   I noticed that there was an issue regarding this and was marked as
   resolved
   [https://issues.apache.org/jira/browse/SPARK-1693]
   Please guide..
  
   --
   -Mohit
   wiza...@gmail.com
  
  
  
   --
   -Mohit
   wiza...@gmail.com
 
 
 
 
  --
  -Mohit
  wiza...@gmail.com
 
 
 
 -- 
 -Mohit
 wiza...@gmail.com



Re: wholeTextFiles() : java.lang.IncompatibleClassChangeError: Found class org.apache.hadoop.mapreduce.TaskAttemptContext, but interface was expected

2014-06-03 Thread Matei Zaharia
Yeah unfortunately Hadoop 2 requires these binaries on Windows. Hadoop 1 runs 
just fine without them.
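A commonly suggested workaround (an assumption about your layout — winutils.exe obtained separately and placed under C:\hadoop\bin) is to point Hadoop at it before the SparkContext is created:

    import org.apache.spark.SparkContext

    // Sketch: let Hadoop's Shell utilities find winutils.exe on Windows.
    // Assumes winutils.exe has been placed in C:\hadoop\bin.
    System.setProperty("hadoop.home.dir", "C:\\hadoop")
    val sc = new SparkContext("local[*]", "wholeTextFilesExample")
    val files = sc.wholeTextFiles("C:/data/input")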

Matei

On Jun 3, 2014, at 10:33 AM, Sean Owen so...@cloudera.com wrote:

 I'd try the internet / SO first -- these are actually generic
 Hadoop-related issues. Here I think you don't have HADOOP_HOME or
 similar set.
 
 http://stackoverflow.com/questions/19620642/failed-to-locate-the-winutils-binary-in-the-hadoop-binary-path
 
 On Tue, Jun 3, 2014 at 5:54 PM, toivoa toivo@gmail.com wrote:
 Wow! What a quick reply!
 
 adding
 
 <dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-client</artifactId>
   <version>2.4.0</version>
 </dependency>
 
 solved the problem.
 
 But now I get
 
 14/06/03 19:52:50 ERROR Shell: Failed to locate the winutils binary in the
 hadoop binary path
 java.io.IOException: Could not locate executable null\bin\winutils.exe in
 the Hadoop binaries.
at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:318)
at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:333)
 at org.apache.hadoop.util.Shell.<clinit>(Shell.java:326)
 at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:76)
at 
 org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:93)
 at org.apache.hadoop.security.Groups.<init>(Groups.java:77)
at
 org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:240)
at
 org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:255)
at
 org.apache.hadoop.security.UserGroupInformation.setConfiguration(UserGroupInformation.java:283)
at 
 org.apache.spark.deploy.SparkHadoopUtil.<init>(SparkHadoopUtil.scala:36)
at
 org.apache.spark.deploy.SparkHadoopUtil$.<init>(SparkHadoopUtil.scala:109)
at 
 org.apache.spark.deploy.SparkHadoopUtil$.<clinit>(SparkHadoopUtil.scala)
 
 thanks
 toivo
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/wholeTextFiles-java-lang-IncompatibleClassChangeError-Found-class-org-apache-hadoop-mapreduce-TaskAtd-tp6818p6820.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Better line number hints for logging?

2014-06-03 Thread Matei Zaharia
You can use RDD.setName to give it a name. There’s also a creationSite field 
that is private[spark] — we may want to add a public setter for that later. If 
the name isn’t enough and you’d like this, please open a JIRA issue for it.
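For example, a minimal sketch (assuming an existing SparkContext sc; parseRecord stands in for whatever your extension method wraps):

    // Name the RDD so logs and the web UI show something more meaningful
    // than the file/line of the helper that created it.
    val records = sc.textFile("hdfs:///data/records")
      .map(parseRecord)
    records.setName("parsedRecords")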

Matei

On Jun 3, 2014, at 5:22 PM, John Salvatier jsalvat...@gmail.com wrote:

 I have created some extension methods for RDDs in RichRecordRDD and these are 
 working exceptionally well for me. 
 
 However, when looking at the logs, its impossible to tell what's going on 
 because all the line number hints point to RichRecordRDD.scala rather than 
 the code that uses it. For example:
 
 INFO scheduler.DAGScheduler: Submitting Stage 122 (MappedRDD[1223] at map at 
 RichRecordRDD.scala:633), which is now runnable
  Is there any way to set up my extension methods class so that the logs will 
 print a more useful line number? 
 



Re: Invalid Class Exception

2014-06-03 Thread Matei Zaharia
What Java version do you have, and how did you get Spark (did you build it 
yourself by any chance or download a pre-built one)? If you build Spark 
yourself you need to do it with Java 6 — it’s a known issue because of the way 
Java 6 and 7 package JAR files. But I haven’t seen it result in this particular 
error.

Matei

On Jun 3, 2014, at 5:18 PM, Suman Somasundar suman.somasun...@oracle.com 
wrote:

 
 Hi all,
 
 I get the following exception when using Spark to run example k-means 
 program.  I am using Spark 1.0.0 and running the program locally.
 
 java.io.InvalidClassException: scala.Tuple2; invalid descriptor for field _1
at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:697)
at 
 java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:827)
at 
 java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1583)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1514)
at 
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1750)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
at 
 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
at 
 org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at 
 org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
at 
 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at 
 org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:87)
at 
 org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$3.apply(PairRDDFunctions.scala:101)
at 
 org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$3.apply(PairRDDFunctions.scala:100)
at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)
at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)
at 
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
at org.apache.spark.scheduler.Task.run(Task.scala:51)
at 
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
at java.lang.Thread.run(Thread.java:722)
 Caused by: java.lang.IllegalArgumentException: illegal signature
 at java.io.ObjectStreamField.<init>(ObjectStreamField.java:119)
at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:695)
... 26 more
 
 Anyone know why this is happening?
 
 Thanks,
 Suman.



Re: Yay for 1.0.0! EC2 Still has problems.

2014-06-03 Thread Matei Zaharia
Ah, sorry to hear you had more problems. Some thoughts on them:

 Thanks for that, Matei! I'll look at that once I get a spare moment. :-)
 
 If you like, I'll keep documenting my newbie problems and frustrations... 
 perhaps it might make things easier for others.
 
 Another issue I seem to have found (now that I can get small clusters up): 
 some of the examples (the streaming.Twitter ones especially) depend on there 
 being a /mnt/spark and /mnt2/spark directory (I think for java 
 tempfiles?) and those don't seem to exist out-of-the-box. I have to create 
 those directories and use copy-dir to get them to the workers before those 
 examples run.

I think this is a side-effect of the r3 instances not having those drives 
mounted. Our setup script would normally create these directories. What was the 
error?

 Much of the the last two days for me have been about failing to get any of my 
 own code to work, except for in spark-shell. (which is very nice, btw)
 
 At first I tried editing the examples, because I took the documentation 
 literally when it said Finally, Spark includes several samples in the 
 examples directory (Scala, Java, Python). You can run them as follows:  but 
 of course didn't realize editing them is pointless because while the source 
 is there, the code is actually pulled from a .jar elsewhere. Doh. (so obvious 
 in hindsight)
 
 I couldn't even turn down the voluminous INFO messages to WARNs, no matter 
 how many conf/log4j.properties files I edited or copy-dir'd. I'm sure there's 
 a trick to that I'm not getting. 

What did you change log4j.properties to? It should be changed to say 
log4j.rootCategory=WARN, console but maybe another log4j.properties is somehow 
arriving on the classpath. This is definitely a common problem so we need to 
add some explicit docs on it.
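For reference, the relevant part of conf/log4j.properties should look roughly like the bundled template, just with WARN at the root (a sketch, assuming the standard console appender):

    # Quiet the INFO output, keep warnings and errors
    log4j.rootCategory=WARN, console
    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n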

 Even trying to build SimpleApp I've run into the problem that all the 
  documentation says to use sbt/sbt assembly, but sbt doesn't seem to be in 
 the 1.0.0 pre-built packages that I downloaded.

Are you going through http://spark.apache.org/docs/latest/quick-start.html? You 
should be able to do just sbt package. Once you do that you don’t need to 
deploy your application’s JAR to the cluster, just pass it to spark-submit and 
it will automatically be sent over.
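Concretely, something along these lines (the class name, master URL and jar path are placeholders for your own application):

    sbt package
    ./bin/spark-submit \
      --class "SimpleApp" \
      --master spark://<master-hostname>:7077 \
      target/scala-2.10/simple-project_2.10-1.0.jar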

 Ah... yes.. there it is in the source package. I suppose that means that in 
 order to deploy any new code to the cluster, I've got to rebuild from source 
 on my cluster controller. OK, I never liked that Amazon Linux AMI anyway. 
 I'm going to start from scratch again with an Ubuntu 12.04 instance, 
 hopefully that will be more auspicious...
 
 Meanwhile I'm learning scala... Great Turing's Ghost, it's the dream language 
 we've theorized about for years! I hadn't realized!

Indeed, glad you’re enjoying it.

Matei

 
 
 
 On Mon, Jun 2, 2014 at 12:05 PM, Matei Zaharia matei.zaha...@gmail.com 
 wrote:
 FYI, I opened https://issues.apache.org/jira/browse/SPARK-1990 to track this. 
 Matei
 
 
 On Jun 1, 2014, at 6:14 PM, Jeremy Lee unorthodox.engine...@gmail.com wrote:
 
 Sort of.. there were two separate issues, but both related to AWS..
 
 I've sorted the confusion about the Master/Worker AMI ... use the version 
 chosen by the scripts. (and use the right instance type so the script can 
 choose wisely)
 
 But yes, one also needs a launch machine to kick off the cluster, and for 
 that I _also_ was using an Amazon instance... (made sense.. I have a team 
  that will need to do things as well, not just me) and I was just pointing 
  out that if you use the AMI most recommended by Amazon (for your free 
 micro instance, for example) you get python 2.6 and the ec2 scripts fail.
 
 That merely needs a line in the documentation saying use Ubuntu for your 
 cluster controller, not Amazon Linux or somesuch. But yeah, for a newbie, 
 it was hard working out when to use default or custom AMIs for various 
 parts of the setup.
 
 
 On Mon, Jun 2, 2014 at 4:01 AM, Patrick Wendell pwend...@gmail.com wrote:
 Hey just to clarify this - my understanding is that the poster
 (Jeremey) was using a custom AMI to *launch* spark-ec2. I normally
 launch spark-ec2 from my laptop. And he was looking for an AMI that
 had a high enough version of python.
 
 Spark-ec2 itself has a flag -a that allows you to give a specific
 AMI. This flag is just an internal tool that we use for testing when
 we spin new AMI's. Users can't set that to an arbitrary AMI because we
 tightly control things like the Java and OS versions, libraries, etc.
 
 
 On Sun, Jun 1, 2014 at 12:51 AM, Jeremy Lee
 unorthodox.engine...@gmail.com wrote:
  *sigh* OK, I figured it out. (Thank you Nick, for the hint)
 
  m1.large works, (I swear I tested that earlier and had similar issues... 
  )
 
  It was my obsession with starting r3.*large instances. Clearly I hadn't
  patched the script in all the places.. which I think caused it to default 
  to
  the Amazon AMI. I'll have to take a closer look at the code

Re: Upgradation to Spark 1.0.0

2014-06-03 Thread Matei Zaharia
You can copy your configuration from the old one. I’d suggest just downloading 
it to a different location on each node first for testing, then you can delete 
the old one if things work.


On Jun 3, 2014, at 12:38 AM, MEETHU MATHEW meethu2...@yahoo.co.in wrote:

 Hi ,
 
  I am currently using Spark 0.9 configured with the Hadoop 1.2.1 cluster. What 
  should I do if I want to upgrade it to Spark 1.0.0? Do I need to download the 
  latest version, replace the existing Spark with the new one, and make the 
  configuration changes again from scratch, or is there any other way to 
  move ahead?
 
 Thanks,
 Meethu M
 
 
 



Re: Join : Giving incorrect result

2014-06-04 Thread Matei Zaharia
If this isn’t the problem, it would be great if you can post the code for the 
program.

Matei

On Jun 4, 2014, at 12:58 PM, Xu (Simon) Chen xche...@gmail.com wrote:

 Maybe your two workers have different assembly jar files?
 
 I just ran into a similar problem that my spark-shell is using a different 
 jar file than my workers - got really confusing results.
 
 On Jun 4, 2014 8:33 AM, Ajay Srivastava a_k_srivast...@yahoo.com wrote:
 Hi,
 
  I am doing a join of two RDDs which gives different results (counting the number 
  of records) each time I run this code on the same input.
  
  The input files are large enough to be divided into two splits. When the 
  program runs on two workers with a single core assigned to each, the output is 
  consistent and looks correct. But when a single worker is used with two or more 
  cores, the result seems to be random. Every time, the count of joined 
  records is different.
  
  Does this sound like a defect, or do I need to take care of something while using 
  join? I am using spark-0.9.1.
 
 Regards
 Ajay



Re: reuse hadoop code in Spark

2014-06-04 Thread Matei Zaharia
Yes, you can write some glue in Spark to call these. Some functions to look at:

- SparkContext.hadoopRDD lets you create an input RDD from an existing JobConf 
configured by Hadoop (including InputFormat, paths, etc)
- RDD.mapPartitions lets you operate in all the values on one partition (block) 
at a time, similar to how Mappers in MapReduce work
- PairRDDFunctions.reduceByKey and groupByKey can be used for aggregation.
- RDD.pipe() can be used to call out to a script or binary, like Hadoop 
Streaming.

A fair number of people have been running both Java and Hadoop Streaming apps 
like this.
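As a rough sketch of the glue (assumptions: an existing TextInputFormat-based input path, with the word-count-style logic below standing in for your ported Mapper/Reducer code):

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._

    val sc = new SparkContext("local[*]", "hadoop-glue-sketch")

    // Reuse an existing Hadoop JobConf / InputFormat to create the input RDD
    val jobConf = new JobConf()
    FileInputFormat.setInputPaths(jobConf, "hdfs:///data/input")
    val lines = sc.hadoopRDD(jobConf, classOf[TextInputFormat],
      classOf[LongWritable], classOf[Text])

    // mapPartitions plays the role of a Mapper over each block;
    // reduceByKey plays the role of the Reducer's aggregation
    val counts = lines
      .mapPartitions(iter => iter.map { case (_, line) => (line.toString, 1L) })
      .reduceByKey(_ + _)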

Matei

On Jun 4, 2014, at 1:08 PM, Wei Tan w...@us.ibm.com wrote:

 Hello, 
 
   I am trying to use spark in such a scenario: 
 
   I have code written in Hadoop and now I try to migrate to Spark. The 
 mappers and reducers are fairly complex. So I wonder if I can reuse the map() 
 functions I already wrote in Hadoop (Java), and use Spark to chain them, 
 mixing the Java map() functions with Spark operators? 
 
    Another related question: can I use binaries as operators, like Hadoop 
 streaming? 
 
   Thanks! 
 Wei 
 
  



Re: Better line number hints for logging?

2014-06-04 Thread Matei Zaharia
That’s a good idea too, maybe we can change CallSiteInfo to do that.

Matei

On Jun 4, 2014, at 8:44 AM, Daniel Darabos daniel.dara...@lynxanalytics.com 
wrote:

 Oh, this would be super useful for us too!
 
 Actually wouldn't it be best if you could see the whole call stack on the UI, 
 rather than just one line? (Of course you would have to click to expand it.)
 
 
 On Wed, Jun 4, 2014 at 2:38 AM, John Salvatier jsalvat...@gmail.com wrote:
 Ok, I will probably open a Jira.
 
 
 On Tue, Jun 3, 2014 at 5:29 PM, Matei Zaharia matei.zaha...@gmail.com wrote:
 You can use RDD.setName to give it a name. There’s also a creationSite field 
 that is private[spark] — we may want to add a public setter for that later. 
 If the name isn’t enough and you’d like this, please open a JIRA issue for it.
 
 Matei
 
 On Jun 3, 2014, at 5:22 PM, John Salvatier jsalvat...@gmail.com wrote:
 
 I have created some extension methods for RDDs in RichRecordRDD and these 
 are working exceptionally well for me. 
 
 However, when looking at the logs, its impossible to tell what's going on 
 because all the line number hints point to RichRecordRDD.scala rather than 
 the code that uses it. For example:
 
 INFO scheduler.DAGScheduler: Submitting Stage 122 (MappedRDD[1223] at map at 
 RichRecordRDD.scala:633), which is now runnable
  Is there any way to set up my extension methods class so that the logs will 
 print a more useful line number? 
 
 
 
 



Re: pyspark join crash

2014-06-04 Thread Matei Zaharia
In PySpark, the data processed by each reduce task needs to fit in memory 
within the Python process, so you should use more tasks to process this 
dataset. Data is spilled to disk across tasks.

I’ve created https://issues.apache.org/jira/browse/SPARK-2021 to track this — 
it’s something we’ve been meaning to look at soon.

Matei

On Jun 4, 2014, at 8:23 AM, Brad Miller bmill...@eecs.berkeley.edu wrote:

 Hi All,
 
 I have experienced some crashing behavior with join in pyspark.  When I 
 attempt a join with 2000 partitions in the result, the join succeeds, but 
 when I use only 200 partitions in the result, the join fails with the message 
 Job aborted due to stage failure: Master removed our application: FAILED.
 
 The crash always occurs at the beginning of the shuffle phase.  Based on my 
 observations, it seems like the workers in the read phase may be fetching 
 entire blocks from the write phase of the shuffle rather than just the 
 records necessary to compose the partition the reader is responsible for.  
 Hence, when there are fewer partitions in the read phase, the worker is 
 likely to need a record from each of the write partitions and consequently 
 attempts to load the entire data set into the memory of a single machine 
 (which then causes the out of memory crash I observe in /var/log/syslog).
 
 Can anybody confirm if this is the behavior of pyspark?  I am glad to supply 
 additional details about my observed behavior upon request.
 
 best,
 -Brad



Re: How can I dispose an Accumulator?

2014-06-04 Thread Matei Zaharia
All of these are disposed of automatically if you stop the context or exit the 
program.
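(For the broadcast case Patrick describes below, a quick sketch of the explicit hook, assuming an existing SparkContext sc:)

    // Broadcast variables expose unpersist(); accumulators are simply
    // garbage collected once they go out of scope.
    val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2))
    val total = sc.parallelize(Seq("a", "b", "c"))
      .map(k => lookup.value.getOrElse(k, 0))
      .reduce(_ + _)
    lookup.unpersist()   // frees the cached copies on the executors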

Matei


On Jun 4, 2014, at 2:22 PM, Daniel Siegmann daniel.siegm...@velos.io wrote:

 Will the broadcast variables be disposed automatically if the context is 
 stopped, or do I still need to unpersist()?
 
 
 On Sat, May 31, 2014 at 1:20 PM, Patrick Wendell pwend...@gmail.com wrote:
 Hey There,
 
 You can remove an accumulator by just letting it go out of scope and
 it will be garbage collected. For broadcast variables we actually
 store extra information for it, so we provide hooks for users to
 remove the associated state. There is no such need for accumulators,
 though.
 
 - Patrick
 
 On Thu, May 29, 2014 at 2:13 AM, innowireless TaeYun Kim
 taeyun@innowireless.co.kr wrote:
  Hi,
 
 
 
  How can I dispose an Accumulator?
 
  It has no method like 'unpersist()' which Broadcast provides.
 
 
 
  Thanks.
 
 
 
 
 
 -- 
 Daniel Siegmann, Software Developer
 Velos
 Accelerating Machine Learning
 
 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
 E: daniel.siegm...@velos.io W: www.velos.io



Re: pyspark join crash

2014-06-04 Thread Matei Zaharia
I think the problem is that once unpacked in Python, the objects take 
considerably more space, as they are stored as Python objects in a Python 
dictionary. Take a look at python/pyspark/join.py and combineByKey in 
python/pyspark/rdd.py. We should probably try to store these in serialized form.

I’m not sure whether there’s a great way to inspect a Python process’s memory, 
but looking at what consumes memory in a reducer process would be useful.

Matei 


On Jun 4, 2014, at 2:34 PM, Brad Miller bmill...@eecs.berkeley.edu wrote:

 Hi Matei,
 
 Thanks for the reply and creating the JIRA. I hear what you're saying, 
 although to be clear I want to still state that it seems like each reduce 
 task is loading significantly more data than just the records needed for that 
 task.  The workers seem to load all data from each block containing a record 
 needed by the reduce task.
 
 I base this hypothesis on the following:
 -My dataset is about 100G uncompressed, 22G serialized in memory with 
 compression enabled
 -There are 130K records
 -The initial RDD contains 1677 partitions, averaging 60M (uncompressed)
 -There are 3 cores per node (each running one reduce task at a time)
 -Each node has 32G of memory
 
 Note that I am attempting to join the dataset to itself and I ran this 
 experiment after caching the dataset in memory with serialization and 
 compression enabled.
 
 Given these figures, even with only 200 partitions the average output 
 partition size (uncompressed) would be 1G (as the dataset is being joined to 
 itself, resulting in 200G over 200 partitions), requiring 3G from each 
 machine on average.  The behavior I observe is that the kernel kills jobs in 
 many of the nodes at nearly the exact same time right after the read phase 
 starts; it seems likely this would occur in each node except the master 
 begins detecting failures and stops the job (and I observe memory spiking on 
 all machines).  Indeed, I observe a large memory spike at each node.
 
 When I attempt the join with 2000 output partitions, it succeeds.  Note that 
 there are about 65 records per output partition on average, which means the 
 reader only needs to load input from about 130 blocks (as the dataset is 
 joined to itself).  Given that the average uncompressed block size is 60M, 
 even if the entire block were loaded (not just the relevant record) we would 
 expect about 23G of memory to be used per node on average.
 
 I began suspecting the behavior of loading entire blocks based on the logging 
 from the workers (i.e. BlockFetcherIterator$BasicBlockFetcherIterator: 
 Getting 122 non-empty blocks out of 3354 blocks).  If it is definitely not 
 the case that entire blocks are loaded from the writers, then it would seem 
  like there is some significant overhead which is chewing through lots of memory 
 (perhaps similar to the problem with python broadcast variables chewing 
 through memory https://spark-project.atlassian.net/browse/SPARK-1065).
 
 -Brad
 
 
 
 On Wed, Jun 4, 2014 at 1:42 PM, Matei Zaharia matei.zaha...@gmail.com wrote:
 In PySpark, the data processed by each reduce task needs to fit in memory 
 within the Python process, so you should use more tasks to process this 
 dataset. Data is spilled to disk across tasks.
 
 I’ve created https://issues.apache.org/jira/browse/SPARK-2021 to track this — 
 it’s something we’ve been meaning to look at soon.
 
 Matei
 
 On Jun 4, 2014, at 8:23 AM, Brad Miller bmill...@eecs.berkeley.edu wrote:
 
  Hi All,
 
  I have experienced some crashing behavior with join in pyspark.  When I 
  attempt a join with 2000 partitions in the result, the join succeeds, but 
  when I use only 200 partitions in the result, the join fails with the 
  message Job aborted due to stage failure: Master removed our application: 
  FAILED.
 
  The crash always occurs at the beginning of the shuffle phase.  Based on my 
  observations, it seems like the workers in the read phase may be fetching 
  entire blocks from the write phase of the shuffle rather than just the 
  records necessary to compose the partition the reader is responsible for.  
  Hence, when there are fewer partitions in the read phase, the worker is 
  likely to need a record from each of the write partitions and consequently 
  attempts to load the entire data set into the memory of a single machine 
  (which then causes the out of memory crash I observe in /var/log/syslog).
 
  Can anybody confirm if this is the behavior of pyspark?  I am glad to 
  supply additional details about my observed behavior upon request.
 
  best,
  -Brad
 
 



Re: Why Scala?

2014-06-04 Thread Matei Zaharia
We are definitely investigating a Python API for Streaming, but no announced 
deadline at this point.

Matei

On Jun 4, 2014, at 5:02 PM, John Omernik j...@omernik.com wrote:

 So Python is used in many of the Spark Ecosystem products, but not Streaming 
 at this point. Is there a roadmap to include Python APIs in Spark Streaming? 
  Any time frame on this? 
 
 Thanks!
 
 John
 
 
 On Thu, May 29, 2014 at 4:19 PM, Matei Zaharia matei.zaha...@gmail.com 
 wrote:
 Quite a few people ask this question and the answer is pretty simple. When we 
 started Spark, we had two goals — we wanted to work with the Hadoop 
 ecosystem, which is JVM-based, and we wanted a concise programming interface 
 similar to Microsoft’s DryadLINQ (the first language-integrated big data 
 framework I know of, that begat things like FlumeJava and Crunch). On the 
 JVM, the only language that would offer that kind of API was Scala, due to 
 its ability to capture functions and ship them across the network. Scala’s 
 static typing also made it much easier to control performance compared to, 
 say, Jython or Groovy.
 
 In terms of usage, however, we see substantial usage of our other languages 
 (Java and Python), and we’re continuing to invest in both. In a user survey 
 we did last fall, about 25% of users used Java and 30% used Python, and I 
 imagine these numbers are growing. With lambda expressions now added to Java 
 8 (http://databricks.com/blog/2014/04/14/Spark-with-Java-8.html), I think 
 we’ll see a lot more Java. And at Databricks I’ve seen a lot of interest in 
 Python, which is very exciting to us in terms of ease of use.
 
 Matei
 
 On May 29, 2014, at 1:57 PM, Benjamin Black b...@b3k.us wrote:
 
 HN is a cesspool safely ignored.
 
 
 On Thu, May 29, 2014 at 1:55 PM, Nick Chammas nicholas.cham...@gmail.com 
 wrote:
 I recently discovered Hacker News and started reading through older posts 
 about Scala. It looks like the language is fairly controversial on there, 
 and it got me thinking.
 
 Scala appears to be the preferred language to work with in Spark, and Spark 
 itself is written in Scala, right?
 
 I know that often times a successful project evolves gradually out of 
 something small, and that the choice of programming language may not always 
 have been made consciously at the outset.
 
 But pretending that it was, why is Scala the preferred language of Spark?
 
 Nick
 
 
 View this message in context: Why Scala?
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 
 


