Re: Size of RDD larger than Size of data on disk
The problem is that Java objects can take more space than the underlying data, but there are options in Spark to store data in serialized form to get around this. Take a look at https://spark.incubator.apache.org/docs/latest/tuning.html. Matei

On Feb 25, 2014, at 12:01 PM, Suraj Satishkumar Sheth suraj...@adobe.com wrote: Hi Mayur, Thanks for replying. Is it usually double the size of the data on disk? I have observed this many times. The storage section of Spark is telling me that 100% of the RDD is cached, using 97 GB of RAM, while the data in HDFS is only 47 GB. Thanks and Regards, Suraj Sheth

From: Mayur Rustagi [mailto:mayur.rust...@gmail.com] Sent: Tuesday, February 25, 2014 11:19 PM To: user@spark.apache.org Cc: u...@spark.incubator.apache.org Subject: Re: Size of RDD larger than Size of data on disk

Spark may take more RAM than required by the RDD; can you look at the storage section of Spark to see how much space the RDD is taking in memory? It may still take more storage than on disk, as Java objects have some overhead. Consider enabling compression in the RDD. Mayur Rustagi Ph: +919632149971 http://www.sigmoidanalytics.com https://twitter.com/mayur_rustagi

On Tue, Feb 25, 2014 at 6:47 AM, Suraj Satishkumar Sheth suraj...@adobe.com wrote: Hi All, I have a folder in HDFS with files totaling 47 GB. I am loading this in Spark as an RDD[String] and caching it. The total amount of RAM that Spark uses to cache it is around 97 GB. I want to know why Spark is taking up so much space for the RDD. Can we reduce the RDD size in Spark and make it similar to its size on disk? Thanks and Regards, Suraj Sheth
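The JVM overhead Matei and Mayur describe is not specific to Spark. A rough illustration of the same effect, sketched here in plain Python rather than on the JVM (the exact numbers are runtime-specific, so treat this only as an analogy): per-object headers and pointers make a collection of small records occupy several times the size of the raw characters. Spark's serialized storage levels (e.g. MEMORY_ONLY_SER) and the tuning guide above address exactly this gap.

```python
import sys

# 100,000 short "records", as a cached RDD partition might hold them
records = ["row-%06d" % i for i in range(100000)]

raw_bytes = sum(len(r) for r in records)               # size of the data itself
object_bytes = sum(sys.getsizeof(r) for r in records)  # data plus per-object header
object_bytes += sys.getsizeof(records)                 # plus the pointer array holding them

print("raw: %d bytes, as objects: %d bytes, overhead: %.1fx"
      % (raw_bytes, object_bytes, object_bytes / float(raw_bytes)))
```

Storing the records serialized (one big byte buffer instead of many small objects) removes most of that multiplier, at the cost of deserialization CPU on access.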
Re: Implementing a custom Spark shell
In Spark 0.9 and master, you can pass the -i argument to spark-shell to load a script containing commands before opening the prompt. This is also a feature of the Scala shell as a whole (try scala -help for details). Also, once you’re in the shell, you can use :load file.scala to execute the content of file.scala as if you’d typed it into the shell. Matei On Feb 25, 2014, at 11:44 PM, Sampo Niskanen sampo.niska...@wellmo.com wrote: Hi, I'd like to create a custom version of the Spark shell, which has automatically defined some other variables / RDDs (in addition to 'sc') specific to our application. Is this possible? I took a look at the code that the spark-shell invokes, and it seems quite complex. Can this be reused from my code? I'm implementing a standalone application that uses the Spark libraries (managed by SBT). Ideally, I'd like to be able to launch the shell from that application, instead of using the default Spark distribution. Alternatively, can some utility code be injected within the standard spark-shell? Thanks. Sampo Niskanen Lead developer / Wellmo
Re: Building spark with native library support
Is it an error, or just a warning? In any case, you need to get those libraries from a build of Hadoop for your platform. Then add them to the SPARK_LIBRARY_PATH environment variable in conf/spark-env.sh, or to your -Djava.library.path if launching an application separately. These libraries just speed up some compression codecs BTW, so it should be fine to run without them too. Matei On Mar 6, 2014, at 9:04 AM, Alan Burlison alan.burli...@oracle.com wrote: Hi, I've successfully built 0.9.0-incubating on Solaris using sbt, following the instructions at http://spark.incubator.apache.org/docs/latest/ and it seems to work OK. However, when I start it up I get an error about missing Hadoop native libraries. I can't find any mention of how to build the native components in the instructions, how is that done? Thanks, -- Alan Burlison --
Re: major Spark performance problem
Hi Dana, It’s hard to tell exactly what is consuming time, but I’d suggest starting by profiling the single application first. Three things to look at there:

1) How many stages, and how many tasks per stage, is Spark launching (in the application web UI at http://driver:4040)? If you have hundreds of tasks for this small a file, just the task launching time might be a problem. You can use RDD.coalesce() to have fewer data partitions.

2) If you run a Java profiler (e.g. YourKit or hprof) on the workers while the application is executing, where is time being spent? Maybe some of your code is more expensive than it seems. One other thing you might find is that some code you use requires synchronization and is therefore not scaling properly to multiple cores (e.g. Java’s Math.random() actually does that).

3) Are there any RDDs that are used over and over but not cached? In that case they’ll be recomputed on each use.

Once you look into these it might be easier to improve the multiple-job case. In that case, as others have pointed out, running the jobs in the same SparkContext and using the fair scheduler (http://spark.apache.org/docs/latest/job-scheduling.html) should work. Matei

On Mar 9, 2014, at 5:56 AM, Livni, Dana dana.li...@intel.com wrote: YARN also has this scheduling option. The problem is that all of our applications have the same flow, where the first stage is the heaviest and the rest are very small. When several requests (applications) start to run at the same time, the first stages of all of them are scheduled in parallel, and for some reason they delay each other: a stage that alone takes around 13s can take up to 2m when running in parallel with other identical stages (around 15 of them).
-----Original Message----- From: elyast [mailto:lukasz.jastrzeb...@gmail.com] Sent: Friday, March 07, 2014 20:01 To: u...@spark.incubator.apache.org Subject: Re: major Spark performance problem

Hi, There is also an option to run Spark applications on top of Mesos in fine-grained mode; then fair scheduling is possible (applications will run in parallel, and Mesos is responsible for scheduling all tasks), so in a sense all applications will progress in parallel. Obviously, in total it may not be faster; however, the benefit is the fair scheduling (small jobs will not be stuck behind the big ones). Best regards Lukasz Jastrzebski

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/major-Spark-performance-problem-tp2364p2403.html Sent from the Apache Spark User List mailing list archive at Nabble.com.

- Intel Electronics Ltd. This e-mail and any attachments may contain confidential material for the sole use of the intended recipient(s). Any review or distribution by others is strictly prohibited. If you are not the intended recipient, please contact the sender and delete all copies.
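Matei's first point (hundreds of tiny tasks dominating a small job) can be made concrete with a back-of-the-envelope model. This is plain Python with made-up constants, not measured Spark numbers: tasks run in waves across the available cores, and every task pays a fixed launch cost, so shrinking the partition count with coalesce() shrinks the fixed overhead.

```python
import math

def stage_time(work_secs, num_tasks, num_cores, launch_overhead=0.1):
    """Crude model: tasks run in waves of num_cores, each wave paying a fixed launch cost."""
    waves = math.ceil(num_tasks / float(num_cores))
    parallelism = min(num_tasks, num_cores)
    return waves * launch_overhead + work_secs / float(parallelism)

# 10 seconds of real work on a 16-core cluster
many = stage_time(work_secs=10.0, num_tasks=800, num_cores=16)  # e.g. one task per small HDFS block
few = stage_time(work_secs=10.0, num_tasks=16, num_cores=16)    # after rdd.coalesce(16)
print("800 tasks: %.2fs, 16 tasks: %.2fs" % (many, few))
```

In this toy model the 800-task stage spends most of its time launching tasks, while the coalesced stage is dominated by actual work. coalesce() reduces the number of partitions without requiring a full shuffle.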
Re: NO SUCH METHOD EXCEPTION
Since it’s from Scala, it might mean you’re running with a different version of Scala than you compiled Spark with. Spark 0.8 and earlier use Scala 2.9, while Spark 0.9 uses Scala 2.10. Matei

On Mar 11, 2014, at 8:19 AM, Jeyaraj, Arockia R (Arockia) arockia.r.jeya...@verizon.com wrote: Hi, Can anyone help me to resolve this issue? Why am I getting NoSuchMethod exception?

14/03/11 09:56:11 ERROR executor.Executor: Exception in task ID 0
java.lang.NoSuchMethodError: scala.Predef$.augmentString(Ljava/lang/String;)Lscala/collection/immutable/StringOps;
 at kafka.utils.VerifiableProperties.getIntInRange(VerifiableProperties.scala:75)
 at kafka.utils.VerifiableProperties.getInt(VerifiableProperties.scala:58)
 at kafka.utils.ZKConfig.init(ZkUtils.scala:837)
 at kafka.consumer.ConsumerConfig.init(ConsumerConfig.scala:73)
 at kafka.consumer.ConsumerConfig.init(ConsumerConfig.scala:77)
 at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:98)
 at org.apache.spark.streaming.dstream.NetworkReceiver.start(NetworkInputDStream.scala:126)
 at org.apache.spark.streaming.scheduler.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:173)
 at org.apache.spark.streaming.scheduler.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:169)
 at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
 at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
 at org.apache.spark.scheduler.Task.run(Task.scala:53)
 at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
 at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
 at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
 at java.lang.Thread.run(Thread.java:619)
14/03/11 09:56:11 WARN scheduler.TaskSetManager: Lost TID 0 (task 0.0:0)
14/03/11 09:56:11 WARN scheduler.TaskSetManager: Loss was due to java.lang.NoSuchMethodError
java.lang.NoSuchMethodError: scala.Predef$.augmentString(Ljava/lang/String;)Lscala/collection/immutable/StringOps;
 [identical stack trace as above]
14/03/11 09:56:11 ERROR scheduler.TaskSetManager: Task 0.0:0 failed 1 times; aborting job
14/03/11 09:56:11 INFO scheduler.TaskSchedulerImpl: Remove TaskSet 0.0 from pool
14/03/11 09:56:11 INFO scheduler.DAGScheduler: Failed to run runJob at NetworkInputTracker.scala:182
[error] (Thread-34) org.apache.spark.SparkException: Job aborted: Task 0.0:0 fai

Thanks Arockia Raja
Re: Powered By Spark Page -- Companies Organizations
Thanks, added you. On Mar 11, 2014, at 2:47 AM, Christoph Böhm listenbru...@gmx.net wrote: Dear Spark team, thanks for the great work and congrats on becoming an Apache top-level project! You could add us to your Powered-by-page, because we are using Spark (and Shark) to perform interactive exploration of large datasets. Find us on: www.bakdata.com Best, Christoph - Christoph Böhm bakdata | bespoke data engineering www.bakdata.com
Re: RDD.saveAs...
I agree that we can’t keep adding these to the core API, partly because it will get unwieldy to maintain and partly just because each storage system will bring in lots of dependencies. We can simply have helper classes in different modules for each storage system. There’s some discussion on this at https://spark-project.atlassian.net/browse/SPARK-1127. Matei On Mar 11, 2014, at 9:06 AM, Koert Kuipers ko...@tresata.com wrote: I find the current design to write RDDs to disk (or a database, etc) kind of ugly. It will lead to a proliferation of saveAs methods. A better abstraction would be nice (perhaps a Sink trait to write to)
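The Sink abstraction Koert suggests could live entirely outside the core API. A hypothetical sketch (plain Python, not Spark code; the names `Sink`, `TextFileSink`, and `save_to` are made up for illustration) of how storage-specific code would move into helper classes instead of more saveAsXxx methods:

```python
import os
import tempfile

class Sink(object):
    """Hypothetical interface: anything that can persist an iterable of records."""
    def save(self, records):
        raise NotImplementedError

class TextFileSink(Sink):
    """One storage backend; a CassandraSink, HBaseSink, etc. would live in other modules."""
    def __init__(self, path):
        self.path = path

    def save(self, records):
        with open(self.path, "w") as f:
            for r in records:
                f.write(str(r) + "\n")

def save_to(records, sink):
    # A free function in a helper module, not a method on the RDD itself,
    # so the core API (and its dependency list) stays small.
    sink.save(records)

path = os.path.join(tempfile.mkdtemp(), "out.txt")
save_to([1, 2, 3], TextFileSink(path))
print(open(path).read())
```

The design win is the one discussed in SPARK-1127: each backend brings its own dependencies, and they stay out of the core module.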
Re: possible bug in Spark's ALS implementation...
On Mar 14, 2014, at 5:52 PM, Michael Allman m...@allman.ms wrote: I also found that the product and user RDDs were being rebuilt many times over in my tests, even for tiny data sets. By persisting the RDD returned from updateFeatures() I was able to avoid a raft of duplicate computations. Is there a reason not to do this? This sounds like a good thing to add, though I’d like to understand why these are being recomputed (it seemed that the code would only use each one once). Do you have any sense why that is? Matei
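The recomputation Michael observed follows from RDDs being lazy: without persist(), every downstream action re-runs the lineage. A tiny Python stand-in (not the actual ALS code; `LazyDataset` is an invented toy class) shows the pattern:

```python
class LazyDataset(object):
    """Toy stand-in for an RDD: recomputes on every use unless persisted."""
    def __init__(self, compute_fn):
        self.compute_fn = compute_fn
        self.cached = None
        self.recomputations = 0

    def persist(self):
        if self.cached is None:
            self.cached = self.compute_fn()  # materialize once
        return self

    def collect(self):
        if self.cached is not None:
            return self.cached               # served from cache
        self.recomputations += 1
        return self.compute_fn()             # lineage re-run

features = LazyDataset(lambda: [x * x for x in range(5)])
for _ in range(3):            # three downstream uses, e.g. successive iterations
    features.collect()
print(features.recomputations)   # recomputed on every use

cached = LazyDataset(lambda: [x * x for x in range(5)]).persist()
for _ in range(3):
    cached.collect()
print(cached.recomputations)     # never recomputed
```

Persisting the RDD returned from an iteratively rebuilt computation (as Michael did with updateFeatures()) turns the repeated lineage replays into cache hits.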
Re: How to kill a spark app ?
If it’s a driver on the cluster, please open a JIRA issue about this — this kill command is indeed intended to work. Matei

On Mar 16, 2014, at 2:35 PM, Mayur Rustagi mayur.rust...@gmail.com wrote: Are you embedding your driver inside the cluster? If not, then that command will not kill the driver. You can simply kill the application by killing the Scala application; so if it's spark-shell, simply killing the shell will disconnect the application from the cluster. If the driver is embedded in the cluster, then the above command is required. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi

On Sun, Mar 16, 2014 at 5:06 PM, Debasish Das debasish.da...@gmail.com wrote: Thanks Mayur... I need both... but to start with, even an application killer will help a lot... Somehow that command did not work for me. I will try it again from the Spark main folder.

On Sun, Mar 16, 2014 at 1:43 PM, Mayur Rustagi mayur.rust...@gmail.com wrote: This is meant to kill the whole driver hosted inside the Master (a new feature as of 0.9.0). I assume you are trying to kill a job/task/stage inside Spark rather than the whole application. Regards Mayur Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi

On Sun, Mar 16, 2014 at 4:36 PM, Debasish Das debasish.da...@gmail.com wrote: From http://spark.incubator.apache.org/docs/latest/spark-standalone.html#launching-applications-inside-the-cluster ./bin/spark-class org.apache.spark.deploy.Client kill driverId does not work / has bugs?

On Sun, Mar 16, 2014 at 1:17 PM, Mayur Rustagi mayur.rust...@gmail.com wrote: There is no good way to kill jobs in Spark yet. The closest is cancelAllJobs / cancelJobGroup in SparkContext. I have had bugs using both. I am trying to test them out; typically you would start a different thread and call these functions on it when you wish to cancel a job.
Regards Mayur Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi

On Sun, Mar 16, 2014 at 2:59 PM, Debasish Das debasish.da...@gmail.com wrote: Are these the right options: 1. If there is a Spark script, just do a ctrl-c from spark-shell and the job will be killed properly. 2. For a Spark application, ctrl-c will also kill the job properly on the cluster. Somehow the ctrl-c option did not work for us... A similar option works fine for Scalding, for example, but we see a lot of dead nodes if too many jobs are killed abruptly. 3. Use the Client script:

./bin/spark-class org.apache.spark.deploy.Client kill spark://myspark.com:7077 app-20140316142129-
Runner java
Classpath :/home/debasish/sag_spark/conf:/home/debasish/sag_spark/assembly/target/scala-2.10/spark-assembly-1.0.0-incubating-SNAPSHOT-hadoop2.0.0-mr1-cdh4.5.0.jar
Java opts -Djava.library.path= -Xms512m -Xmx512m
Options -Dspark.cores.max=16
Sending kill command to spark://myspark.com:7077
Driver app-20140316142129- has already finished or does not exist

This option also did not kill the job. I can still see the job running on the Spark web UI... Thanks. Deb
Re: [Powered by] Yandex Islands powered by Spark
Thanks, I’ve added you: https://cwiki.apache.org/confluence/display/SPARK/Powered+By+Spark. Let me know if you want to change any wording. Matei On Mar 16, 2014, at 6:48 AM, Egor Pahomov pahomov.e...@gmail.com wrote: Hi, page https://cwiki.apache.org/confluence/display/SPARK/Powered+By+Spark says I need write here, if want my project to be added there. In Yandex (www.yandex.com) now we using spark for project Yandex Islands (http://www.searchenginejournal.com/yandex-islands-markup-issues-implementation/71891/). We process islands which come from search robot with spark. -- Sincerely yours Egor Pakhomov Scala Developer, Yandex
Re: example of non-line oriented input data?
Hi Diana, Non-text input formats are only supported in Java and Scala right now, where you can use sparkContext.hadoopFile or .hadoopDataset to load data with any InputFormat that Hadoop MapReduce supports. In Python, you unfortunately only have textFile, which gives you one record per line. For JSON, you’d have to fit the whole JSON object on one line as you said. Hopefully we’ll also have some other forms of input soon. If your input is a collection of separate files (say many .xml files), you can also use mapPartitions on it to group together the lines because each input file will end up being a single dataset partition (or map task). This will let you concatenate the lines in each file and parse them as one XML object. Matei On Mar 17, 2014, at 9:52 AM, Diana Carroll dcarr...@cloudera.com wrote: Thanks, Krakna, very helpful. The way I read the code, it looks like you are assuming that each line in foo.log contains a complete json object? (That is, that the data doesn't contain any records that are split into multiple lines.) If so, is that because you know that to be true of your data? Or did you do as Nicholas suggests and have some preprocessing on the text input to flatten the data in that way? Thanks, Diana On Mon, Mar 17, 2014 at 12:09 PM, Krakna H shankark+...@gmail.com wrote: Katrina, Not sure if this is what you had in mind, but here's some simple pyspark code that I recently wrote to deal with JSON files. 
from pyspark import SparkContext, SparkConf
from operator import add
import json
import random
import numpy as np

def concatenate_paragraphs(sentence_array):
    return ' '.join(sentence_array).split(' ')

logFile = 'foo.json'
conf = SparkConf()
conf.setMaster('spark://cluster-master:7077').setAppName('example').set('spark.executor.memory', '1g')
sc = SparkContext(conf=conf)
logData = sc.textFile(logFile).cache()
num_lines = logData.count()
print 'Number of lines: %d' % num_lines
# JSON object has the structure: {key: {'paragraphs': [sentence1, sentence2, ...]}}
tm = logData.map(lambda s: (json.loads(s)['key'], len(concatenate_paragraphs(json.loads(s)['paragraphs']))))
tm = tm.reduceByKey(lambda _, x: _ + x)
op = tm.collect()
for key, num_words in op:
    print 'key: %s, num_words: %d' % (key, num_words)

On Mon, Mar 17, 2014 at 11:58 AM, Diana Carroll [via Apache Spark User List] [hidden email] wrote: I don't actually have any data. I'm writing a course that teaches students how to do this sort of thing and am interested in looking at a variety of real-life examples of people doing things like that. I'd love to see some working code implementing the obvious work-around you mention... do you have any to share? It's an approach that makes a lot of sense, and as I said, I'd love to not have to re-invent the wheel if someone else has already written that code. Thanks! Diana

On Mon, Mar 17, 2014 at 11:35 AM, Nicholas Chammas [hidden email] wrote: There was a previous discussion about this here: http://apache-spark-user-list.1001560.n3.nabble.com/Having-Spark-read-a-JSON-file-td1963.html How big are the XML or JSON files you're looking to deal with? It may not be practical to deserialize the entire document at once. In that case an obvious work-around would be to have some kind of pre-processing step that separates XML nodes/JSON objects with newlines so that you can analyze the data with Spark in a line-oriented format.
Your preprocessor wouldn't have to parse/deserialize the massive document; it would just have to track open/closed tags/braces to know when to insert a newline. Then you'd just open the line-delimited result and deserialize the individual objects/nodes with map(). Nick

On Mon, Mar 17, 2014 at 11:18 AM, Diana Carroll [hidden email] wrote: Has anyone got a working example of a Spark application that analyzes data in a non-line-oriented format, such as XML or JSON? I'd like to do this without re-inventing the wheel... anyone care to share? Thanks! Diana
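Nicholas's brace-tracking preprocessor can be sketched in a few lines of plain Python (`split_top_level_json` is an invented name; this handles JSON objects only, ignoring braces inside strings): scan the stream, track brace depth, and emit one string per top-level object, yielding a line-oriented format that textFile can consume.

```python
import json

def split_top_level_json(text):
    """Split a stream of concatenated JSON objects into one string per object,
    without fully deserializing the document."""
    objects = []
    depth, start = 0, 0
    in_string, escaped = False, False
    for i, ch in enumerate(text):
        if escaped:
            escaped = False          # character after a backslash: skip it
        elif ch == "\\":
            escaped = True
        elif ch == '"':
            in_string = not in_string
        elif not in_string:          # braces inside strings don't count
            if ch == "{":
                if depth == 0:
                    start = i
                depth += 1
            elif ch == "}":
                depth -= 1
                if depth == 0:       # a top-level object just closed
                    objects.append(text[start:i + 1])
    return objects

blob = '{"a": 1}{"b": {"c": "has a } brace"}}\n{"d": 3}'
lines = split_top_level_json(blob)
print(lines)
```

Writing the result out with one object per line ("\n".join(lines)) produces a file that sc.textFile() plus json.loads per line can process, which is exactly the work-around discussed above.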
Re: example of non-line oriented input data?
Here’s an example of getting together all lines in a file as one string:

$ cat dir/a.txt
Hello
world!
$ cat dir/b.txt
What's
up??
$ bin/pyspark
>>> files = sc.textFile("dir")
>>> files.collect()
[u'Hello', u'world!', u"What's", u'up??']      # one element per line, not what we want
>>> files.glom().collect()
[[u'Hello', u'world!'], [u"What's", u'up??']]  # one element per file, which is an array of lines
>>> files.glom().map(lambda a: "\n".join(a)).collect()
[u'Hello\nworld!', u"What's\nup??"]            # join back each file into a single string

The glom() method groups all the elements of each partition of an RDD into an array, giving you an RDD of arrays of objects. If your input is small files, you always have one partition per file. There’s also mapPartitions, which gives you an iterator for each partition instead of an array. You can then return an iterator or list of objects to produce from that. Matei

On Mar 17, 2014, at 10:46 AM, Diana Carroll dcarr...@cloudera.com wrote: Thanks Matei. That makes sense. I have here a dataset of many many smallish XML files, so using mapPartitions that way would make sense. I'd love to see a code example though... It's not as obvious to me how to do that as it probably should be. Thanks, Diana

On Mon, Mar 17, 2014 at 1:02 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Hi Diana, Non-text input formats are only supported in Java and Scala right now, where you can use sparkContext.hadoopFile or .hadoopDataset to load data with any InputFormat that Hadoop MapReduce supports. In Python, you unfortunately only have textFile, which gives you one record per line. For JSON, you’d have to fit the whole JSON object on one line as you said. Hopefully we’ll also have some other forms of input soon. If your input is a collection of separate files (say many .xml files), you can also use mapPartitions on it to group together the lines, because each input file will end up being a single dataset partition (or map task).
This will let you concatenate the lines in each file and parse them as one XML object. Matei

On Mar 17, 2014, at 9:52 AM, Diana Carroll dcarr...@cloudera.com wrote: Thanks, Krakna, very helpful. The way I read the code, it looks like you are assuming that each line in foo.log contains a complete json object? (That is, that the data doesn't contain any records that are split into multiple lines.) If so, is that because you know that to be true of your data? Or did you do as Nicholas suggests and have some preprocessing on the text input to flatten the data in that way? Thanks, Diana

On Mon, Mar 17, 2014 at 12:09 PM, Krakna H shankark+...@gmail.com wrote: Katrina, Not sure if this is what you had in mind, but here's some simple pyspark code that I recently wrote to deal with JSON files.

from pyspark import SparkContext, SparkConf
from operator import add
import json
import random
import numpy as np

def concatenate_paragraphs(sentence_array):
    return ' '.join(sentence_array).split(' ')

logFile = 'foo.json'
conf = SparkConf()
conf.setMaster('spark://cluster-master:7077').setAppName('example').set('spark.executor.memory', '1g')
sc = SparkContext(conf=conf)
logData = sc.textFile(logFile).cache()
num_lines = logData.count()
print 'Number of lines: %d' % num_lines
# JSON object has the structure: {key: {'paragraphs': [sentence1, sentence2, ...]}}
tm = logData.map(lambda s: (json.loads(s)['key'], len(concatenate_paragraphs(json.loads(s)['paragraphs']))))
tm = tm.reduceByKey(lambda _, x: _ + x)
op = tm.collect()
for key, num_words in op:
    print 'key: %s, num_words: %d' % (key, num_words)

On Mon, Mar 17, 2014 at 11:58 AM, Diana Carroll [via Apache Spark User List] [hidden email] wrote: I don't actually have any data. I'm writing a course that teaches students how to do this sort of thing and am interested in looking at a variety of real-life examples of people doing things like that.
I'd love to see some working code implementing the obvious work-around you mention...do you have any to share? It's an approach that makes a lot of sense, and as I said, I'd love to not have to re-invent the wheel if someone else has already written that code. Thanks! Diana On Mon, Mar 17, 2014 at 11:35 AM, Nicholas Chammas [hidden email] wrote: There was a previous discussion about this here: http://apache-spark-user-list.1001560.n3.nabble.com/Having-Spark-read-a-JSON-file-td1963.html How big are the XML or JSON files you're looking to deal with? It may not be practical to deserialize the entire document at once. In that case an obvious work-around would be to have some kind of pre-processing step that separates XML nodes/JSON objects with newlines so that you can analyze the data with Spark in a line-oriented
Re: is collect exactly-once?
Yup, it only returns each value once. Matei On Mar 17, 2014, at 1:14 PM, Adrian Mocanu amoc...@verticalscope.com wrote: Hi Quick question here, I know that .foreach is not idempotent. I am wondering if collect() is idempotent? Meaning that once I’ve collect()-ed if spark node crashes I can’t get the same values from the stream ever again. Thanks -Adrian
Re: example of non-line oriented input data?
Oh, I see, the problem is that the function you pass to mapPartitions must itself return an iterator or a collection. This is used so that you can return multiple output records for each input record. You can implement most of the existing map-like operations in Spark, such as map, filter, flatMap, etc, with mapPartitions, as well as new ones that might do a sliding window over each partition, for example, or accumulate data across elements (e.g. to compute a sum). For example, if you have data = sc.parallelize([1, 2, 3, 4], 2), this will work:

>>> data.mapPartitions(lambda x: x).collect()
[1, 2, 3, 4]        # just return the same iterator, doing nothing
>>> data.mapPartitions(lambda x: [list(x)]).collect()
[[1, 2], [3, 4]]    # group together the elements of each partition in a single list (like glom)
>>> data.mapPartitions(lambda x: [sum(x)]).collect()
[3, 7]              # sum each partition separately

However, something like data.mapPartitions(lambda x: sum(x)).collect() will *not* work, because sum returns a number, not an iterator. That’s why I put sum(x) inside a list above. In practice mapPartitions is most useful if you want to share some data or work across the elements. For example, maybe you want to load a lookup table once from an external file and then check each element against it, or sum up a bunch of elements without allocating a lot of vector objects. Matei

On Mar 17, 2014, at 11:25 AM, Diana Carroll dcarr...@cloudera.com wrote: "There’s also mapPartitions, which gives you an iterator for each partition instead of an array. You can then return an iterator or list of objects to produce from that." I confess, I was hoping for an example of just that, because I've not yet been able to figure out how to use mapPartitions. No doubt this is because I'm a rank newcomer to Python and haven't fully wrapped my head around iterators. All I get so far in my attempts to use mapPartitions is the darned "such-and-such is not an iterator" error.
def myfunction(iterator): return [1, 2, 3]
mydata.mapPartitions(lambda x: myfunction(x)).take(2)

On Mon, Mar 17, 2014 at 1:57 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Here’s an example of getting together all lines in a file as one string:

$ cat dir/a.txt
Hello
world!
$ cat dir/b.txt
What's
up??
$ bin/pyspark
>>> files = sc.textFile("dir")
>>> files.collect()
[u'Hello', u'world!', u"What's", u'up??']      # one element per line, not what we want
>>> files.glom().collect()
[[u'Hello', u'world!'], [u"What's", u'up??']]  # one element per file, which is an array of lines
>>> files.glom().map(lambda a: "\n".join(a)).collect()
[u'Hello\nworld!', u"What's\nup??"]            # join back each file into a single string

The glom() method groups all the elements of each partition of an RDD into an array, giving you an RDD of arrays of objects. If your input is small files, you always have one partition per file. There’s also mapPartitions, which gives you an iterator for each partition instead of an array. You can then return an iterator or list of objects to produce from that. Matei

On Mar 17, 2014, at 10:46 AM, Diana Carroll dcarr...@cloudera.com wrote: Thanks Matei. That makes sense. I have here a dataset of many many smallish XML files, so using mapPartitions that way would make sense. I'd love to see a code example though... It's not as obvious to me how to do that as it probably should be. Thanks, Diana

On Mon, Mar 17, 2014 at 1:02 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Hi Diana, Non-text input formats are only supported in Java and Scala right now, where you can use sparkContext.hadoopFile or .hadoopDataset to load data with any InputFormat that Hadoop MapReduce supports. In Python, you unfortunately only have textFile, which gives you one record per line. For JSON, you’d have to fit the whole JSON object on one line as you said. Hopefully we’ll also have some other forms of input soon.
If your input is a collection of separate files (say many .xml files), you can also use mapPartitions on it to group together the lines because each input file will end up being a single dataset partition (or map task). This will let you concatenate the lines in each file and parse them as one XML object. Matei On Mar 17, 2014, at 9:52 AM, Diana Carroll dcarr...@cloudera.com wrote: Thanks, Krakna, very helpful. The way I read the code, it looks like you are assuming that each line in foo.log contains a complete json object? (That is, that the data doesn't contain any records that are split into multiple lines.) If so, is that because you know that to be true of your data? Or did you do as Nicholas suggests and have some preprocessing on the text input to flatten the data in that way? Thanks, Diana On Mon, Mar 17, 2014 at 12:09 PM, Krakna H shankark+...@gmail.com wrote: Katrina, Not sure if this is what you had
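The contract Matei describes (the function receives an iterator over one partition and must return an iterable of output records) can be imitated without a Spark cluster. This plain-Python model of mapPartitions (`map_partitions` is an invented helper, operating on a list of partitions) may help when experimenting:

```python
def map_partitions(partitions, fn):
    """Mimic RDD.mapPartitions on a list of partitions (each a list of elements)."""
    out = []
    for part in partitions:
        result = fn(iter(part))   # fn receives an iterator, as in PySpark
        out.extend(result)        # and must return an iterable of output records
    return out

data = [[1, 2], [3, 4]]           # like sc.parallelize([1, 2, 3, 4], 2)
print(map_partitions(data, lambda it: it))          # identity: same elements back
print(map_partitions(data, lambda it: [list(it)]))  # one list per partition, like glom
print(map_partitions(data, lambda it: [sum(it)]))   # one sum per partition
# map_partitions(data, lambda it: sum(it)) raises TypeError: an int is not iterable,
# which is exactly the "is not an iterator" error Diana hit.
```

The last (commented-out) call reproduces the failure mode: returning a bare value instead of wrapping it in a list.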
Re: links for the old versions are broken
Thanks for reporting this, looking into it. On Mar 17, 2014, at 2:44 PM, Walrus theCat walrusthe...@gmail.com wrote: ping On Thu, Mar 13, 2014 at 11:05 AM, Aaron Davidson ilike...@gmail.com wrote: Looks like everything from 0.8.0 and before errors similarly (though Spark 0.3 for Scala 2.9 has a malformed link as well). On Thu, Mar 13, 2014 at 10:52 AM, Walrus theCat walrusthe...@gmail.com wrote: Sup, Where can I get Spark 0.7.3? It's 404 here: http://spark.apache.org/downloads.html Thanks
Re: Incrementally add/remove vertices in GraphX
I just meant that you call union() before creating the RDDs that you pass to new Graph(). If you call it after it will produce other RDDs. The Graph() constructor actually shuffles and “indexes” the data to make graph operations efficient, so it’s not too easy to add elements after. You could access graph.vertices and graph.edges to build new RDDs, and then call Graph() again to make a new graph. I’ve CCed Joey and Ankur to see if they have further ideas on how to optimize this. It would be cool to support more efficient union and subtracting of graphs once they’ve been partitioned by GraphX. Matei On Mar 14, 2014, at 8:32 AM, alelulli alessandro.lu...@gmail.com wrote: Hi Matei, Could you please clarify why i must call union before creating the graph? What's the behavior if i call union / subtract after the creation? Is the added /removed vertexes been processed? For example if i'm implementing an iterative algorithm and at the 5th step i need to add some vertex / edge, can i call union / subtract on the VertexRDD, EdgeRDD and Triplets? Thanks Alessandro -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Incrementally-add-remove-vertices-in-GraphX-tp2227p2695.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
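Why union() is cheap before construction but awkward afterwards can be seen with a toy model (plain Python, not GraphX; `IndexedGraph` is an invented class): constructing the graph eagerly builds an index over the vertices, so adding elements later means building a whole new graph and index, just as Matei describes.

```python
class IndexedGraph(object):
    """Toy model: construction 'indexes' the vertices, like GraphX's partitioning."""
    index_builds = 0

    def __init__(self, vertices, edges):
        self.vertices = list(vertices)
        self.edges = list(edges)
        # Built eagerly at construction time; this is the expensive step.
        self.index = {v: i for i, v in enumerate(self.vertices)}
        IndexedGraph.index_builds += 1

# Cheap: union the input collections *before* constructing the graph.
g = IndexedGraph(["a", "b"] + ["c"], [("a", "b")])

# Adding after construction means building a whole new graph (and index)
# from graph.vertices and graph.edges, as suggested in the reply above.
g2 = IndexedGraph(g.vertices + ["d"], g.edges + [("c", "d")])
print(IndexedGraph.index_builds)
```

Each incremental addition pays the full indexing cost again, which is why batching unions before the Graph() call is the efficient path today.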
Re: Pyspark worker memory
Try checking spark-env.sh on the workers as well. Maybe code there is somehow overriding the spark.executor.memory setting. Matei On Mar 18, 2014, at 6:17 PM, Jim Blomo jim.bl...@gmail.com wrote: Hello, I'm using the Github snapshot of PySpark and having trouble setting the worker memory correctly. I've set spark.executor.memory to 5g, but somewhere along the way Xmx is getting capped to 512M. This was not occurring with the same setup and 0.9.0. How many places do I need to configure the memory? Thank you!
Re: What's the lifecycle of an rdd? Can I control it?
Yes, Spark automatically removes old RDDs from the cache when you make new ones. Unpersist forces it to remove them right away. In both cases, though, note that Java doesn’t garbage-collect the released objects until later. Matei On Mar 19, 2014, at 7:22 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Related question: If I keep creating new RDDs and cache()-ing them, does Spark automatically unpersist the least recently used RDD when it runs out of memory? Or is an explicit unpersist the only way to get rid of an RDD (barring the PR Tathagata mentioned)? Also, does unpersist()-ing an RDD immediately free up space, or just allow that space to be reclaimed when needed? On Wed, Mar 19, 2014 at 7:01 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Just a heads up, there is an active pull request that will automatically unpersist RDDs that are no longer in reference/scope from the application. TD On Wed, Mar 19, 2014 at 6:58 PM, hequn cheng chenghe...@gmail.com wrote: persist and unpersist. unpersist: Mark the RDD as non-persistent, and remove all blocks for it from memory and disk. 2014-03-19 16:40 GMT+08:00 林武康 vboylin1...@gmail.com: Hi, can anyone tell me about the lifecycle of an RDD? I searched through the official website and still can't figure it out. Can I use an RDD in some stages and then destroy it in order to release memory, because no stages ahead will use this RDD any more? Is it possible? Thanks! Sincerely Lin Wukang
Re: Pyspark worker memory
Yeah, this is definitely confusing. The motivation for this was that different users of the same cluster may want to set different memory sizes for their apps, so we decided to put this setting in the driver. However, if you put SPARK_JAVA_OPTS in spark-env.sh, it also applies to executors, which is confusing (though in this case it wouldn’t overwrite spark.executor.memory AFAIK). We want to clean a bunch of this stuff up for 1.0, or at least document it better. Thanks for the suggestions. Matei On Mar 19, 2014, at 12:53 AM, Jim Blomo jim.bl...@gmail.com wrote: To document this, it would be nice to clarify what environment variables should be used to set which Java system properties, and what type of process they affect. I'd be happy to start a page if you can point me to the right place: SPARK_JAVA_OPTS: -Dspark.executor.memory can be set on the machine running the driver (typically the master host) and will affect the memory available to the Executor running on a slave node -D SPARK_DAEMON_OPTS: On Wed, Mar 19, 2014 at 12:48 AM, Jim Blomo jim.bl...@gmail.com wrote: Thanks for the suggestion, Matei. I've tracked this down to a setting I had to make on the Driver. It looks like spark-env.sh has no impact on the Executor, which confused me for a long while with settings like SPARK_EXECUTOR_MEMORY. The only setting that mattered was setting the system property in the *driver* (in this case pyspark/shell.py) or using -Dspark.executor.memory in SPARK_JAVA_OPTS *on the master*. I'm not sure how this varies from the 0.9.0 release, but it seems to work on SNAPSHOT. On Tue, Mar 18, 2014 at 11:52 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Try checking spark-env.sh on the workers as well. Maybe code there is somehow overriding the spark.executor.memory setting. Matei On Mar 18, 2014, at 6:17 PM, Jim Blomo jim.bl...@gmail.com wrote: Hello, I'm using the Github snapshot of PySpark and having trouble setting the worker memory correctly. 
I've set spark.executor.memory to 5g, but somewhere along the way Xmx is getting capped to 512M. This was not occurring with the same setup and 0.9.0. How many places do I need to configure the memory? Thank you!
Re: DStream spark paper
Hi Adrian, On every timestep of execution, we receive new data, then report updated word counts for that new data plus the past 30 seconds. The latency here is about how quickly you get these updated counts once the new batch of data comes in. It’s true that the count reflects some data from 30 seconds ago as well, but it doesn’t mean the overall processing latency is 30 seconds. Matei On Mar 20, 2014, at 1:36 PM, Adrian Mocanu amoc...@verticalscope.com wrote: I looked over the specs on page 9 from http://www.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS-2012-259.pdf The first paragraph mentions the window size is 30 seconds “Word-Count, which performs a sliding window count over 30s; and TopKCount, which finds the k most frequent words over the past 30s. “ The second paragraph mentions subsecond latency. Putting these 2 together, is the paper saying that in the 30 sec window the tuples are delayed at most 1 second? The paper explains “By “end-to-end latency,” we mean the time from when records are sent to the system to when results incorporating them appear.” This leads me to conclude that end-to-end latency for a 30 sec window should be at least 30 seconds because results won’t be incorporated until the entire window is completed ie: 30sec. At the same time the paper claims latency is sub second so clearly I’m misunderstanding something. -Adrian
Re: How to save as a single file efficiently?
Try passing the shuffle=true parameter to coalesce, then it will do the map in parallel but still pass all the data through one reduce node for writing it out. That’s probably the fastest it will get. No need to cache if you do that. Matei On Mar 21, 2014, at 4:04 PM, Aureliano Buendia buendia...@gmail.com wrote: Hi, Our spark app reduces a few 100 GB of data to a few 100 KB of csv. We found that a partition number of 1000 is a good number to speed the process up. However, it does not make sense to have 1000 pieces of csv files, each less than 1 KB. We used RDD.coalesce(1) to get only 1 csv file, but it's extremely slow, and we are not properly using our resources this way. So this is very slow: rdd.map(...).coalesce(1).saveAsTextFile() How is it possible to use coalesce(1) simply for concatenating the materialized output text files? Would something like this make sense?: rdd.map(...).coalesce(100).coalesce(1).saveAsTextFile() Or, would something like this achieve it?: rdd.map(...).cache().coalesce(1).saveAsTextFile()
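The behavior Matei suggests (keep the map at full parallelism, then pass the small result through a single writer) can be mimicked outside Spark. This thread-pool stand-in is purely illustrative and is not how Spark schedules tasks:

```python
from concurrent.futures import ThreadPoolExecutor

def expensive_map(record):
    """Stand-in for the heavily parallel map stage."""
    return record * 2

records = list(range(8))
with ThreadPoolExecutor(max_workers=4) as pool:
    mapped = list(pool.map(expensive_map, records))  # runs at full parallelism

# Only the small mapped result funnels through a single "reduce" writer.
single_output = "\n".join(str(x) for x in mapped)
print(single_output.splitlines()[:3])  # ['0', '2', '4']
```

With shuffle=true, Spark does the analogous thing: the map stage keeps all 1000 partitions, and only the tiny outputs are shuffled to one task that writes the single file.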
Re: How to save as a single file efficiently?
Ah, the reason is that coalesce is often used to deal with lots of small input files on HDFS. In that case you don’t want to reshuffle them all across the network, you just want each mapper to directly read multiple files (and you want fewer than one mapper per file). Matei On Mar 21, 2014, at 5:01 PM, Aureliano Buendia buendia...@gmail.com wrote: Good to know it's as simple as that! I wonder why shuffle=true is not the default for coalesce(). On Fri, Mar 21, 2014 at 11:37 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Try passing the shuffle=true parameter to coalesce, then it will do the map in parallel but still pass all the data through one reduce node for writing it out. That’s probably the fastest it will get. No need to cache if you do that. Matei On Mar 21, 2014, at 4:04 PM, Aureliano Buendia buendia...@gmail.com wrote: Hi, Our spark app reduces a few 100 GB of data to a few 100 KB of csv. We found that a partition number of 1000 is a good number to speed the process up. However, it does not make sense to have 1000 pieces of csv files, each less than 1 KB. We used RDD.coalesce(1) to get only 1 csv file, but it's extremely slow, and we are not properly using our resources this way. So this is very slow: rdd.map(...).coalesce(1).saveAsTextFile() How is it possible to use coalesce(1) simply for concatenating the materialized output text files? Would something like this make sense?: rdd.map(...).coalesce(100).coalesce(1).saveAsTextFile() Or, would something like this achieve it?: rdd.map(...).cache().coalesce(1).saveAsTextFile()
Re: error loading large files in PySpark 0.9.0
Hey Jeremy, what happens if you pass batchSize=10 as an argument to your SparkContext? It tries to serialize that many objects together at a time, which might be too much. By default the batchSize is 1024. Matei On Mar 23, 2014, at 10:11 AM, Jeremy Freeman freeman.jer...@gmail.com wrote: Hi all, Hitting a mysterious error loading large text files, specific to PySpark 0.9.0. In PySpark 0.8.1, this works: data = sc.textFile(path/to/myfile) data.count() But in 0.9.0, it stalls. There are indications of completion up to: 14/03/17 16:54:24 INFO TaskSetManager: Finished TID 4 in 1699 ms on X.X.X.X (progress: 15/537) 14/03/17 16:54:24 INFO DAGScheduler: Completed ResultTask(5, 4) And then this repeats indefinitely 14/03/17 16:54:24 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_5, runningTasks: 144 14/03/17 16:54:25 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_5, runningTasks: 144 Always stalls at the same place. There's nothing in stderr on the workers, but in stdout there are several of these messages: INFO PythonRDD: stdin writer to Python finished early So perhaps the real error is being suppressed as in https://spark-project.atlassian.net/browse/SPARK-1025 Data is just rows of space-separated numbers, ~20GB, with 300k rows and 50k characters per row. Running on a private cluster with 10 nodes, 100GB / 16 cores each, Python v 2.7.6. I doubt the data is corrupted as it works fine in Scala in 0.8.1 and 0.9.0, and in PySpark in 0.8.1. Happy to post the file, but it should repro for anything with these dimensions. It *might* be specific to long strings: I don't see it with fewer characters (10k) per row, but I also don't see it with many fewer rows but the same number of characters per row. Happy to try and provide more info / help debug! -- Jeremy -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/error-loading-large-files-in-PySpark-0-9-0-tp3049.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
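The batching Matei refers to can be illustrated without Spark: PySpark pickles objects in groups of batchSize before sending them across the JVM/Python pipe, so very long rows times a large batch size means one very large blob in memory. The grouping helper below is a hypothetical sketch, not PySpark's actual serializer code:

```python
import pickle
from itertools import islice

def batched_blobs(records, batch_size):
    """Pickle records in groups of batch_size, one blob per group."""
    it = iter(records)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        # The whole batch is materialized and serialized at once, so roughly
        # batch_size * record_size bytes sit in memory per blob.
        yield pickle.dumps(batch)

rows = ("x" * 1000 for _ in range(100))  # stand-in for long text lines
blobs = list(batched_blobs(rows, 10))
print(len(blobs))  # 10
```

With 50k-character rows, a batch of 1024 is on the order of 50 MB per blob, which is why lowering batchSize helps here.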
Re: Announcing Spark SQL
Congrats Michael & co for putting this together — this is probably the neatest piece of technology added to Spark in the past few months, and it will greatly change what users can do as more data sources are added. Matei On Mar 26, 2014, at 3:22 PM, Ognen Duzlevski og...@plainvanillagames.com wrote: Wow! Ognen On 3/26/14, 4:58 PM, Michael Armbrust wrote: Hey Everyone, This already went out to the dev list, but I wanted to put a pointer here as well to a new feature we are pretty excited about for Spark 1.0. http://databricks.com/blog/2014/03/26/Spark-SQL-manipulating-structured-data-using-Spark.html Michael
Re: All pairs shortest paths?
Yeah, if you’re just worried about statistics, maybe you can do sampling (do single-pair paths from 100 random nodes and you get an idea of what percentage of nodes have what distribution of neighbors in a given distance). Matei On Mar 26, 2014, at 5:55 PM, Ryan Compton compton.r...@gmail.com wrote: Much thanks, I suspected this would be difficult. I was hoping to generate some 4 degrees of separation-like statistics. Looks like I'll just have to work with a subset of my graph. On Wed, Mar 26, 2014 at 5:20 PM, Matei Zaharia matei.zaha...@gmail.com wrote: All-pairs distances is tricky for a large graph because you need O(V^2) storage. Do you want to just quickly query the distance between two vertices? In that case you can do single-source shortest paths, which I believe exists in GraphX, or at least is very quick to implement on top of its Pregel API. If your graph is small enough that storing all-pairs is feasible, you can probably run this as an iterative algorithm: http://en.wikipedia.org/wiki/Floyd–Warshall_algorithm, though I haven’t tried it. It may be tough to do with GraphX. Matei On Mar 26, 2014, at 3:51 PM, Ryan Compton compton.r...@gmail.com wrote: To clarify: I don't need the actual paths, just the distances. On Wed, Mar 26, 2014 at 3:04 PM, Ryan Compton compton.r...@gmail.com wrote: No idea how feasible this is. Has anyone done it?
Re: pySpark memory usage
I see, did this also fail with previous versions of Spark (0.9 or 0.8)? We’ll try to look into these, seems like a serious error. Matei On Mar 27, 2014, at 7:27 PM, Jim Blomo jim.bl...@gmail.com wrote: Thanks, Matei. I am running Spark 1.0.0-SNAPSHOT built for Hadoop 1.0.4 from GitHub on 2014-03-18. I tried batchSizes of 512, 10, and 1 and each got me further but none have succeeded. I can get this to work -- with manual interventions -- if I omit `parsed.persist(StorageLevel.MEMORY_AND_DISK)` and set batchSize=1. 5 of the 175 executors hung, and I had to kill the python process to get things going again. The only indication of this in the logs was `INFO python.PythonRDD: stdin writer to Python finished early`. With batchSize=1 and persist, a new memory error came up in several tasks, before the app was failed:

14/03/28 01:51:15 ERROR executor.Executor: Uncaught exception in thread Thread[stdin writer for python,5,main]
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOfRange(Arrays.java:2694)
    at java.lang.String.<init>(String.java:203)
    at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
    at java.nio.CharBuffer.toString(CharBuffer.java:1201)
    at org.apache.hadoop.io.Text.decode(Text.java:350)
    at org.apache.hadoop.io.Text.decode(Text.java:327)
    at org.apache.hadoop.io.Text.toString(Text.java:254)
    at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
    at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$12.next(Iterator.scala:357)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
    at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)

There are other exceptions, but I think they all stem from the above, eg. 
org.apache.spark.SparkException: Error sending message to BlockManagerMaster Let me know if there are other settings I should try, or if I should try a newer snapshot. Thanks again! On Mon, Mar 24, 2014 at 9:35 AM, Matei Zaharia matei.zaha...@gmail.com wrote: Hey Jim, In Spark 0.9 we added a batchSize parameter to PySpark that makes it group multiple objects together before passing them between Java and Python, but this may be too high by default. Try passing batchSize=10 to your SparkContext constructor to lower it (the default is 1024). Or even batchSize=1 to match earlier versions. Matei On Mar 21, 2014, at 6:18 PM, Jim Blomo jim.bl...@gmail.com wrote: Hi all, I'm wondering if there are any settings I can use to reduce the memory needed by the PythonRDD when computing simple stats. I am getting OutOfMemoryError exceptions while calculating count() on big, but not absurd, records. It seems like PythonRDD is trying to keep too many of these records in memory, when all that is needed is to stream through them and count. Any tips for getting through this workload? Code:

session = sc.textFile('s3://...json.gz')  # ~54GB of compressed data; the biggest individual text line is ~3MB
parsed = session.map(lambda l: l.split("\t", 1)).map(lambda (y, s): (loads(y), loads(s)))
parsed.persist(StorageLevel.MEMORY_AND_DISK)
parsed.count()  # will never finish: executor.Executor: Uncaught exception will FAIL all executors

Incidentally the whole app appears to be killed, but this error is not propagated to the shell. 
Cluster: 15 m2.xlarges (17GB memory, 17GB swap, spark.executor.memory=10GB) Exception:

java.lang.OutOfMemoryError: Java heap space
    at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:132)
    at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:120)
    at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:113)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at org.apache.spark.api.python.PythonRDD$$anon$1.foreach(PythonRDD.scala:113)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:94)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:220)
    at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
Re: Strange behavior of RDD.cartesian
Weird, how exactly are you pulling out the sample? Do you have a small program that reproduces this? Matei On Mar 28, 2014, at 3:09 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: I forgot to mention that I don't really use all of my data. Instead I use a sample extracted with randomSample. On Fri, Mar 28, 2014 at 10:58 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, I notice that RDD.cartesian has a strange behavior with cached and uncached data. More precisely, I have a set of data that I load with objectFile:

val data: RDD[(Int, String, Array[Double])] = sc.objectFile("data")

Then I split it in two sets depending on some criteria:

val part1 = data.filter(_._2 matches "view1")
val part2 = data.filter(_._2 matches "view2")

Finally, I compute the cartesian product of part1 and part2:

val pair = part1.cartesian(part2)

If everything goes well I should have pair.count == part1.count * part2.count. But this is not the case if I don't cache part1 and part2. What am I missing? Is caching data mandatory in Spark? Cheers, Jaonary
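One plausible explanation, offered here as an assumption rather than a confirmed diagnosis: RDDs are lazy, so an uncached randomSample is recomputed for each action and may draw a different sample each time, making the three counts refer to different data. Plain Python shows the same trap with lazy re-evaluation:

```python
import random

def lazy_sample(seed):
    """Stand-in for evaluating an uncached sampled RDD: each call redraws."""
    rng = random.Random(seed)
    return set(rng.sample(range(1000000), 10))

# Two "actions" that trigger separate evaluations can see different elements,
# which makes counts computed over them inconsistent with each other.
first_action = lazy_sample(seed=1)
second_action = lazy_sample(seed=2)
print(first_action == second_action)  # False

# Caching pins one evaluation, so every later action sees the same data.
cached = lazy_sample(seed=1)
print(cached == lazy_sample(seed=1))  # True
```

Calling cache() on part1 and part2 before the counts would pin one evaluation of the sample, which is consistent with the behavior described.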
Re: [shark-users] SQL on Spark - Shark or SparkSQL
Hi Manoj, At the current time, for drop-in replacement of Hive, it will be best to stick with Shark. Over time, Shark will use the Spark SQL backend, but should remain deployable the way it is today (including launching the SharkServer, using the Hive CLI, etc). Spark SQL is better for accessing Hive data within a Spark program though, where its APIs are richer and easier to link to than the SharkContext.sql2rdd we had previously provided in Shark. So in a nutshell, if you have a Shark deployment today, or need the HiveServer, then going with Shark will be fine and we will switch out the backend in a future release (we’ll probably create preview of this even before we’re ready to fully switch). If you just want to run SQL queries or load SQL data within a Spark program, try out Spark SQL. Matei On Mar 30, 2014, at 4:46 PM, Mayur Rustagi mayur.rust...@gmail.com wrote: +1 Have done a few installations of Shark with customers using Hive, they love it. Would be good to maintain compatibility with Metastore QL till we have substantial reason to break off (like BlinkDB). Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi On Sun, Mar 30, 2014 at 2:46 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: This is a great question. We are in the same position, having not invested in Hive yet and looking at various options for SQL-on-Hadoop. On Sat, Mar 29, 2014 at 9:48 PM, Manoj Samel manojsamelt...@gmail.com wrote: Hi, In context of the recent Spark SQL announcement (http://databricks.com/blog/2014/03/26/Spark-SQL-manipulating-structured-data-using-Spark.html). If there is no existing investment in Hive/Shark, would it be worth starting a new SQL work using SparkSQL rather than Shark ? 
* It seems Shark SQL core will use more and more of SparkSQL * From the blog, it seems Shark has baggage from Hive, that is not needed in this case On the other hand, there seems to be two shortcomings of SparkSQL (from a quick scan of blog and doc) * SparkSQL will have less features than Shark/Hive QL, at least for now. * The standalone SharkServer feature will not be available in SparkSQL. Can someone from Databricks shed light on what is the long term roadmap? It will help in avoiding investing in older/two technologies for work with no Hive needs. Thanks, PS: Great work on SparkSQL -- You received this message because you are subscribed to the Google Groups shark-users group. To unsubscribe from this group and stop receiving emails from it, send an email to shark-users+unsubscr...@googlegroups.com. To post to this group, send email to shark-us...@googlegroups.com. Visit this group at http://groups.google.com/group/shark-users. For more options, visit https://groups.google.com/d/optout.
Re: Mllib in pyspark for 0.8.1
You could probably port it back, but it required some changes on the Java side as well (a new PythonMLUtils class). It might be easier to fix the Mesos issues with 0.9. Matei On Apr 1, 2014, at 8:53 AM, Ian Ferreira ianferre...@hotmail.com wrote: Hi there, For some reason the distribution and build for 0.8.1 does not include the MLlib libraries for pyspark, i.e. import from mllib fails. Seems to be addressed in 0.9.0, but that has other issues running on Mesos in standalone mode :) Any pointers? Cheers - Ian
Re: Spark 1.0.0 release plan
Hey Bhaskar, this is still the plan, though QAing might take longer than 15 days. Right now since we’ve passed April 1st, the only features considered for a merge are those that had pull requests in review before. (Some big ones are things like annotating the public APIs and simplifying configuration). Bug fixes and things like adding Python / Java APIs for new components will also still be considered. Matei On Apr 3, 2014, at 10:30 AM, Bhaskar Dutta bhas...@gmail.com wrote: Hi, Is there any change in the release plan for Spark 1.0.0-rc1 release date from what is listed in the Proposal for Spark Release Strategy thread? == Tentative Release Window for 1.0.0 == Feb 1st - April 1st: General development April 1st: Code freeze for new features April 15th: RC1 Thanks, Bhaskar
Re: Optimal Server Design for Spark
To run multiple workers with Spark’s standalone mode, set SPARK_WORKER_INSTANCES and SPARK_WORKER_CORES in conf/spark-env.sh. For example, if you have 16 cores and want 2 workers, you could add

export SPARK_WORKER_INSTANCES=2
export SPARK_WORKER_CORES=8

Matei On Apr 3, 2014, at 12:38 PM, Mayur Rustagi mayur.rust...@gmail.com wrote: Are your workers not utilizing all the cores? One worker will utilize multiple cores depending on resource allocation. Regards Mayur Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi On Wed, Apr 2, 2014 at 7:19 PM, Debasish Das debasish.da...@gmail.com wrote: Hi Matei, How can I run multiple Spark workers per node? I am running an 8-core, 10-node cluster, but I do have 8 more cores on each node. So having 2 workers per node will definitely help my use case. Thanks. Deb On Wed, Apr 2, 2014 at 3:58 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Hey Steve, This configuration sounds pretty good. The one thing I would consider is having more disks, for two reasons — Spark uses the disks for large shuffles and out-of-core operations, and often it’s better to run HDFS or your storage system on the same nodes. But whether this is valuable will depend on whether you plan to do that in your deployment. You should determine that and go from there. The amount of cores and RAM are both good — actually with a lot more of these you would probably want to run multiple Spark workers per node, which is more work to configure. Your numbers are in line with other deployments. There’s a provisioning overview with more details at https://spark.apache.org/docs/latest/hardware-provisioning.html but what you have sounds fine. Matei On Apr 2, 2014, at 2:58 PM, Stephen Watt sw...@redhat.com wrote: Hi Folks I'm looking to buy some gear to run Spark. I'm quite well versed in Hadoop Server design but there does not seem to be much Spark related collateral around infrastructure guidelines (or at least I haven't been able to find them). 
My current thinking for server design is something along these lines:

- 2 x 10GbE NICs
- 128 GB RAM
- 6 x 1 TB small form factor disks (2 x RAID 1 mirror for O/S and runtimes, 4 x 1 TB for data drives)
- 1 disk controller
- 2 x 2.6 GHz 6-core processors

If I stick with 1U servers then I lose disk capacity per rack, but I get a lot more memory and CPU capacity per rack. This increases my total cluster memory footprint, and it doesn't seem to make sense to have super-dense storage servers because I can't fit all that data on disk in memory anyway. So at present, my thinking is to go with 1U servers instead of 2U servers. Is 128GB RAM per server normal? Do you guys use more or less than that? Any feedback would be appreciated. Regards, Steve Watt
Re: How are exceptions in map functions handled in Spark?
Exceptions should be sent back to the driver program and logged there (with a SparkException thrown if a task fails more than 4 times), but there were some bugs before where this did not happen for non-Serializable exceptions. We changed it to pass back the stack traces only (as text), which should always work. I’d recommend trying a newer Spark version, 0.8 should be easy to upgrade to from 0.7. Matei On Apr 4, 2014, at 10:40 AM, John Salvatier jsalvat...@gmail.com wrote: I'm trying to get a clear idea about how exceptions are handled in Spark? Is there somewhere where I can read about this? I'm on spark .7 For some reason I was under the impression that such exceptions are swallowed and the value that produced them ignored but the exception is logged. However, right now we're seeing the task just re-tried over and over again in an infinite loop because there's a value that always generates an exception. John
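Until an upgrade is possible, a common workaround is to catch inside the map function itself, so one bad value is skipped rather than retried forever. A minimal sketch (the parsing logic here is illustrative, not from the thread):

```python
def parse_or_none(value):
    """Wrap the risky work; skip-and-log instead of letting the task die."""
    try:
        return int(value)
    except ValueError as exc:
        print("skipping %r: %s" % (value, exc))  # ends up in the worker log
        return None

raw = ["1", "2", "oops", "3"]
# In Spark this would be rdd.map(parse_or_none).filter(lambda x: x is not None)
parsed = [v for v in (parse_or_none(x) for x in raw) if v is not None]
print(parsed)  # [1, 2, 3]
```

The same pattern works in Scala with Try/Option plus a flatMap.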
Re: example of non-line oriented input data?
FYI, one thing we’ve added now is support for reading multiple text files from a directory as separate records: https://github.com/apache/spark/pull/327. This should remove the need for mapPartitions discussed here. Avro and SequenceFiles look like they may not make it for 1.0, but there’s a chance that Parquet support with Spark SQL will, which should let you store binary data a bit better. Matei On Mar 19, 2014, at 3:12 PM, Jeremy Freeman freeman.jer...@gmail.com wrote: Another vote on this, support for simple SequenceFiles and/or Avro would be terrific, as using plain text can be very space-inefficient, especially for numerical data. -- Jeremy On Mar 19, 2014, at 5:24 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: I'd second the request for Avro support in Python first, followed by Parquet. On Wed, Mar 19, 2014 at 2:14 PM, Evgeny Shishkin itparan...@gmail.com wrote: On 19 Mar 2014, at 19:54, Diana Carroll dcarr...@cloudera.com wrote: Actually, thinking more on this question, Matei: I'd definitely say support for Avro. There's a lot of interest in this!! Agree, and parquet as default Cloudera Impala format. On Tue, Mar 18, 2014 at 8:14 PM, Matei Zaharia matei.zaha...@gmail.com wrote: BTW one other thing — in your experience, Diana, which non-text InputFormats would be most useful to support in Python first? Would it be Parquet or Avro, simple SequenceFiles with the Hadoop Writable types, or something else? I think a per-file text input format that does the stuff we did here would also be good. Matei On Mar 18, 2014, at 3:27 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Hi Diana, This seems to work without the iter() in front if you just return treeiterator. What happened when you didn’t include that? Treeiterator should return an iterator. Anyway, this is a good example of mapPartitions. 
It’s one where you want to view the whole file as one object (one XML here), so you couldn’t implement this using a flatMap, but you still want to return multiple values. The MLlib example you saw needs Python 2.7 because unfortunately that is a requirement for our Python MLlib support (see http://spark.incubator.apache.org/docs/0.9.0/python-programming-guide.html#libraries). We’d like to relax this later but we’re using some newer features of NumPy and Python. The rest of PySpark works on 2.6. In terms of the size in memory, here both the string s and the XML tree constructed from it need to fit in, so you can’t work on very large individual XML files. You may be able to use a streaming XML parser instead to extract elements from the data in a streaming fashion, without ever materializing the whole tree. http://docs.python.org/2/library/xml.sax.reader.html#module-xml.sax.xmlreader is one example. Matei On Mar 18, 2014, at 7:49 AM, Diana Carroll dcarr...@cloudera.com wrote: Well, if anyone is still following this, I've gotten the following code working which in theory should allow me to parse whole XML files: (the problem was that I can't return the tree iterator directly. I have to call iter(). Why?)

import xml.etree.ElementTree as ET

# two source files, format <data><country name="...">...</country>...</data>
mydata = sc.textFile("file:/home/training/countries*.xml")

def parsefile(iterator):
    s = ''
    for i in iterator:
        s = s + str(i)
    tree = ET.fromstring(s)
    treeiterator = tree.getiterator("country")
    # why do I have to convert an iterator to an iterator? not sure but required
    return iter(treeiterator)

mydata.mapPartitions(lambda x: parsefile(x)).map(lambda element: element.attrib).collect()

The output is what I expect: [{'name': 'Liechtenstein'}, {'name': 'Singapore'}, {'name': 'Panama'}] BUT I'm a bit concerned about the construction of the string s. How big can my file be before converting it to a string becomes problematic? 
On Tue, Mar 18, 2014 at 9:41 AM, Diana Carroll dcarr...@cloudera.com wrote: Thanks, Matei. In the context of this discussion, it would seem mapParitions is essential, because it's the only way I'm going to be able to process each file as a whole, in our example of a large number of small XML files which need to be parsed as a whole file because records are not required to be on a single line. The theory makes sense but I'm still utterly lost as to how to implement it. Unfortunately there's only a single example of the use of mapPartitions in any of the Python example programs, which is the log regression example, which I can't run because it requires Python 2.7 and I'm on Python 2.6. (aside: I couldn't find any statement that Python 2.6 is unsupported...is it?) I'd really really love to see a real life example of a Python use of mapPartitions. I do appreciate the very simple examples you provided, but (perhaps because of my novice status on Python) I can't figure out
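The streaming-parser suggestion earlier in the thread can be sketched with the standard xml.sax module; the tiny document below mirrors the <data>/<country> format from Diana's example and is illustrative only:

```python
import xml.sax

class CountryHandler(xml.sax.ContentHandler):
    """Collect country names as elements stream past, never building the tree."""
    def __init__(self):
        super().__init__()
        self.names = []

    def startElement(self, name, attrs):
        if name == "country":
            self.names.append(attrs.getValue("name"))

doc = b'<data><country name="Liechtenstein"/><country name="Singapore"/></data>'
handler = CountryHandler()
xml.sax.parseString(doc, handler)
print(handler.names)  # ['Liechtenstein', 'Singapore']
```

Because the handler only keeps the attributes it needs, peak memory stays far below the full-tree approach for large files.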
Re: Having spark-ec2 join new slaves to existing cluster
This can’t be done through the script right now, but you can do it manually as long as the cluster is stopped. If the cluster is stopped, just go into the AWS Console, right click a slave and choose “launch more of these” to add more. Or select multiple slaves and delete them. When you run spark-ec2 start the next time to start your cluster, it will set it up on all the machines it finds in the mycluster-slaves security group. This is pretty hacky so it would definitely be good to add this feature; feel free to open a JIRA about it. Matei On Apr 4, 2014, at 12:16 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: I would like to be able to use spark-ec2 to launch new slaves and add them to an existing, running cluster. Similarly, I would also like to remove slaves from an existing cluster. Use cases include: Oh snap, I sized my cluster incorrectly. Let me add/remove some slaves. During scheduled batch processing, I want to add some new slaves, perhaps on spot instances. When that processing is done, I want to kill them. (Cruel, I know.) I gather this is not possible at the moment. spark-ec2 appears to be able to launch new slaves for an existing cluster only if the master is stopped. I also do not see any ability to remove slaves from a cluster. Is that correct? Are there plans to add such functionality to spark-ec2 in the future? Nick View this message in context: Having spark-ec2 join new slaves to existing cluster Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark on other parallel filesystems
As long as the filesystem is mounted at the same path on every node, you should be able to just run Spark and use a file:// URL for your files. The only downside with running it this way is that Lustre won’t expose data locality info to Spark, the way HDFS does. That may not matter if it’s a network-mounted file system though. Matei On Apr 4, 2014, at 4:56 PM, Venkat Krishnamurthy ven...@yarcdata.com wrote: All Are there any drawbacks or technical challenges (or any information, really) related to using Spark directly on a global parallel filesystem like Lustre/GPFS? Any idea of what would be involved in doing a minimal proof of concept? Is it just possible to run Spark unmodified (without the HDFS substrate) for a start, or will that not work at all? I do know that it’s possible to implement Tachyon on Lustre and get the HDFS interface – just looking at other options. Venkat
Re: Spark 0.9.1 released
Thanks TD for managing this release, and thanks to everyone who contributed! Matei On Apr 9, 2014, at 2:59 PM, Tathagata Das tathagata.das1...@gmail.com wrote: A small additional note: Please use the direct download links in the Spark Downloads page. The Apache mirrors take a day or so to sync from the main repo, so may not work immediately. TD On Wed, Apr 9, 2014 at 2:54 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Hi everyone, We have just posted Spark 0.9.1, which is a maintenance release with bug fixes, performance improvements, better stability with YARN and improved parity of the Scala and Python API. We recommend all 0.9.0 users to upgrade to this stable release. This is the first release since Spark graduated as a top level Apache project. Contributions to this release came from 37 developers. The full release notes are at: http://spark.apache.org/releases/spark-release-0-9-1.html You can download the release at: http://spark.apache.org/downloads.html Thanks all the developers who contributed to this release: Aaron Davidson, Aaron Kimball, Andrew Ash, Andrew Or, Andrew Tulloch, Bijay Bisht, Bouke van der Bijl, Bryn Keller, Chen Chao, Christian Lundgren, Diana Carroll, Emtiaz Ahmed, Frank Dai, Henry Saputra, jianghan, Josh Rosen, Jyotiska NK, Kay Ousterhout, Kousuke Saruta, Mark Grover, Matei Zaharia, Nan Zhu, Nick Lanham, Patrick Wendell, Prabin Banka, Prashant Sharma, Qiuzhuang, Raymond Liu, Reynold Xin, Sandy Ryza, Sean Owen, Shixiong Zhu, shiyun.wxm, Stevo Slavić, Tathagata Das, Tom Graves, Xiangrui Meng TD
Re: pySpark memory usage
Okay, thanks. Do you have any info on how large your records and data file are? I’d like to reproduce and fix this. Matei On Apr 9, 2014, at 3:52 PM, Jim Blomo jim.bl...@gmail.com wrote: Hi Matei, thanks for working with me to find these issues. To summarize, the issues I've seen are: 0.9.0: - https://issues.apache.org/jira/browse/SPARK-1323 SNAPSHOT 2014-03-18: - When persist() used and batchSize=1, java.lang.OutOfMemoryError: Java heap space. To me this indicates a memory leak since Spark should simply be counting records of size 3MB - Without persist(), stdin writer to Python finished early hangs the application, unknown root cause I've recently rebuilt another SNAPSHOT, git commit 16b8308 with debugging turned on. This gives me the stacktrace on the new stdin problem: 14/04/09 22:22:45 DEBUG PythonRDD: stdin writer to Python finished early java.net.SocketException: Connection reset at java.net.SocketInputStream.read(SocketInputStream.java:196) at java.net.SocketInputStream.read(SocketInputStream.java:122) at sun.security.ssl.InputRecord.readFully(InputRecord.java:442) at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554) at sun.security.ssl.InputRecord.read(InputRecord.java:509) at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927) at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884) at sun.security.ssl.AppInputStream.read(AppInputStream.java:102) at java.io.BufferedInputStream.read1(BufferedInputStream.java:273) at java.io.BufferedInputStream.read(BufferedInputStream.java:334) at org.apache.commons.httpclient.WireLogInputStream.read(WireLogInputStream.java:69) at org.apache.commons.httpclient.ContentLengthInputStream.read(ContentLengthInputStream.java:170) at java.io.FilterInputStream.read(FilterInputStream.java:133) at org.apache.commons.httpclient.AutoCloseInputStream.read(AutoCloseInputStream.java:108) at org.jets3t.service.io.InterruptableInputStream.read(InterruptableInputStream.java:76) at 
org.jets3t.service.impl.rest.httpclient.HttpMethodReleaseInputStream.read(HttpMethodReleaseInputStream.java:136) at org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsInputStream.read(NativeS3FileSystem.java:98) at java.io.BufferedInputStream.read1(BufferedInputStream.java:273) at java.io.BufferedInputStream.read(BufferedInputStream.java:334) at java.io.DataInputStream.read(DataInputStream.java:100) at org.apache.hadoop.util.LineReader.readLine(LineReader.java:134) at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:133) at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:38) at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:192) at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:175) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:27) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242) at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85) On Thu, Apr 3, 2014 at 8:37 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Cool, thanks for the update. Have you tried running a branch with this fix (e.g. branch-0.9, or the 0.9.1 release candidate?) Also, what memory leak issue are you referring to, is it separate from this? (Couldn't find it earlier in the thread.) To turn on debug logging, copy conf/log4j.properties.template to conf/log4j.properties and change the line log4j.rootCategory=INFO, console to log4j.rootCategory=DEBUG, console. Then make sure this file is present in conf on all workers. 
BTW I've managed to run PySpark with this fix on some reasonably large S3 data (multiple GB) and it was fine. It might happen only if records are large, or something like that. How much heap are you giving to your executors, and does it show that much in the web UI? Matei On Mar 29, 2014, at 10:44 PM, Jim Blomo jim.bl...@gmail.com wrote: I think the problem I ran into in 0.9 is covered in https://issues.apache.org/jira/browse/SPARK-1323 When I kill the python process, the stacktrace I gets indicates that this happens at initialization. It looks like the initial write to the Python process does not go through, and then the iterator hangs waiting for output. I haven't had luck turning on debugging for the executor process. Still trying
Re: NPE using saveAsTextFile
I haven’t seen this but it may be a bug in Typesafe Config, since this is serializing a Config object. We don’t actually use Typesafe Config ourselves. Do you have any nulls in the data itself by any chance? And do you know how that Config object is getting there? Matei On Apr 9, 2014, at 11:38 PM, Nick Pentreath nick.pentre...@gmail.com wrote: Anyone have a chance to look at this? Am I just doing something silly somewhere? If it makes any difference, I am using the elasticsearch-hadoop plugin for ESInputFormat. But as I say, I can parse the data (count, first() etc). I just can't save it as text file. On Tue, Apr 8, 2014 at 4:50 PM, Nick Pentreath nick.pentre...@gmail.com wrote: Hi I'm using Spark 0.9.0. When calling saveAsTextFile on a custom hadoop inputformat (loaded with newAPIHadoopRDD), I get the following error below. If I call count, I get the correct count of number of records, so the inputformat is being read correctly... the issue only appears when trying to use saveAsTextFile. If I call first() I get the correct output, also. So it doesn't appear to be anything with the data or inputformat. Any idea what the actual problem is, since this stack trace is not obvious (though it seems to be in ResultTask which ultimately causes this). Is this a known issue at all? 
== 14/04/08 16:00:46 ERROR OneForOneStrategy: java.lang.NullPointerException at com.typesafe.config.impl.SerializedConfigValue.writeOrigin(SerializedConfigValue.java:202) at com.typesafe.config.impl.ConfigImplUtil.writeOrigin(ConfigImplUtil.java:228) at com.typesafe.config.ConfigException.writeObject(ConfigException.java:58) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:975) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1480) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493) at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:346) at scala.collection.immutable.$colon$colon.writeObject(List.scala:379) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:975) at
Re: Spark - ready for prime time?
To add onto the discussion about memory working space, 0.9 introduced the ability to spill data within a task to disk, and in 1.0 we’re also changing the interface to allow spilling data within the same *group* to disk (e.g. when you do groupBy and get a key with lots of values). The main reason these weren’t there was that for a lot of workloads (everything except the same key having lots of values), simply launching more reduce tasks was also a good solution, because it results in an external sort across the cluster similar to what would happen within a task. Overall, expect to see more work to both explain how things execute (http://spark.incubator.apache.org/docs/latest/tuning.html is one example, the monitoring UI is another) and try to make things require no configuration out of the box. We’re doing a lot of this based on user feedback, so that’s definitely appreciated. Matei On Apr 10, 2014, at 10:33 AM, Dmitriy Lyubimov dlie...@gmail.com wrote: On Thu, Apr 10, 2014 at 9:24 AM, Andrew Ash and...@andrewash.com wrote: The biggest issue I've come across is that the cluster is somewhat unstable when under memory pressure. Meaning that if you attempt to persist an RDD that's too big for memory, even with MEMORY_AND_DISK, you'll often still get OOMs. I had to carefully modify some of the space tuning parameters and GC settings to get some jobs to even finish. The other issue I've observed is if you group on a key that is highly skewed, with a few massively-common keys and a long tail of rare keys, the one massive key can be too big for a single machine and again cause OOMs. My take on it -- Spark doesn't believe in sort-and-spill things to enable super long groups, and IMO for a good reason. Here are my thoughts: (1) in my work i don't need sort in 99% of the cases, i only need group which absolutely doesn't need the spill which makes things slow down to a crawl. 
(2) if that's an aggregate (such as group count), use combine(), not groupByKey -- this will do tons of good on memory use. (3) if you really need groups that don't fit into memory, that is always because you want to do something that is other than aggregation, with them. E.g. build an index of that grouped data. we actually had a case just like that. In this case your friend is really not groupBy, but rather PartitionBy. I.e. what happens there you build a quick count sketch, perhaps on downsampled data, to figure which keys have sufficiently big count -- and then you build a partitioner that redirects large groups to a dedicated map(). assuming this map doesn't try to load things in memory but rather do something like streaming BTree build, that should be fine. In certain situations such processing may require splitting super large group even into smaller sub groups (e.g. partitioned BTree structure), at which point you should be fine even from uniform load point of view. It takes a little jiu-jitsu to do it all, but it is not Spark's fault here, it did not promise to do this all for you in the groupBy contract. I'm hopeful that off-heap caching (Tachyon) could fix some of these issues. Just my personal experience, but I've observed significant improvements in stability since even the 0.7.x days, so I'm confident that things will continue to get better as long as people report what they're seeing so it can get fixed. Andrew On Thu, Apr 10, 2014 at 4:08 PM, Alex Boisvert alex.boisv...@gmail.com wrote: I'll provide answers from our own experience at Bizo. We've been using Spark for 1+ year now and have found it generally better than previous approaches (Hadoop + Hive mostly). On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth andras.nem...@lynxanalytics.com wrote: I. Is it too much magic? Lots of things just work right in Spark and it's extremely convenient and efficient when it indeed works. 
But should we be worried that customization is hard if the built-in behavior is not quite right for us? Are we to expect hard-to-track-down issues originating from the black box behind the magic? I think it goes back to understanding Spark's architecture, its design constraints and the problems it explicitly set out to address. If the solution to your problems can be easily formulated in terms of the map/reduce model, then it's a good choice. You'll want your customizations to go with (not against) the grain of the architecture. II. Is it mature enough? E.g. we've created a pull request which fixes a problem that we were very surprised no one ever stumbled upon before. So that's why I'm asking: is Spark already being used in professional settings? Can one already trust it being reasonably bug-free and reliable? There are lots of ways to use Spark; and not all of the features are necessarily at the same level of maturity. For instance, we put all the jars on the main classpath so we've never run into the
Re: Spark 0.9.1 PySpark ImportError
Kind of strange because we haven’t updated CloudPickle AFAIK. Is this a package you added on the PYTHONPATH? How did you set the path, was it in conf/spark-env.sh? Matei On Apr 10, 2014, at 7:39 AM, aazout albert.az...@velos.io wrote: I am getting a python ImportError on Spark standalone cluster. I have set the PYTHONPATH on both worker and slave and the package imports properly when I run PySpark command line on both machines. This only happens with Master - Slave communication. Here is the error below: 14/04/10 13:40:19 INFO scheduler.TaskSetManager: Loss was due to org.apache.spark.api.python.PythonException: Traceback (most recent call last): File /root/spark/python/pyspark/worker.py, line 73, in main command = pickleSer._read_with_length(infile) File /root/spark/python/pyspark/serializers.py, line 137, in _read_with_length return self.loads(obj) File /root/spark/python/pyspark/cloudpickle.py, line 810, in subimport __import__(name) ImportError: ('No module named volatility.atm_impl_vol', function subimport at 0xa36050, ('volatility.atm_impl_vol',)) Any ideas? - CEO / Velos (velos.io) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-0-9-1-PySpark-ImportError-tp4068.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark - ready for prime time?
It’s not a new API, it just happens underneath the current one if you have spark.shuffle.spill set to true (which it is by default). Take a look at the config settings that mention “spill” in http://spark.incubator.apache.org/docs/latest/configuration.html. Matei On Apr 11, 2014, at 7:02 AM, Surendranauth Hiraman suren.hira...@velos.io wrote: Matei, Where is the functionality in 0.9 to spill data within a task (separately from persist)? My apologies if this is something obvious but I don't see it in the api docs. -Suren On Thu, Apr 10, 2014 at 3:59 PM, Matei Zaharia matei.zaha...@gmail.com wrote: To add onto the discussion about memory working space, 0.9 introduced the ability to spill data within a task to disk, and in 1.0 we’re also changing the interface to allow spilling data within the same *group* to disk (e.g. when you do groupBy and get a key with lots of values). The main reason these weren’t there was that for a lot of workloads (everything except the same key having lots of values), simply launching more reduce tasks was also a good solution, because it results in an external sort across the cluster similar to what would happen within a task. Overall, expect to see more work to both explain how things execute (http://spark.incubator.apache.org/docs/latest/tuning.html is one example, the monitoring UI is another) and try to make things require no configuration out of the box. We’re doing a lot of this based on user feedback, so that’s definitely appreciated. Matei On Apr 10, 2014, at 10:33 AM, Dmitriy Lyubimov dlie...@gmail.com wrote: On Thu, Apr 10, 2014 at 9:24 AM, Andrew Ash and...@andrewash.com wrote: The biggest issue I've come across is that the cluster is somewhat unstable when under memory pressure. Meaning that if you attempt to persist an RDD that's too big for memory, even with MEMORY_AND_DISK, you'll often still get OOMs. I had to carefully modify some of the space tuning parameters and GC settings to get some jobs to even finish. 
The other issue I've observed is if you group on a key that is highly skewed, with a few massively-common keys and a long tail of rare keys, the one massive key can be too big for a single machine and again cause OOMs. My take on it -- Spark doesn't believe in sort-and-spill things to enable super long groups, and IMO for a good reason. Here are my thoughts: (1) in my work i don't need sort in 99% of the cases, i only need group which absolutely doesn't need the spill which makes things slow down to a crawl. (2) if that's an aggregate (such as group count), use combine(), not groupByKey -- this will do tons of good on memory use. (3) if you really need groups that don't fit into memory, that is always because you want to do something that is other than aggregation, with them. E.g. build an index of that grouped data. we actually had a case just like that. In this case your friend is really not groupBy, but rather PartitionBy. I.e. what happens there you build a quick count sketch, perhaps on downsampled data, to figure which keys have sufficiently big count -- and then you build a partitioner that redirects large groups to a dedicated map(). assuming this map doesn't try to load things in memory but rather do something like streaming BTree build, that should be fine. In certain situations such processing may require splitting super large group even into smaller sub groups (e.g. partitioned BTree structure), at which point you should be fine even from uniform load point of view. It takes a little jiu-jitsu to do it all, but it is not Spark's fault here, it did not promise to do this all for you in the groupBy contract. I'm hopeful that off-heap caching (Tachyon) could fix some of these issues. Just my personal experience, but I've observed significant improvements in stability since even the 0.7.x days, so I'm confident that things will continue to get better as long as people report what they're seeing so it can get fixed. 
Andrew On Thu, Apr 10, 2014 at 4:08 PM, Alex Boisvert alex.boisv...@gmail.com wrote: I'll provide answers from our own experience at Bizo. We've been using Spark for 1+ year now and have found it generally better than previous approaches (Hadoop + Hive mostly). On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth andras.nem...@lynxanalytics.com wrote: I. Is it too much magic? Lots of things just work right in Spark and it's extremely convenient and efficient when it indeed works. But should we be worried that customization is hard if the built-in behavior is not quite right for us? Are we to expect hard-to-track-down issues originating from the black box behind the magic? I think it goes back to understanding Spark's architecture, its design constraints and the problems it explicitly set out to address. If the solution to your problems can be easily formulated
Re: RDD.tail()
You can use mapPartitionsWithIndex and look at the partition index (0 will be the first partition) to decide whether to skip the first line. Matei On Apr 14, 2014, at 8:50 AM, Ethan Jewett esjew...@gmail.com wrote: We have similar needs but IIRC, I came to the conclusion that this would only work on ordered RDDs, and then you would still have to figure out which partition is the first one. I ended up deciding it would be best to just drop the header lines from a Scala iterator before creating an RDD based on it. Not sure if this was the right thing to do, but would that work for you? Regards, Ethan On Mon, Apr 14, 2014 at 10:24 AM, Philip Ogren philip.og...@oracle.com wrote: Has there been any thought to adding a tail() method to RDD? It would be really handy to skip over the first item in an RDD when it contains header information. Even better would be a drop(int) function that would allow you to skip over several lines of header information. Our attempts to do something equivalent with a filter() call seem a bit contorted. Any thoughts? Thanks, Philip
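A minimal sketch of Matei's suggestion in pure Python (the function name is made up; in PySpark it would be passed to `rdd.mapPartitionsWithIndex`, and it assumes the header lands in partition 0, so Ethan's caveat about ordered input applies):

```python
from itertools import islice

def drop_header(index, lines):
    """Per-partition function for mapPartitionsWithIndex: skip the first
    line in partition 0 (where the file's header lands), pass the other
    partitions through untouched."""
    return islice(lines, 1, None) if index == 0 else lines

# Each partition hands the function a plain iterator, so it can be
# exercised without Spark:
part0 = iter(["header", "row1", "row2"])
part1 = iter(["row3"])
print(list(drop_header(0, part0)) + list(drop_header(1, part1)))
```

Replacing `1` with `n` in the `islice` call gives the drop(n) behavior Philip asks about, skipping several header lines at once.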
Re: process_local vs node_local
Spark can actually launch multiple executors on the same node if you configure it that way, but if you haven’t done that, this might mean that some tasks are reading data from the cache, and some from HDFS. (In the HDFS case Spark will only report it as NODE_LOCAL since HDFS isn’t tied to a particular executor process). For example, maybe you cached some data but not all the partitions of the RDD are in memory. Are you using caching here? There’s a locality wait setting in Spark (spark.locality.wait) that determines how long it will wait to go to the next locality level when it can’t launch stuff at its preferred one (e.g. to go from process to node). You can try increasing that too, by default it’s only 3000 ms. It might be that the whole RDD is cached but garbage collection causes it to give up waiting on some nodes and launch stuff on other nodes instead, which might be HDFS-local (due to data replication) but not cache-local. Matei On Apr 14, 2014, at 8:37 AM, dachuan hdc1...@gmail.com wrote: I am confused about the process local and node local, too. In my current understanding of Spark, one application typically only has one executor in one node. However, node local means your data is in the same host, but in a different executor. This further means node local is the same with process local unless one node has two executors, which could only happen when one node has two Workers. Waiting for further discussion .. On Mon, Apr 14, 2014 at 10:13 AM, Nathan Kronenfeld nkronenf...@oculusinfo.com wrote: I've a fairly large job (5E9 records, ~1600 partitions).wherein on a given stage, it looks like for the first half of the tasks, everything runs in process_local mode in ~10s/partition. Then, from halfway through, everything starts running in node_local mode, and takes 10x as long or more. I read somewhere that the difference between the two had to do with the data being local to the running jvm, or another jvm on the same machine. 
If that's the case, shouldn't the distribution of the two modes be more random? If not, what exactly is the difference between the two modes? Given how much longer it takes in node_local mode, it seems like the whole thing would probably run much faster just by waiting for the right jvm to be free. Is there any way of forcing this? Thanks, -Nathan -- Nathan Kronenfeld Senior Visualization Developer Oculus Info Inc 2 Berkeley Street, Suite 600, Toronto, Ontario M5A 4J5 Phone: +1-416-203-3003 x 238 Email: nkronenf...@oculusinfo.com -- Dachuan Huang Cellphone: 614-390-7234 2015 Neil Avenue Ohio State University Columbus, Ohio U.S.A. 43210
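For reference, a sketch of raising the locality wait that Matei mentions, set before the context is created (the 10-second value is purely illustrative, not a recommendation):

```python
from pyspark import SparkConf, SparkContext

# Wait up to 10 s (default is 3000 ms) for a PROCESS_LOCAL slot before
# the scheduler falls back to NODE_LOCAL.
conf = SparkConf().set("spark.locality.wait", "10000")
sc = SparkContext(conf=conf)
```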
Re: using Kryo with pyspark?
Kryo won’t make a major impact on PySpark because it just stores data as byte[] objects, which are fast to serialize even with Java. But it may be worth a try — you would just set spark.serializer and not try to register any classes. What might make more impact is storing data as MEMORY_ONLY_SER and turning on spark.rdd.compress, which will compress them. In Java this can add some CPU overhead but Python runs quite a bit slower so it might not matter, and it might speed stuff up by reducing GC or letting you cache more data. Matei On Apr 14, 2014, at 12:24 PM, Diana Carroll dcarr...@cloudera.com wrote: I'm looking at the Tuning Guide suggestion to use Kryo instead of default serialization. My questions: Does pyspark use Java serialization by default, as Scala spark does? If so, then... can I use Kryo with pyspark instead? The instructions say I should register my classes with the Kryo Serialization, but that's in Java/Scala. If I simply set the spark.serializer variable for my SparkContext, will it at least use Kryo for Spark's own classes, even if I can't register any of my own classes? Thanks, Diana
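A sketch of the two suggestions combined, assuming PySpark with SparkConf (available in 0.9+); this is configuration only, with no classes registered on the Kryo side:

```python
from pyspark import SparkConf, SparkContext, StorageLevel

conf = (SparkConf()
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .set("spark.rdd.compress", "true"))  # compress serialized partitions
sc = SparkContext(conf=conf)

# Cache as serialized bytes rather than deserialized objects:
# rdd.persist(StorageLevel.MEMORY_ONLY_SER)
```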
Re: partitioning of small data sets
Yup, one reason the default is 2 is to give people a similar experience to working with large files, in case their code doesn’t deal well with the file being partitioned. Matei On Apr 15, 2014, at 9:53 AM, Aaron Davidson ilike...@gmail.com wrote: Take a look at the minSplits argument for SparkContext#textFile [1] -- the default value is 2. You can simply set this to 1 if you'd prefer not to split your data. [1] http://spark.apache.org/docs/latest/api/core/index.html#org.apache.spark.SparkContext On Tue, Apr 15, 2014 at 8:44 AM, Diana Carroll dcarr...@cloudera.com wrote: I loaded a very tiny file into Spark -- 23 lines of text, 2.6 KB. Given the size, and that it is a single file, I assumed it would only be in a single partition. But when I cache it, I can see in the Spark App UI that it actually splits it into two partitions (screenshot attached: sparkdev_2014-04-11.png). Is this correct behavior? How does Spark decide how big a partition should be, or how many partitions to create for an RDD. If it matters, I have only a single worker in my cluster, so both partitions are stored on the same worker. The file was on HDFS and was only a single block. Thanks for any insight. Diana
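In code, Aaron's suggestion looks like this (assuming an existing SparkContext sc):

```python
# minSplits is a lower-bound hint; passing 1 keeps a tiny single-block
# file in one partition instead of the default minimum of 2.
rdd = sc.textFile("tiny.txt", 1)
```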
Re: Multi-tenant?
Yes, both things can happen. Take a look at http://spark.apache.org/docs/latest/job-scheduling.html, which includes scheduling concurrent jobs within the same driver. Matei On Apr 15, 2014, at 4:08 PM, Ian Ferreira ianferre...@hotmail.com wrote: What is the support for multi-tenancy in Spark. I assume more than one driver can share the same cluster, but can a driver run two jobs in parallel?
Re: PySpark still reading only text?
Hi Bertrand, We should probably add a SparkContext.pickleFile and RDD.saveAsPickleFile that will allow saving pickled objects. Unfortunately this is not in yet, but there is an issue up to track it: https://issues.apache.org/jira/browse/SPARK-1161. In 1.0, one feature we do have now is the ability to load binary data from Hive using Spark SQL’s Python API. Later we will also be able to save to Hive. Matei On Apr 16, 2014, at 4:27 AM, Bertrand Dechoux decho...@gmail.com wrote: Hi, I have browsed the online documentation and it is stated that PySpark only read text files as sources. Is it still the case? From what I understand, the RDD can after this first step be any serialized python structure if the class definitions are well distributed. Is it not possible to read back those RDDs? That is create a flow to parse everything and then, e.g. the next week, start from the binary, structured data? Technically, what is the difficulty? I would assume the code reading a binary python RDD or a binary python file to be quite similar. Where can I know more about this subject? Thanks in advance Bertrand
Re: extremely slow k-means version
The problem is that groupByKey means “bring all the points with this same key to the same JVM”. Your input is a Seq[Point], so you have to have all the points there. This means that a) all points will be sent across the network in a cluster, which is slow (and Spark goes through this sending code path even in local mode so it serializes the data), and b) you’ll get out of memory errors if that Seq is too big. In large-scale data processing, data movement is often the biggest cost, so you have to carefully choose which operations to use. Matei On Apr 19, 2014, at 4:04 AM, ticup tim.coppiete...@gmail.com wrote: Hi, I was playing around with other k-means implementations in Scala/Spark in order to test performances and get a better grasp on Spark. Now, I made one similar to the one from the examples (https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala), except that it's a bit less clever. Nevertheless, I expect a non-expert scala/spark programmer to write similar code instead of that from the example. 
Here is how they compare: in the step of calculating the new centroids (this is done by taking the average of all points belonging to the current centroids - the main workhorse of the algo), where the *example algorithm* adds the points of the same cluster and keeps track of the number of points in each cluster in 1 step (by using reduceByKey and keeping a counter in the reduce value): val closest = data.map(p => (closestPoint(p, kPoints), (p, 1))) val pointStats = closest.reduceByKey{case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2)} and then proceeds by dividing the sum of all points of a cluster by the counted number of points in the cluster: val newPoints = pointStats.map {pair => (pair._1, pair._2._1 / pair._2._2)}.collectAsMap() Afterwards the change of the new centroids is calculated in order to know when to stop iterating: tempDist = 0.0 for (i <- 0 until K) { tempDist += kPoints(i).squaredDist(newPoints(i)) } *my algorithm * (https://github.com/ticup/k-means-spark/blob/master/src/main/scala/k-means.scala) is less clever, but more straightforward: it just groups all the points of each cluster and then proceeds to calculate the average on those points and adds the difference with the previous centroid to an accumulator: // accumulator for differences new centroids dist = sc.accumulator(0.0) // calculate new centroids + add difference to old centroids centroids = closest.groupByKey().map{case (i, points) => val newCentroid = average(points) dist += centroids(i).squaredDist(newCentroid) newCentroid }.collect() with: def average(points: Seq[Vector]) : Vector = { points.reduce(_+_) / points.length } So, the big differences are: 1. Use of accumulator 2. Do excessive work by not cleverly calculating the average 3. Accesses the centroids val from within the map Now, why I'm here for, this version runs EXTREMELY slow and gets outOfHeapMemory exceptions for data input that the original algorithm easily solves in ~5 seconds. 
I'm trying to pinpoint what exactly is causing this huge difference. The use of an accumulator shouldn't really affect the performance and it doesn't, because I tried it without the accumulator and it stays as slow. Further, I expect the excessive work to slow down the algorithm with a factor of 2 or something, but this is really a decrease in factors of 10 or more. Even with 1 worker and 1 core (thus no parallelism) the difference in speed stays the same, so it's not because the averaging is not parallelised properly, there must be something going on that is much more important. Could someone give me pointers on what exactly is happening here? It can't be because I'm just accessing the centroids value from within the closure? Speed comparison: The *slow algorithm*: 44 seconds to perform the map 14/04/19 13:03:15 INFO scheduler.DAGScheduler: Stage 3 (map at k-means.scala:114) finished in 43.909 s The *fast algorithm*: more or less the same operations (in 2 steps instead of 1) in 2.2 seconds 14/04/19 12:52:29 INFO scheduler.DAGScheduler: Stage 3 (reduceByKey at k-means.scala:84) finished in 2.090 s 14/04/19 12:52:29 INFO scheduler.DAGScheduler: Stage 2 (collectAsMap at k-means.scala:86) finished in 0.117 s Thanks in advance, Tim. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/extremely-slow-k-means-version-tp4489.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
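The contrast Matei describes can be sketched without Spark at all: grouping buffers every point of a cluster before averaging, while reducing only ever carries a running (sum, count) per cluster. A minimal Python sketch (helper names are made up; plain scalars stand in for points):

```python
def average_via_group(pairs):
    """groupByKey-style: materialize every value for a key, then average.
    In Spark this ships all points for a key to a single JVM."""
    groups = {}
    for k, v in pairs:
        groups.setdefault(k, []).append(v)   # buffers the whole Seq in memory
    return {k: sum(vs) / len(vs) for k, vs in groups.items()}

def average_via_reduce(pairs):
    """reduceByKey-style: keep only a running (sum, count) per key,
    so partial sums can be combined map-side before any shuffle."""
    stats = {}
    for k, v in pairs:
        s, c = stats.get(k, (0.0, 0))
        stats[k] = (s + v, c + 1)            # constant memory per key
    return {k: s / c for k, (s, c) in stats.items()}

pairs = [(0, 1.0), (0, 3.0), (1, 10.0), (1, 20.0), (1, 30.0)]
assert average_via_group(pairs) == average_via_reduce(pairs) == {0: 2.0, 1: 20.0}
```

The reduceByKey-style version also lets partial sums be combined on each map node before any data crosses the network, which is the main saving in a real cluster.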
Re: Spark Streaming source from Amazon Kinesis
There was a patch posted a few weeks ago (https://github.com/apache/spark/pull/223), but it needs a few changes in packaging because it uses a license that isn’t fully compatible with Apache. I’d like to get this merged when the changes are made though — it would be a good input source to support. Matei On Apr 21, 2014, at 1:00 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: I'm looking to start experimenting with Spark Streaming, and I'd like to use Amazon Kinesis as my data source. Looking at the list of supported Spark Streaming sources, I don't see any mention of Kinesis. Is it possible to use Spark Streaming with Amazon Kinesis? If not, are there plans to add such support in the future? Nick View this message in context: Spark Streaming source from Amazon Kinesis Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: error in mllib lr example code
See http://people.csail.mit.edu/matei/spark-unified-docs/ for a more recent build of the docs; if you spot any problems in those, let us know. Matei On Apr 23, 2014, at 9:49 AM, Xiangrui Meng men...@gmail.com wrote: The doc is for 0.9.1. You are running a later snapshot, which added sparse vectors. Try LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(x => x.toDouble))). The examples are updated in the master branch. You can also check the examples there. -Xiangrui On Wed, Apr 23, 2014 at 9:34 AM, Mohit Jaggi mohitja...@gmail.com wrote: sorry...added a subject now On Wed, Apr 23, 2014 at 9:32 AM, Mohit Jaggi mohitja...@gmail.com wrote: I am trying to run the example linear regression code from http://spark.apache.org/docs/latest/mllib-guide.html But I am getting the following error...am I missing an import? code import org.apache.spark._ import org.apache.spark.mllib.regression.LinearRegressionWithSGD import org.apache.spark.mllib.regression.LabeledPoint object ModelLR { def main(args: Array[String]) { val sc = new SparkContext(args(0), "SparkLR", System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass).toSeq) // Load and parse the data val data = sc.textFile("mllib/data/ridge-data/lpsa.data") val parsedData = data.map { line => val parts = line.split(',') LabeledPoint(parts(0).toDouble, parts(1).split(' ').map(x => x.toDouble).toArray) } ...snip... } error - polymorphic expression cannot be instantiated to expected type; found : [U >: Double]Array[U] required: org.apache.spark.mllib.linalg.Vector - polymorphic expression cannot be instantiated to expected type; found : [U >: Double]Array[U] required: org.apache.spark.mllib.linalg.Vector
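The parsing step at issue here can be sketched in plain Python, independent of MLlib: the snapshot API wants the features wrapped in a vector type rather than a bare array. LabeledPoint below is a tiny made-up stand-in (a namedtuple), not the real MLlib class:

```python
from collections import namedtuple

# Stand-in for MLlib's LabeledPoint; a tuple of floats stands in for Vectors.dense.
LabeledPoint = namedtuple("LabeledPoint", ["label", "features"])

def parse_line(line):
    """Parse a 'label,f1 f2 f3 ...' line into a (label, dense features) record."""
    parts = line.split(",")
    features = tuple(float(x) for x in parts[1].split(" "))
    return LabeledPoint(float(parts[0]), features)

lp = parse_line("-0.43,1.0 2.0 3.0")
assert lp.label == -0.43
assert lp.features == (1.0, 2.0, 3.0)
```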
Re: How do I access the SPARK SQL
It’s currently in the master branch, on https://github.com/apache/spark. You can check that out from git, build it with sbt/sbt assembly, and then try it out. We’re also going to post some release candidates soon that will be pre-built. Matei On Apr 23, 2014, at 1:30 PM, diplomatic Guru diplomaticg...@gmail.com wrote: Hello Team, I'm new to SPARK and just came across SPARK SQL, which appears to be interesting but not sure how I could get it. I know it's an Alpha version but not sure if its available for community yet. Many thanks. Raj.
Re: Deploying a python code on a spark EC2 cluster
Did you launch this using our EC2 scripts (http://spark.apache.org/docs/latest/ec2-scripts.html) or did you manually set up the daemons? My guess is that their hostnames are not being resolved properly on all nodes, so executor processes can’t connect back to your driver app. This error message indicates that: 14/04/24 09:00:49 WARN util.Utils: Your hostname, spark-node resolves to a loopback address: 127.0.0.1; using 10.74.149.251 instead (on interface eth0) 14/04/24 09:00:49 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address If you launch with your EC2 scripts, or don’t manually change the hostnames, this should not happen. Matei On Apr 24, 2014, at 11:36 AM, John King usedforprinting...@gmail.com wrote: Same problem. On Thu, Apr 24, 2014 at 10:54 AM, Shubhabrata mail2shu...@gmail.com wrote: Moreover it seems all the workers are registered and have sufficient memory (2.7GB where as I have asked for 512 MB). The UI also shows the jobs are running on the slaves. But on the termial it is still the same error Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory Please see the screenshot. Thanks http://apache-spark-user-list.1001560.n3.nabble.com/file/n4761/33.png -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Deploying-a-python-code-on-a-spark-EC2-cluster-tp4758p4761.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
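If the daemons were set up by hand, one workaround suggested by the warning itself is to pin the bind address explicitly. A hypothetical conf/spark-env.sh fragment (the address is just the example interface IP from the log above; substitute each node's own reachable address):

```shell
# Example only: bind Spark to the externally reachable interface so
# executors can connect back to the driver, instead of the loopback
# address the hostname resolves to.
export SPARK_LOCAL_IP=10.74.149.251
```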
Re: SparkPi performance-3 cluster standalone mode
The problem is that SparkPi uses Math.random(), which is a synchronized method, so it can’t scale to multiple cores. In fact it will be slower on multiple cores due to lock contention. Try another example and you’ll see better scaling. I think we’ll have to update SparkPi to create a new Random in each task to avoid this. Matei On Apr 24, 2014, at 4:43 AM, Adnan nsyaq...@gmail.com wrote: Hi, Relatively new on spark and have tried running SparkPi example on a standalone 12 core three machine cluster. What I'm failing to understand is, that running this example with a single slice gives better performance as compared to using 12 slices. Same was the case when I was using parallelize function. The time is scaling almost linearly with adding each slice. Please let me know if I'm doing anything wrong. The code snippet is given below: Regards, Ahsan Ijaz -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SparkPi-performance-3-cluster-standalone-mode-tp4530p4751.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
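The fix Matei suggests — give each task its own random number generator instead of a shared, synchronized one — can be sketched in plain Python, with threads standing in for tasks (the helper name and sample counts are made up):

```python
import random
from concurrent.futures import ThreadPoolExecutor

def count_inside(seed, n):
    """One 'task': sample n points with a per-task generator (no shared lock)."""
    rng = random.Random(seed)
    hits = 0
    for _ in range(n):
        x, y = rng.random() * 2 - 1, rng.random() * 2 - 1
        if x * x + y * y <= 1.0:
            hits += 1
    return hits

n_per_task, tasks = 100_000, 4
with ThreadPoolExecutor(max_workers=tasks) as ex:
    hits = sum(ex.map(count_inside, range(tasks), [n_per_task] * tasks))
pi_estimate = 4.0 * hits / (n_per_task * tasks)
assert 3.05 < pi_estimate < 3.25
```

In the Scala example the same idea is `new java.util.Random()` created inside each task's closure, so no task ever contends on a shared generator.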
Re: Finding bad data
Hey Jim, this is unfortunately harder than I’d like right now, but here’s how to do it. Look at the stderr file of the executor on that machine, and you’ll see lines like this: 14/04/24 19:17:24 INFO HadoopRDD: Input split: file:/Users/matei/workspace/apache-spark/README.md:0+2000 This says what file it was reading, as well as what byte offset (that’s the 0+2000 part). Unfortunately, because the executor is running multiple tasks at the same time, this message will be hard to associate with a particular task unless you only configure one core per executor. But it may help you spot the file. The other way you might do it is a map() on the data before you process it that checks for error conditions. In that one you could print out the original input line. I realize that neither of these is ideal. I’ve opened https://issues.apache.org/jira/browse/SPARK-1622 to try to expose this information somewhere else, ideally in the UI. The reason it wasn’t done so far is because some tasks in Spark can be reading from multiple Hadoop InputSplits (e.g. if you use coalesce(), or zip(), or similar), so it’s tough to do it in a super general setting. Matei On Apr 24, 2014, at 6:15 PM, Jim Blomo jim.bl...@gmail.com wrote: I'm using PySpark to load some data and getting an error while parsing it. Is it possible to find the source file and line of the bad data? I imagine that this would be extremely tricky when dealing with multiple derived RDDs, so an answer with the caveat of this only works when running .map() on a textFile() RDD is totally fine. Perhaps if the line number and file was available in pyspark I could catch the exception and output it with the context? Any way to narrow down the problem input would be great. Thanks!
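Matei's second suggestion — validate records in a map() and carry the offending input along with the error, instead of letting the parse blow up deep inside the job — can be sketched in plain Python (parse_record is a made-up stand-in for the real parsing logic; in Spark this would be rdd.map(safe_parse)):

```python
def parse_record(line):
    """Stand-in for the real parsing logic that sometimes fails."""
    return int(line)

def safe_parse(line):
    """Tag each record as ok/bad and keep the raw line on failure."""
    try:
        return ("ok", parse_record(line))
    except Exception as e:
        return ("bad", (line, repr(e)))   # raw input preserved for debugging

records = ["1", "2", "oops", "4"]
parsed = [safe_parse(l) for l in records]
bad = [payload for tag, payload in parsed if tag == "bad"]
assert len(bad) == 1 and bad[0][0] == "oops"
```

A filter on the "bad" tag then gives you exactly the problem inputs, which you can collect or write out for inspection.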
Re: parallelize for a large Seq is extreamly slow.
Try setting the serializer to org.apache.spark.serializer.KryoSerializer (see http://spark.apache.org/docs/0.9.1/tuning.html), it should be considerably faster. Matei On Apr 24, 2014, at 8:01 PM, Earthson Lu earthson...@gmail.com wrote: spark.parallelize(word_mapping.value.toSeq).saveAsTextFile("hdfs://ns1/nlp/word_mapping") this line is too slow. There are about 2 million elements in word_mapping. Is there a good style for writing a large collection to hdfs? import org.apache.spark._ import SparkContext._ import scala.io.Source object WFilter { def main(args: Array[String]) { val spark = new SparkContext("yarn-standalone", "word filter", System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass)) val stopset = Source.fromURL(this.getClass.getResource("stoplist.txt")).getLines.map(_.trim).toSet val file = spark.textFile("hdfs://ns1/nlp/wiki.splited") val tf_map = spark broadcast file.flatMap(_.split("\t")).map((_,1)).countByKey val df_map = spark broadcast file.map(x => Set(x.split("\t"):_*)).flatMap(_.map(_->1)).countByKey val word_mapping = spark broadcast Map(df_map.value.keys.zipWithIndex.toBuffer:_*) def w_filter(w:String) = if (tf_map.value(w) < 8 || df_map.value(w) < 4 || (stopset contains w)) false else true val mapped = file.map(_.split("\t").filter(w_filter).map(w => word_mapping.value(w)).mkString("\t")) spark.parallelize(word_mapping.value.toSeq).saveAsTextFile("hdfs://ns1/nlp/word_mapping") mapped.saveAsTextFile("hdfs://ns1/nlp/lda/wiki.docs") spark.stop() } } many thx:) -- ~ Perfection is achieved not when there is nothing more to add but when there is nothing left to take away
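In the Spark 0.9 era, one way to select the serializer was a JVM system property set before the SparkContext is created. A hedged sketch of a conf/spark-env.sh fragment (configuration mechanisms changed in later releases, so treat this as illustrative):

```shell
# Spark 0.9-era configuration sketch: select Kryo for serialization via a
# system property passed to the driver and executor JVMs.
export SPARK_JAVA_OPTS="-Dspark.serializer=org.apache.spark.serializer.KryoSerializer"
```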
Re: Spark on Yarn or Mesos?
From my point of view, both are supported equally. The YARN support is newer and that’s why there’s been a lot more action there in recent months. Matei On Apr 27, 2014, at 12:08 PM, Andrew Ash and...@andrewash.com wrote: That thread was mostly about benchmarking YARN vs standalone, and the results are what I'd expect -- spinning up a Spark cluster on demand through YARN has higher startup latency than using a standalone cluster, where the JVMs are already initialized and ready. Given that there's a lot more commit activity around YARN as compared to Mesos, does that mean that YARN integration is just earlier in the maturity curve, or does it mean that YARN is the future and Mesos is in maintenance-only mode? That may be more a question for the Databricks team though: will YARN and Mesos be supported equally, or will one become the preferred method of doing cluster management under Spark? Andrew On Thu, Apr 17, 2014 at 1:27 PM, Arpit Tak arpi...@sigmoidanalytics.com wrote: Hi Wel, Take a look at this post... http://apache-spark-user-list.1001560.n3.nabble.com/Job-initialization-performance-of-Spark-standalone-mode-vs-YARN-td2016.html Regards, Arpit Tak On Thu, Apr 17, 2014 at 3:42 PM, Wei Wang xwd0...@gmail.com wrote: Hi, there I would like to know is there any differences between Spark on Yarn and Spark on Mesos. Is there any comparision between them? What are the advantages and disadvantages for each of them. Is there any criterion for choosing between Yarn and Mesos? BTW, we need MPI in our framework, and I saw MPICH2 is included in Mesos. Should it be the reason for choosing Mesos? Thanks a lot! Weida
Re: Running a spark-submit compatible app in spark-shell
Hi Roger, You should be able to use the --jars argument of spark-shell to add JARs onto the classpath and then work with those classes in the shell. (A recent patch, https://github.com/apache/spark/pull/542, made spark-shell use the same command-line arguments as spark-submit.) But this is a great question, we should test it out and see whether anything else would make development easier. SBT also has an interactive shell where you can run classes in your project, but unfortunately Spark can’t deal with closures typed directly into that shell the right way. However, if you write your Spark logic in a method and just call that method from the SBT shell, that should work. Matei On Apr 27, 2014, at 3:14 PM, Roger Hoover roger.hoo...@gmail.com wrote: Hi, From the meetup talk about the 1.0 release, I saw that spark-submit will be the preferred way to launch apps going forward. How do you recommend launching such jobs in a development cycle? For example, how can I load an app that's expecting to be given to spark-submit into spark-shell? Also, can anyone recommend other tricks for rapid development? I'm new to Scala, sbt, etc. I think sbt can watch for changes in source files and compile them automatically. I want to be able to make code changes and quickly get into a spark-shell to play around with them. I appreciate any advice. Thanks, Roger
Re: Running out of memory Naive Bayes
Not sure if this is always ideal for Naive Bayes, but you could also hash the features into a lower-dimensional space (e.g. reduce it to 50,000 features). For each feature simply take MurmurHash3(featureID) % 50000 for example. Matei On Apr 27, 2014, at 11:24 PM, DB Tsai dbt...@stanford.edu wrote: Our customer asked us to implement Naive Bayes which should be able to at least train news20 one year ago, and we implemented for them in Hadoop using distributed cache to store the model. Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Sun, Apr 27, 2014 at 11:03 PM, Xiangrui Meng men...@gmail.com wrote: How big is your problem and how many labels? -Xiangrui On Sun, Apr 27, 2014 at 10:28 PM, DB Tsai dbt...@stanford.edu wrote: Hi Xiangrui, We also run into this issue at Alpine Data Labs. We ended up using LRU cache to store the counts, and splitting those least used counts to distributed cache in HDFS. Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Sun, Apr 27, 2014 at 7:34 PM, Xiangrui Meng men...@gmail.com wrote: Even the features are sparse, the conditional probabilities are stored in a dense matrix. With 200 labels and 2 million features, you need to store at least 4e8 doubles on the driver node. With multiple partitions, you may need more memory on the driver. Could you try reducing the number of partitions and giving driver more ram and see whether it can help? -Xiangrui On Sun, Apr 27, 2014 at 3:33 PM, John King usedforprinting...@gmail.com wrote: I'm already using the SparseVector class. ~200 labels On Sun, Apr 27, 2014 at 12:26 AM, Xiangrui Meng men...@gmail.com wrote: How many labels does your dataset have? -Xiangrui On Sat, Apr 26, 2014 at 6:03 PM, DB Tsai dbt...@stanford.edu wrote: Which version of mllib are you using? 
For Spark 1.0, mllib will support sparse feature vector which will improve performance a lot when computing the distance between points and centroid. Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Sat, Apr 26, 2014 at 5:49 AM, John King usedforprinting...@gmail.com wrote: I'm just wondering are the SparkVector calculations really taking into account the sparsity or just converting to dense? On Fri, Apr 25, 2014 at 10:06 PM, John King usedforprinting...@gmail.com wrote: I've been trying to use the Naive Bayes classifier. Each example in the dataset is about 2 million features, only about 20-50 of which are non-zero, so the vectors are very sparse. I keep running out of memory though, even for about 1000 examples on 30gb RAM while the entire dataset is 4 million examples. And I would also like to note that I'm using the sparse vector class.
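Matei's hashing suggestion above can be sketched in plain Python. Here md5 stands in for MurmurHash3, and the bucket count and feature names are made up; colliding features simply add their values together, which the model has to tolerate as noise:

```python
import hashlib

NUM_BUCKETS = 50_000

def bucket(feature_id):
    """Map an arbitrary feature id into one of NUM_BUCKETS slots."""
    h = int(hashlib.md5(feature_id.encode()).hexdigest(), 16)
    return h % NUM_BUCKETS

def hash_features(sparse):
    """Fold a {featureId: value} sparse vector into a bucketed sparse dict."""
    out = {}
    for fid, v in sparse.items():
        b = bucket(fid)
        out[b] = out.get(b, 0.0) + v   # collisions just add up
    return out

hashed = hash_features({"word:spark": 3.0, "word:rdd": 1.0})
assert all(0 <= b < NUM_BUCKETS for b in hashed)
assert sum(hashed.values()) == 4.0
```

The win is that the model's parameter arrays shrink from millions of features to a fixed bucket count, at the cost of a small, usually acceptable amount of collision noise.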
Re: K-means with large K
Try turning on the Kryo serializer as described at http://spark.apache.org/docs/latest/tuning.html. Also, are there any exceptions in the driver program’s log before this happens? Matei On Apr 28, 2014, at 9:19 AM, Buttler, David buttl...@llnl.gov wrote: Hi, I am trying to run the K-means code in mllib, and it works very nicely with small K (less than 1000). However, when I try for a larger K (I am looking for 2000-4000 clusters), it seems like the code gets part way through (perhaps just the initialization step) and freezes. The compute nodes stop doing any CPU / network / IO and nothing happens for hours. I had done something similar back in the days of Spark 0.6, and I didn’t have any trouble going up to 4000 clusters with similar data. This happens with both a standalone cluster, and in local multi-core mode (with the node given 200GB of heap), but eventually completes in local single-core mode. Data statistics: Rows: 166248 Columns: 108 This is a test run before trying it out on much larger data Any ideas on what might be the cause of this? Thanks, Dave
Re: processing s3n:// files in parallel
Actually wildcards work too, e.g. s3n://bucket/file1*, and I believe so do comma-separated lists (e.g. s3n://file1,s3n://file2). These are all inherited from FileInputFormat in Hadoop. Matei On Apr 28, 2014, at 6:05 PM, Andrew Ash and...@andrewash.com wrote: This is already possible with the sc.textFile(/path/to/filedir/*) call. Does that work for you? Sent from my mobile phone On Apr 29, 2014 2:46 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: It would be useful to have some way to open multiple files at once into a single RDD (e.g. sc.textFile(iterable_over_uris)). Logically, it would be equivalent to opening a single file which is made by concatenating the various files together. This would only be useful, of course, if the source files were all in the same format. Nick On Mon, Apr 28, 2014 at 7:40 PM, Andrew Ash and...@andrewash.com wrote: The way you've written it there, I would expect that to be serial runs. The reason is, you're looping over matches with a driver-level map, which is serialized. Then calling foreachWith on the RDDs executes the action in a blocking way, so you don't get a result back until the cluster finishes. You can have multiple jobs running at the same time though by sharing a SparkContext among threads. Rather than run all the foreachWith()s in serial on a single thread in the driver, try running each in its own thread. On Tue, Apr 29, 2014 at 1:35 AM, Art Peel found...@gmail.com wrote: I’m trying to process 3 S3 files in parallel, but they always get processed serially. Am I going about this the wrong way? Full details below. Regards, Art I’m trying to do the following on a Spark cluster with 3 slave nodes. Given a list of N URLS for files in S3 (with s3n:// urls), For each file: 1. search for some items 2. record the findings from step 1 in an external data store. I’d like to process the 3 URLs in parallel on 3 different slave nodes, but instead, they are getting processed serially on one node. 
I’ve tried various versions of my code to no avail. I’m also creating the SparkContext with "spark.scheduler.mode" set to "FAIR". Have I made a fundamental mistake? I’m running Spark 0.9.1 and my code looks roughly like this: def processFiles(sc: SparkContext) { val urls = List("s3n://foo/file.txt", "s3n://bar/file.txt", "s3n://baz/file.txt") val hadoopFiles = urls.map(url => { sc.hadoopFile(url, classOf[FooInputFormat], classOf[LongWritable], classOf[WritableFooRecord]) }) val matches = hadoopFiles.par.map((hadoopFile) => { findMatches(hadoopFile) }) matches.map((matchRDD) => { recordMatches(matchRDD) }) } def findMatches(hadoopFile: RDD): RDD = { hadoopFile.map( record => caseClassResultFromSearch(record) ) } def recordMatches(matchRDD: RDD) { matchRDD.foreachWith(_ => { makeRestClient(config) // I get 3 jobs referring to the line number of the next line, but the jobs run serially on one node. })((matchRecord, client) => { client.storeResult(matchRecord) }) }
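Andrew's suggestion — run each per-file job on its own driver thread so a shared, thread-safe SparkContext can schedule them concurrently — can be sketched in plain Python. process_file below is a made-up stand-in for the real findMatches + recordMatches pipeline:

```python
import threading

results = []
lock = threading.Lock()

def process_file(url):
    """One driver thread per file; in real Spark this thread would submit
    its own job on the shared SparkContext and block until it finishes."""
    matches = [url.upper()]          # stand-in for the actual Spark job
    with lock:
        results.extend(matches)      # stand-in for recordMatches

urls = ["s3n://foo/file.txt", "s3n://bar/file.txt", "s3n://baz/file.txt"]
threads = [threading.Thread(target=process_file, args=(u,)) for u in urls]
for t in threads:
    t.start()
for t in threads:
    t.join()
assert len(results) == 3
```

The key point is that the blocking action (foreachWith in the original code) runs on a separate thread per file, so the jobs are submitted concurrently instead of one after another.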
Re: Python Spark on YARN
This will be possible in 1.0 after this pull request: https://github.com/apache/spark/pull/30 Matei On Apr 29, 2014, at 9:51 AM, Guanhua Yan gh...@lanl.gov wrote: Hi all: Is it possible to develop Spark programs in Python and run them on YARN? From the Python SparkContext class, it doesn't seem to have such an option. Thank you, - Guanhua === Guanhua Yan, Ph.D. Information Sciences Group (CCS-3) Los Alamos National Laboratory Tel: +1-505-667-0176 Email: gh...@lanl.gov Web: http://ghyan.weebly.com/ ===
Re: performance improvement on second operation...without caching?
Hi Diana, Apart from these reasons, in a multi-stage job, Spark saves the map output files from map stages to the filesystem, so it only needs to rerun the last reduce stage. This is why you only saw one stage executing. These files are saved for fault recovery but they speed up subsequent runs. Matei On May 3, 2014, at 5:21 PM, Patrick Wendell pwend...@gmail.com wrote: Ethan, What you said is actually not true, Spark won't cache RDD's unless you ask it to. The observation here - that running the same job can speed up substantially even without caching - is common. This is because other components in the stack are performing caching and optimizations. Two that can make a huge difference are: 1. The OS buffer cache. Which will keep recently read disk blocks in memory. 2. The Java just-in-time compiler (JIT) which will use runtime profiling to significantly speed up execution. These can make a huge difference if you are running the same job over-and-over. And there are other things like the OS network stack increasing TCP windows and so forth. These will all improve response time as a spark program executes. On Fri, May 2, 2014 at 9:27 AM, Ethan Jewett esjew...@gmail.com wrote: I believe Spark caches RDDs it has memory for regardless of whether you actually call the 'cache' method on the RDD. The 'cache' method just tips off Spark that the RDD should have higher priority. At least, that is my experience and it seems to correspond with your experience and with my recollection of other discussions on this topic on the list. However, going back and looking at the programming guide, this is not the way the cache/persist behavior is described. Does the guide need to be updated? On Fri, May 2, 2014 at 9:04 AM, Diana Carroll dcarr...@cloudera.com wrote: I'm just Posty McPostalot this week, sorry folks! 
:-) Anyway, another question today: I have a bit of code that is pretty time consuming (pasted at the end of the message): It reads in a bunch of XML files, parses them, extracts some data in a map, counts (using reduce), and then sorts. All stages are executed when I do a final operation (take). The first stage is the most expensive: on first run it takes 30s to a minute. I'm not caching anything. When I re-execute that take at the end, I expected it to re-execute all the same stages, and take approximately the same amount of time, but it didn't. The second take executes only a single stage, which runs very fast: the whole operation takes less than 1 second (down from 5 minutes!) While this is awesome (!) I don't understand it. If I'm not caching data, why would I see such a marked performance improvement on subsequent execution? (or is this related to the known .9.1 bug about sortByKey executing an action when it shouldn't?) Thanks, Diana # load XML files containing device activation records. # Find the most common device models activated import xml.etree.ElementTree as ElementTree # Given a partition containing multi-line XML, parse the contents. 
# Return an iterator of activation Elements contained in the partition def getactivations(fileiterator): s = '' for i in fileiterator: s = s + str(i) filetree = ElementTree.fromstring(s) return filetree.getiterator('activation') # Get the model name from a device activation record def getmodel(activation): return activation.find('model').text filename = "hdfs://localhost/user/training/activations/*.xml" # parse each partition as a file into an activation XML record activations = sc.textFile(filename) activationTrees = activations.mapPartitions(lambda xml: getactivations(xml)) models = activationTrees.map(lambda activation: getmodel(activation)) # count and sort activations by model topmodels = models.map(lambda model: (model,1))\ .reduceByKey(lambda v1,v2: v1+v2)\ .map(lambda (model,count): (count,model))\ .sortByKey(ascending=False) # display the top 10 models for (count,model) in topmodels.take(10): print "Model %s (%s)" % (model,count) # repeat! for (count,model) in topmodels.take(10): print "Model %s (%s)" % (model,count)
Re: performance improvement on second operation...without caching?
Yes, this happens as long as you use the same RDD. For example say you do the following: data1 = sc.textFile(…).map(…).reduceByKey(…) data1.count() data1.filter(…).count() The first count() causes outputs of the map/reduce pair in there to be written out to shuffle files. Next time you do a count, on either this RDD or a child (e.g. after the filter), we notice that output files were already generated for this shuffle so we don’t rerun the map stage. Note that the output does get read again over the network, which is kind of wasteful (if you really wanted to reuse this as quickly as possible you’d use cache()). Matei On May 3, 2014, at 8:44 PM, Koert Kuipers ko...@tresata.com wrote: Hey Matei, Not sure i understand that. These are 2 separate jobs. So the second job takes advantage of the fact that there is map output left somewhere on disk from the first job, and re-uses that? On Sat, May 3, 2014 at 8:29 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Hi Diana, Apart from these reasons, in a multi-stage job, Spark saves the map output files from map stages to the filesystem, so it only needs to rerun the last reduce stage. This is why you only saw one stage executing. These files are saved for fault recovery but they speed up subsequent runs. Matei On May 3, 2014, at 5:21 PM, Patrick Wendell pwend...@gmail.com wrote: Ethan, What you said is actually not true, Spark won't cache RDD's unless you ask it to. The observation here - that running the same job can speed up substantially even without caching - is common. This is because other components in the stack are performing caching and optimizations. Two that can make a huge difference are: 1. The OS buffer cache. Which will keep recently read disk blocks in memory. 2. The Java just-in-time compiler (JIT) which will use runtime profiling to significantly speed up execution speed. These can make a huge difference if you are running the same job over-and-over. 
And there are other things like the OS network stack increasing TCP windows and so forth. These will all improve response time as a spark program executes. On Fri, May 2, 2014 at 9:27 AM, Ethan Jewett esjew...@gmail.com wrote: I believe Spark caches RDDs it has memory for regardless of whether you actually call the 'cache' method on the RDD. The 'cache' method just tips off Spark that the RDD should have higher priority. At least, that is my experience and it seems to correspond with your experience and with my recollection of other discussions on this topic on the list. However, going back and looking at the programming guide, this is not the way the cache/persist behavior is described. Does the guide need to be updated? On Fri, May 2, 2014 at 9:04 AM, Diana Carroll dcarr...@cloudera.com wrote: I'm just Posty McPostalot this week, sorry folks! :-) Anyway, another question today: I have a bit of code that is pretty time consuming (pasted at the end of the message): It reads in a bunch of XML files, parses them, extracts some data in a map, counts (using reduce), and then sorts. All stages are executed when I do a final operation (take). The first stage is the most expensive: on first run it takes 30s to a minute. I'm not caching anything. When I re-execute that take at the end, I expected it to re-execute all the same stages, and take approximately the same amount of time, but it didn't. The second take executes only a single stage, which runs very fast: the whole operation takes less than 1 second (down from 5 minutes!) While this is awesome (!) I don't understand it. If I'm not caching data, why would I see such a marked performance improvement on subsequent execution? (or is this related to the known .9.1 bug about sortByKey executing an action when it shouldn't?) Thanks, Diana # load XML files containing device activation records. 
# Find the most common device models activated import xml.etree.ElementTree as ElementTree # Given a partition containing multi-line XML, parse the contents. # Return an iterator of activation Elements contained in the partition def getactivations(fileiterator): s = '' for i in fileiterator: s = s + str(i) filetree = ElementTree.fromstring(s) return filetree.getiterator('activation') # Get the model name from a device activation record def getmodel(activation): return activation.find('model').text filename=hdfs://localhost/user/training/activations/*.xml # parse each partition as a file into an activation XML record activations = sc.textFile(filename) activationTrees = activations.mapPartitions(lambda xml: getactivations(xml)) models = activationTrees.map(lambda activation: getmodel(activation)) # count and sort activations by model topmodels = models.map(lambda model: (model,1))\ .reduceByKey(lambda v1,v2: v1+v2)\ .map
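The shuffle-file reuse Matei describes can be modeled in a few lines of plain Python: a "map stage" persists its output keyed by a stage id, and a rerun skips the work when that output already exists. Real Spark tracks this per shuffle on the executors, not per file like this sketch, and all names here are made up:

```python
import json
import os
import tempfile

RUNS = {"count": 0}

def map_stage(data, stage_dir, stage_id):
    """Run the 'map stage' once; later calls reuse the persisted output."""
    path = os.path.join(stage_dir, "shuffle_%s.json" % stage_id)
    if os.path.exists(path):          # output already on disk: skip the rerun
        with open(path) as f:
            return json.load(f)
    RUNS["count"] += 1                # the expensive work happens only once
    out = [x * x for x in data]
    with open(path, "w") as f:
        json.dump(out, f)
    return out

with tempfile.TemporaryDirectory() as d:
    first = map_stage([1, 2, 3], d, "0")
    second = map_stage([1, 2, 3], d, "0")

assert first == second == [1, 4, 9]
assert RUNS["count"] == 1
```

As Matei notes, the saved output still has to be read back (over the network in a cluster), which is why an explicit cache() is faster when you know you will reuse an RDD.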
Re: Spark GCE Script
Very cool! Have you thought about sending this as a pull request? We’d be happy to maintain it inside Spark, though it might be interesting to find a single Python package that can manage clusters across both EC2 and GCE. Matei On May 5, 2014, at 7:18 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Hi Sparkers, We have created a quick spark_gce script which can launch a spark cluster in the Google Cloud. I'm sharing it because it might be helpful for someone using the Google Cloud for deployment rather than AWS. Here's the link to the script https://github.com/sigmoidanalytics/spark_gce Feel free to use it and suggest any feedback around it. In short here's what it does: Just like the spark_ec2 script, this one also reads certain command-line arguments (See the github page for more details) like the cluster name and all, then starts the machines in the google cloud, sets up the network, adds a 500GB empty disk to all machines, generate the ssh keys on master and transfer it to all slaves and install java and downloads and configures Spark/Shark/Hadoop. Also it starts the shark server automatically. Currently the version is 0.9.1 but I'm happy to add/support more versions if anyone is interested. Cheers. Thanks Best Regards
Re: Increase Stack Size Workers
Add export SPARK_JAVA_OPTS="-Xss16m" to conf/spark-env.sh. Then it should apply to the executor. Matei On May 5, 2014, at 2:20 PM, Andrea Esposito and1...@gmail.com wrote: Hi there, i'm doing an iterative algorithm and sometimes i ended up with StackOverflowError, doesn't matter if i do checkpoints or not. Still, i don't understand why this is happening; i figured out that increasing the stack size is a workaround. Developing using local[n] so the local mode i can set the stack size through the -Xss parameter. How can i do the same for the standalone mode for each worker? Setting it as java -Xss16m Worker seems useless because the actual computations are done on CoarseGrainExecutor.. Best, EA
Re: Spark and Java 8
Java 8 support is a feature in Spark, but vendors need to decide for themselves when they’d like to support Java 8 commercially. You can still run Spark on Java 7 or 6 without taking advantage of the new features (indeed our builds are always against Java 6). Matei On May 6, 2014, at 8:59 AM, Ian O'Connell i...@ianoconnell.com wrote: I think the distinction there might be that they never said they ran that code under CDH5, just that Spark supports it and Spark runs under CDH5 — not that you can use these features while running under CDH5. They could use Mesos or the standalone scheduler to run them. On Tue, May 6, 2014 at 6:16 AM, Kristoffer Sjögren sto...@gmail.com wrote: Hi I just read an article [1] about Spark, CDH5 and Java 8 but did not get exactly how Spark can run Java 8 on a YARN cluster at runtime. Is Spark using a separate JVM that runs on data nodes, or is it reusing the YARN JVM runtime somehow, like hadoop1? CDH5 only supports Java 7 [2] as far as I know? Cheers, -Kristoffer [1] http://blog.cloudera.com/blog/2014/04/making-apache-spark-easier-to-use-in-java-with-java-8/ [2] http://www.cloudera.com/content/cloudera-content/cloudera-docs/CDH5/latest/CDH5-Requirements-and-Supported-Versions/CDH5-Requirements-and-Supported-Versions.html
Re: Spark to utilize HDFS's mmap caching
Yes, Spark goes through the standard HDFS client and will automatically benefit from this. Matei On May 8, 2014, at 4:43 AM, Chanwit Kaewkasi chan...@gmail.com wrote: Hi all, Can Spark (0.9.x) utilize the caching feature in HDFS 2.3 via sc.textFile() and other HDFS-related APIs? http://hadoop.apache.org/docs/r2.3.0/hadoop-project-dist/hadoop-hdfs/CentralizedCacheManagement.html Best regards, -chanwit -- Chanwit Kaewkasi linkedin.com/in/chanwit
Re: Is there a way to create a SparkContext object?
You can just pass it around as a parameter. On May 12, 2014, at 12:37 PM, yh18190 yh18...@gmail.com wrote: Hi, Could anyone suggest how we can create a SparkContext object in other classes or functions where we need to convert a Scala collection to an RDD using the sc object (like sc.makeRDD(list)), instead of using the Main class's SparkContext object? Is there a way to pass the sc object as a parameter to functions in other classes? Please let me know -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Is-their-a-way-to-Create-SparkContext-object-tp5612.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
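The advice above — pass the context to whatever code needs it, rather than trying to construct or look up a global — can be sketched as follows. `FakeContext` is a hypothetical stand-in for SparkContext so the pattern is runnable without a cluster; with real Spark you would pass the actual sc object the same way.

```python
# Sketch of the "pass the context as a parameter" pattern from the reply above.
# FakeContext is a stand-in for SparkContext (hypothetical, for illustration).
class FakeContext:
    def makeRDD(self, data):
        # Real SparkContext.makeRDD returns an RDD; a plain list stands in here.
        return list(data)

def to_distributed(ctx, items):
    # Any helper that needs the context simply takes it as an argument,
    # instead of reaching for the Main class's SparkContext.
    return ctx.makeRDD(items)

sc = FakeContext()
print(to_distributed(sc, [1, 2, 3]))  # → [1, 2, 3]
```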
Re: pySpark memory usage
Hey Jim, unfortunately external spilling is not implemented in Python right now. While it would be possible to update combineByKey to do smarter stuff here, one simple workaround you can try is to launch more map tasks (or more reduce tasks). To set the minimum number of map tasks, you can pass it as a second argument to textFile and such (e.g. sc.textFile("s3n://foo.txt", 1000)). Matei On May 12, 2014, at 5:47 PM, Jim Blomo jim.bl...@gmail.com wrote: Thanks, Aaron, this looks like a good solution! Will be trying it out shortly. I noticed that the S3 exception seems to occur more frequently when the box is swapping. Why is the box swapping? combineByKey seems to make the assumption that it can fit an entire partition in memory when doing the combineLocally step. I'm going to try to break this apart but will need some sort of heuristic; options include looking at memory usage via the resource module and trying to keep below 'spark.executor.memory', or using batchSize to limit the number of entries in the dictionary. Let me know if you have any opinions. On Sun, May 4, 2014 at 8:02 PM, Aaron Davidson ilike...@gmail.com wrote: I'd just like to update this thread by pointing to the PR based on our initial design: https://github.com/apache/spark/pull/640 This solution is a little more general and avoids catching IOException altogether. Long live exception propagation! On Mon, Apr 28, 2014 at 1:28 PM, Patrick Wendell pwend...@gmail.com wrote: Hey Jim, This IOException thing is a general issue that we need to fix and your observation is spot-on. There is actually a JIRA for it here I created a few days ago: https://issues.apache.org/jira/browse/SPARK-1579 Aaron is assigned on that one but not actively working on it, so we'd welcome a PR from you on this if you are interested. The first thought we had was to set a volatile flag when the reader sees an exception (indicating there was a failure in the task) and avoid swallowing the IOException in the writer if this happens. 
But I think there is a race here where the writer sees the error first before the reader knows what is going on. Anyways maybe if you have a simpler solution you could sketch it out in the JIRA and we could talk over there. The current proposal in the JIRA is somewhat complicated... - Patrick On Mon, Apr 28, 2014 at 1:01 PM, Jim Blomo jim.bl...@gmail.com wrote: FYI, it looks like this stdin writer to Python finished early error was caused by a break in the connection to S3, from which the data was being pulled. A recent commit to PythonRDD noted that the current exception catching can potentially mask an exception for the data source, and that is indeed what I see happening. The underlying libraries (jets3t and httpclient) do have retry capabilities, but I don't see a great way of setting them through Spark code. Instead I added the patch below which kills the worker on the exception. This allows me to completely load the data source after a few worker retries. Unfortunately, java.net.SocketException is the same error that is sometimes expected from the client when using methods like take(). One approach around this conflation is to create a new locally scoped exception class, eg. WriterException, catch java.net.SocketException during output writing, then re-throw the new exception. The worker thread could then distinguish between the reasons java.net.SocketException might be thrown. Perhaps there is a more elegant way to do this in Scala, though? Let me know if I should open a ticket or discuss this on the developers list instead. 
Best, Jim

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 0d71fdb..f31158c 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -95,6 +95,12 @@ private[spark] class PythonRDD[T: ClassTag](
         readerException = e
         Try(worker.shutdownOutput()) // kill Python worker process
+      case e: java.net.SocketException =>
+        // This can happen if a connection to the datasource, eg S3, resets
+        // or is otherwise broken
+        readerException = e
+        Try(worker.shutdownOutput()) // kill Python worker process
       case e: IOException =>
         // This can happen for legitimate reasons if the Python code stops returning data
         // before we are done passing elements through, e.g., for take(). Just log a message to

On Wed, Apr 9, 2014 at 7:04 PM, Jim Blomo jim.bl...@gmail.com wrote: This dataset is uncompressed text at ~54GB. stats() returns (count: 56757667, mean: 1001.68740583, stdev: 601.775217822, max: 8965, min: 343) On Wed, Apr 9, 2014 at 6:59 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Okay, thanks
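The workaround in the reply above — more, smaller partitions — helps because the local combine step builds an in-memory dict per partition. A rough pure-Python model of that combineLocally step (not PySpark's actual code; just an illustration of why peak dict size shrinks with more partitions):

```python
# Rough model of a per-partition "combineLocally" step, as discussed above.
# Not PySpark's real implementation; it shows why the peak dict size — and
# hence memory — shrinks when the same data is split across more partitions.
def combine_locally(partition):
    combined = {}
    for key, value in partition:
        combined[key] = combined.get(key, 0) + value
    return combined

def split_into_partitions(pairs, n):
    # Mimic hash partitioning: records with the same key land together.
    parts = [[] for _ in range(n)]
    for key, value in pairs:
        parts[hash(key) % n].append((key, value))
    return parts

pairs = [(k, 1) for k in range(1000)]
peak_2 = max(len(combine_locally(p)) for p in split_into_partitions(pairs, 2))
peak_50 = max(len(combine_locally(p)) for p in split_into_partitions(pairs, 50))
print(peak_2, peak_50)  # the per-partition dict is far smaller with 50 partitions
```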
Re: pySpark memory usage
Cool, that’s good to hear. We’d also like to add spilling in Python itself, or at least make it exit with a good message if it can’t do it. Matei On May 14, 2014, at 10:47 AM, Jim Blomo jim.bl...@gmail.com wrote: That worked amazingly well, thank you Matei! Numbers that worked for me were 400 for the textFile()s, 1500 for the join()s. On Mon, May 12, 2014 at 7:58 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Hey Jim, unfortunately external spilling is not implemented in Python right now. While it would be possible to update combineByKey to do smarter stuff here, one simple workaround you can try is to launch more map tasks (or more reduce tasks). To set the minimum number of map tasks, you can pass it as a second argument to textFile and such (e.g. sc.textFile("s3n://foo.txt", 1000)). Matei
Re: persist @ disk-only failing
This is the patch for it: https://github.com/apache/spark/pull/50/. It might be possible to backport it to 0.8. Matei On May 19, 2014, at 2:04 AM, Sai Prasanna ansaiprasa...@gmail.com wrote: Matei, I am using 0.8.1 !! But is there a way without moving to 0.9.1 to bypass cache ? On Mon, May 19, 2014 at 1:31 PM, Matei Zaharia matei.zaha...@gmail.com wrote: What version is this with? We used to build each partition first before writing it out, but this was fixed a while back (0.9.1, but it may also be in 0.9.0). Matei On May 19, 2014, at 12:41 AM, Sai Prasanna ansaiprasa...@gmail.com wrote: Hi all, When i gave the persist level as DISK_ONLY, still Spark tries to use memory and caches. Any reason ? Do i need to override some parameter elsewhere ? Thanks !
Re: How to compile the examples directory?
If you’d like to work on just this code for your own changes, it might be best to copy it to a separate project. Look at http://spark.apache.org/docs/latest/quick-start.html for how to set up a standalone job. Matei On May 19, 2014, at 4:53 AM, Hao Wang wh.s...@gmail.com wrote: Hi, I am running some examples of Spark on a cluster. Because I need to modify some source code, I have to re-compile the whole of Spark using `sbt/sbt assembly`, which takes a long time. I have tried `mvn package` under the examples directory, but it failed because of a dependency problem. Is there any way to avoid compiling the whole Spark project? Regards, Wang Hao(王灏) CloudTeam | School of Software Engineering Shanghai Jiao Tong University Address:800 Dongchuan Road, Minhang District, Shanghai, 200240 Email:wh.s...@gmail.com
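The quick-start guide linked above walks through a minimal sbt project for a standalone job. A sketch of what such a build file looked like in this era (the version numbers and resolver are illustrative — check the guide for the ones matching your cluster):

```scala
// build.sbt — minimal standalone Spark job, per the quick-start guide.
// Versions shown are illustrative for the 0.9.x era; match your cluster's.
name := "simple-project"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "0.9.1"

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
```

With this in place, `sbt package` builds just your job against the published Spark artifacts instead of re-running `sbt/sbt assembly` on the whole Spark tree.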
Re: advice on maintaining a production spark cluster?
Which version is this with? I haven’t seen standalone masters lose workers. Is there other stuff on the machines that’s killing them, or what errors do you see? Matei On May 16, 2014, at 9:53 AM, Josh Marcus jmar...@meetup.com wrote: Hey folks, I'm wondering what strategies other folks are using for maintaining and monitoring the stability of stand-alone spark clusters. Our master very regularly loses workers, and they (as expected) never rejoin the cluster. This is the same behavior I've seen using akka cluster (if that's what spark is using in stand-alone mode) -- are there configuration options we could be setting to make the cluster more robust? We have a custom script which monitors the number of workers (through the web interface) and restarts the cluster when necessary, as well as resolving other issues we face (like spark shells left open permanently claiming resources), and it works, but it's nowhere close to a great solution. What are other folks doing? Is this something that other folks observe as well? I suspect that the loss of workers is tied to jobs that run out of memory on the client side or our use of very large broadcast variables, but I don't have an isolated test case. I'm open to general answers here: for example, perhaps we should simply be using mesos or yarn instead of stand-alone mode. --j
Re: life of an executor
They’re tied to the SparkContext (application) that launched them. Matei On May 19, 2014, at 8:44 PM, Koert Kuipers ko...@tresata.com wrote: from looking at the source code i see executors run in their own jvm subprocesses. how long do they live for? as long as the worker/slave? or are they tied to the sparkcontext and live/die with it? thx
Re: advice on maintaining a production spark cluster?
Are you guys both using Cloudera Manager? Maybe there’s also an issue with the integration with that. Matei On May 20, 2014, at 11:44 AM, Aaron Davidson ilike...@gmail.com wrote: I'd just like to point out that, along with Matei, I have not seen workers drop even under the most exotic job failures. We're running pretty close to master, though; perhaps it is related to an uncaught exception in the Worker from a prior version of Spark. On Tue, May 20, 2014 at 11:36 AM, Arun Ahuja aahuj...@gmail.com wrote: Hi Matei, Unfortunately, I don't have more detailed information, but we have seen the loss of workers in standalone mode as well. If a job is killed through CTRL-C we will often see in the Spark Master page the number of workers and cores decrease. They are still alive and well in the Cloudera Manager page, but not visible on the Spark master, simply restarting the workers usually resolves this, but we often seen workers disappear after a failed or killed job. If we see this occur again, I'll try and provide some logs. On Mon, May 19, 2014 at 10:51 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Which version is this with? I haven’t seen standalone masters lose workers. Is there other stuff on the machines that’s killing them, or what errors do you see? Matei On May 16, 2014, at 9:53 AM, Josh Marcus jmar...@meetup.com wrote: Hey folks, I'm wondering what strategies other folks are using for maintaining and monitoring the stability of stand-alone spark clusters. Our master very regularly loses workers, and they (as expected) never rejoin the cluster. This is the same behavior I've seen using akka cluster (if that's what spark is using in stand-alone mode) -- are there configuration options we could be setting to make the cluster more robust? 
We have a custom script which monitors the number of workers (through the web interface) and restarts the cluster when necessary, as well as resolving other issues we face (like spark shells left open permanently claiming resources), and it works, but it's nowhere close to a great solution. What are other folks doing? Is this something that other folks observe as well? I suspect that the loss of workers is tied to jobs that run out of memory on the client side or our use of very large broadcast variables, but I don't have an isolated test case. I'm open to general answers here: for example, perhaps we should simply be using mesos or yarn instead of stand-alone mode. --j
Re: Python, Spark and HBase
Unfortunately this is not yet possible. There’s a patch in progress posted here though: https://github.com/apache/spark/pull/455 — it would be great to get your feedback on it. Matei On May 20, 2014, at 4:21 PM, twizansk twiza...@gmail.com wrote: Hello, This seems like a basic question but I have been unable to find an answer in the archives or other online sources. I would like to know if there is any way to load a RDD from HBase in python. In Java/Scala I can do this by initializing a NewAPIHadoopRDD with a TableInputFormat class. Is there any equivalent in python? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Python-Spark-and-HBase-tp6142.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Python, Spark and HBase
It sounds like you made a typo in the code — perhaps you’re trying to call self._jvm.PythonRDDnewAPIHadoopFile instead of self._jvm.PythonRDD.newAPIHadoopFile? There should be a dot before the new. Matei On May 28, 2014, at 5:25 PM, twizansk twiza...@gmail.com wrote: Hi Nick, I finally got around to downloading and building the patch. I pulled the code from https://github.com/MLnick/spark-1/tree/pyspark-inputformats I am running on a CDH5 node. While the code in the CDH branch is different from spark master, I do believe that I have resolved any inconsistencies. When attempting to connect to an HBase table using SparkContext.newAPIHadoopFile I receive the following error: Py4JError: org.apache.spark.api.python.PythonRDDnewAPIHadoopFile does not exist in the JVM I have searched the pyspark-inputformats branch and cannot find any reference to the class org.apache.spark.api.python.PythonRDDnewAPIHadoopFile Any ideas? Also, do you have a working example of HBase access with the new code? Thanks Tommer -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Python-Spark-and-HBase-tp6142p6502.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Checking spark cache percentage programatically. And how to clear cache.
You can remove cached RDDs by calling unpersist() on them. You can also use SparkContext.getRDDStorageInfo to get info on cache usage, though this is a developer API so it may change in future versions. We will add a standard API eventually but this is just very closely tied to framework internals. Matei On May 28, 2014, at 5:32 PM, Sung Hwan Chung coded...@cs.stanford.edu wrote: Hi, Is there a programmatic way of checking whether RDD has been 100% cached or not? I'd like to do this to have two different path ways. Additionally, how do you clear cache (e.g. if you want to cache different RDDs, and you'd like to clear an existing cached RDD). Thanks!
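The reply above points at getRDDStorageInfo for cache-usage info. A pure-Python sketch of turning such per-RDD storage records into a "fully cached?" check — the record shape `(rdd_id, cached_partitions, total_partitions)` is a hypothetical stand-in for illustration, not the exact Spark API:

```python
# Sketch: choosing between two code paths based on cache completeness.
# The (rdd_id, cached_partitions, total_partitions) record shape is a
# hypothetical stand-in for what SparkContext.getRDDStorageInfo exposes.
def cached_fraction(storage_info, rdd_id):
    for rid, cached_parts, total_parts in storage_info:
        if rid == rdd_id:
            return cached_parts / float(total_parts)
    return 0.0  # RDD absent from storage info: nothing cached

info = [(3, 10, 10), (7, 4, 10)]
print(cached_fraction(info, 3))  # → 1.0 (fully cached)
print(cached_fraction(info, 7))  # → 0.4 (partially cached)
```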
Re: Spark hook to create external process
Hi Anand, This is probably already handled by the RDD.pipe() operation. It will spawn a process and let you feed data to it through its stdin and read data through stdout. Matei On May 29, 2014, at 9:39 AM, ansriniv ansri...@gmail.com wrote: I have a requirement where for every Spark executor threadpool thread, I need to launch an associated external process. My job will consist of some processing in the Spark executor thread and some processing by its associated external process with the 2 communicating via some IPC mechanism. Is there a hook in Spark where I can put in my code to create / destroy these external processes corresponding to the creation / destruction of executor thread pool threads. Thanks Anand -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-hook-to-create-external-process-tp6526.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
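RDD.pipe() feeds each partition's elements to an external process's stdin, one per line, and reads its stdout back as strings. The mechanism can be illustrated with plain subprocess — this is a conceptual sketch of the idea, not Spark's implementation; it uses the current Python interpreter as the external command so the example is self-contained:

```python
# Conceptual sketch of what RDD.pipe() does per partition: write elements
# to the child's stdin, one per line, and collect its stdout lines.
import subprocess
import sys

def pipe_partition(elements, command):
    data = "\n".join(elements) + "\n"
    result = subprocess.run(command, input=data, capture_output=True, text=True)
    return result.stdout.splitlines()

# A toy external "program" that upper-cases each input line.
cmd = [sys.executable, "-c",
       "import sys\nfor line in sys.stdin: print(line.strip().upper())"]
print(pipe_partition(["spark", "pipe"], cmd))  # → ['SPARK', 'PIPE']
```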
Re: Driver OOM while using reduceByKey
That hash map is just a list of where each task ran; it’s not the actual data. How many map and reduce tasks do you have? Maybe you need to give the driver a bit more memory, or use fewer tasks (e.g. do reduceByKey(_ + _, 100) to use only 100 tasks). Matei On May 29, 2014, at 2:03 AM, haitao .yao yao.e...@gmail.com wrote: Hi, I used 1g memory for the driver java process and got OOM error on driver side before reduceByKey. After analyzing the heap dump, the biggest object is org.apache.spark.MapStatus, which occupied over 900MB memory. Here's my question: 1. Is there any optimization switch that I can tune to avoid this? I have used compression on output with spark.io.compression.codec. 2. Why do the workers send all the data back to the driver to run reduceByKey? With the current implementation, if I use reduceByKey on TBs of data, that will be a disaster for the driver. Maybe I'm wrong about this assumption of the spark implementation. And here's my code snippet:
```
val cntNew = spark.accumulator(0)
val cntOld = spark.accumulator(0)
val cntErr = spark.accumulator(0)
val sequenceFileUrl = args(0)
val seq = spark.sequenceFile[Text, BytesWritable](sequenceFileUrl)
val stat = seq.map(pair => convertData(pair._2, cntNew, cntOld, cntErr)).reduceByKey(_ + _)
stat.saveAsSequenceFile(args(1))
```
Thanks. -- haitao.yao@China
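To make the reply concrete: the reduce itself happens on the workers — only small MapStatus records (one per map task, telling each reducer where to fetch its data) come back to the driver, which is why the map-times-reduce task count matters. A pure-Python model of the shuffle behind reduceByKey (not Spark's code) showing how the reduce-task count enters:

```python
# Pure-Python model of the reduceByKey shuffle discussed above (not Spark's
# implementation): map outputs are hash-partitioned into R reduce "tasks"
# and merged per key inside each one — never gathered on the driver.
def reduce_by_key(pairs, reduce_fn, num_reduce_tasks):
    buckets = [{} for _ in range(num_reduce_tasks)]
    for key, value in pairs:
        bucket = buckets[hash(key) % num_reduce_tasks]
        bucket[key] = reduce_fn(bucket[key], value) if key in bucket else value
    # Each bucket corresponds to one reduce task's output partition.
    return buckets

counts = reduce_by_key([("a", 1), ("b", 1), ("a", 1)], lambda x, y: x + y, 100)
merged = {k: v for bucket in counts for k, v in bucket.items()}
print(merged)  # → {'a': 2, 'b': 1}
```

Fewer reduce tasks (the second argument to reduceByKey) means fewer MapStatus entries per map task on the driver — the trade-off Matei suggests above.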
Re: Why Scala?
Quite a few people ask this question and the answer is pretty simple. When we started Spark, we had two goals — we wanted to work with the Hadoop ecosystem, which is JVM-based, and we wanted a concise programming interface similar to Microsoft’s DryadLINQ (the first language-integrated big data framework I know of, that begat things like FlumeJava and Crunch). On the JVM, the only language that would offer that kind of API was Scala, due to its ability to capture functions and ship them across the network. Scala’s static typing also made it much easier to control performance compared to, say, Jython or Groovy. In terms of usage, however, we see substantial usage of our other languages (Java and Python), and we’re continuing to invest in both. In a user survey we did last fall, about 25% of users used Java and 30% used Python, and I imagine these numbers are growing. With lambda expressions now added to Java 8 (http://databricks.com/blog/2014/04/14/Spark-with-Java-8.html), I think we’ll see a lot more Java. And at Databricks I’ve seen a lot of interest in Python, which is very exciting to us in terms of ease of use. Matei On May 29, 2014, at 1:57 PM, Benjamin Black b...@b3k.us wrote: HN is a cesspool safely ignored. On Thu, May 29, 2014 at 1:55 PM, Nick Chammas nicholas.cham...@gmail.com wrote: I recently discovered Hacker News and started reading through older posts about Scala. It looks like the language is fairly controversial on there, and it got me thinking. Scala appears to be the preferred language to work with in Spark, and Spark itself is written in Scala, right? I know that often times a successful project evolves gradually out of something small, and that the choice of programming language may not always have been made consciously at the outset. But pretending that it was, why is Scala the preferred language of Spark? Nick View this message in context: Why Scala? Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Shuffle file consolidation
It can be set in an individual application. Consolidation had some issues on ext3 as mentioned there, though we might enable it by default in the future because other optimizations now made it perform on par with the non-consolidation version. It also had some bugs in 0.9.0, so I’d suggest at least 0.9.1. Matei On May 29, 2014, at 2:21 PM, Nathan Kronenfeld nkronenf...@oculusinfo.com wrote: Thanks, I missed that. One thing that's still unclear to me, even looking at that, is - does this parameter have to be set when starting up the cluster, on each of the workers, or can it be set by an individual client job? On Fri, May 23, 2014 at 10:13 AM, Han JU ju.han.fe...@gmail.com wrote: Hi Nathan, There's some explanation in the spark configuration section:
```
If set to true, consolidates intermediate files created during a shuffle. Creating fewer files can improve filesystem performance for shuffles with large numbers of reduce tasks. It is recommended to set this to true when using ext4 or xfs filesystems. On ext3, this option might degrade performance on machines with many (>8) cores due to filesystem limitations.
```
2014-05-23 16:00 GMT+02:00 Nathan Kronenfeld nkronenf...@oculusinfo.com: In trying to sort some largish datasets, we came across the spark.shuffle.consolidateFiles property, and I found in the source code that it is set, by default, to false, with a note to default it to true when the feature is stable. Does anyone know what is unstable about this? If we set it to true, what problems should we anticipate? Thanks, -Nathan Kronenfeld -- Nathan Kronenfeld Senior Visualization Developer Oculus Info Inc 2 Berkeley Street, Suite 600, Toronto, Ontario M5A 4J5 Phone: +1-416-203-3003 x 238 Email: nkronenf...@oculusinfo.com -- JU Han Data Engineer @ Botify.com +33 061960 -- Nathan Kronenfeld Senior Visualization Developer Oculus Info Inc 2 Berkeley Street, Suite 600, Toronto, Ontario M5A 4J5 Phone: +1-416-203-3003 x 238 Email: nkronenf...@oculusinfo.com
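Since the property can be set per application (as the reply says), a configuration sketch of doing so from the application's SparkConf — the property name matches the configuration docs quoted above; this needs a Spark installation to actually run:

```python
# Configuration sketch: enable shuffle file consolidation for one application.
# Requires pyspark on the driver; shown here mainly for the property name.
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("consolidation-demo")
        .set("spark.shuffle.consolidateFiles", "true"))
sc = SparkContext(conf=conf)
```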
Re: Trouble with EC2
What instance types did you launch on? Sometimes you also get a bad individual machine from EC2. It might help to remove the node it’s complaining about from the conf/slaves file. Matei On May 30, 2014, at 11:18 AM, PJ$ p...@chickenandwaffl.es wrote: Hey Folks, I'm really having quite a bit of trouble getting Spark running on EC2. I'm not using the scripts at https://github.com/apache/spark/tree/master/ec2 because I'd like to know how everything works. But I'm going a little crazy. I think that something about the networking configuration must be messed up, but I'm at a loss. Shortly after starting the cluster, I get a lot of this:
14/05/30 18:03:22 INFO master.Master: Registering worker ip-10-100-184-45.ec2.internal:7078 with 2 cores, 6.3 GB RAM
14/05/30 18:03:22 INFO master.Master: Registering worker ip-10-100-184-45.ec2.internal:7078 with 2 cores, 6.3 GB RAM
14/05/30 18:03:23 INFO master.Master: Registering worker ip-10-100-184-45.ec2.internal:7078 with 2 cores, 6.3 GB RAM
14/05/30 18:03:23 INFO master.Master: Registering worker ip-10-100-184-45.ec2.internal:7078 with 2 cores, 6.3 GB RAM
14/05/30 18:05:54 INFO master.Master: akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485 got disassociated, removing it.
14/05/30 18:05:54 INFO actor.LocalActorRef: Message [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from Actor[akka://sparkMaster/deadLetters] to Actor[akka://sparkMaster/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkMaster%4010.100.75.70%3A36725-25#847210246] was not delivered. [5] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
14/05/30 18:05:54 INFO master.Master: akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485 got disassociated, removing it.
14/05/30 18:05:54 INFO master.Master: akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485 got disassociated, removing it.
14/05/30 18:05:54 ERROR remote.EndpointWriter: AssociationError [akka.tcp://sparkMaster@ip-10-100-184-45.ec2.internal:7077] -> [akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485]: Error [Association failed with [akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: ip-10-100-75-70.ec2.internal/10.100.75.70:38485 ]
14/05/30 18:05:54 ERROR remote.EndpointWriter: AssociationError [akka.tcp://sparkMaster@ip-10-100-184-45.ec2.internal:7077] -> [akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485]: Error [Association failed with [akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: ip-10-100-75-70.ec2.internal/10.100.75.70:38485 ]
14/05/30 18:05:54 INFO master.Master: akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485 got disassociated, removing it.
14/05/30 18:05:54 INFO master.Master: akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485 got disassociated, removing it.
14/05/30 18:05:54 ERROR remote.EndpointWriter: AssociationError [akka.tcp://sparkMaster@ip-10-100-184-45.ec2.internal:7077] -> [akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485]: Error [Association failed with [akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: ip-10-100-75-70.ec2.internal/10.100.75.70:38485
Re: Yay for 1.0.0! EC2 Still has problems.
More specifically with the -a flag, you *can* set your own AMI, but you’ll need to base it off ours. This is because spark-ec2 assumes that some packages (e.g. java, Python 2.6) are already available on the AMI. Matei On Jun 1, 2014, at 11:01 AM, Patrick Wendell pwend...@gmail.com wrote: Hey just to clarify this - my understanding is that the poster (Jeremy) was using a custom AMI to *launch* spark-ec2. I normally launch spark-ec2 from my laptop. And he was looking for an AMI that had a high enough version of python. Spark-ec2 itself has a flag -a that allows you to give a specific AMI. This flag is just an internal tool that we use for testing when we spin new AMI's. Users can't set that to an arbitrary AMI because we tightly control things like the Java and OS versions, libraries, etc. On Sun, Jun 1, 2014 at 12:51 AM, Jeremy Lee unorthodox.engine...@gmail.com wrote: *sigh* OK, I figured it out. (Thank you Nick, for the hint) m1.large works, (I swear I tested that earlier and had similar issues... ) It was my obsession with starting r3.*large instances. Clearly I hadn't patched the script in all the places, which I think caused it to default to the Amazon AMI. I'll have to take a closer look at the code and see if I can't fix it correctly, because I really, really do want nodes with 2x the CPU and 4x the memory for the same low spot price. :-) I've got a cluster up now, at least. Time for the fun stuff... Thanks everyone for the help! On Sun, Jun 1, 2014 at 5:19 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: If you are explicitly specifying the AMI in your invocation of spark-ec2, may I suggest simply removing any explicit mention of AMI from your invocation? spark-ec2 automatically selects an appropriate AMI based on the specified instance type. On Sunday, June 1, 2014, Nicholas Chammas nicholas.cham...@gmail.com wrote: Could you post how exactly you are invoking spark-ec2? And are you having trouble just with r3 instances, or with any instance type? 
On Sunday, June 1, 2014, Jeremy Lee unorthodox.engine...@gmail.com wrote: It's been another day of spinning up dead clusters... I thought I'd finally worked out what everyone else knew - don't use the default AMI - but I've now run through all of the official quick-start linux releases and I'm none the wiser:
Amazon Linux AMI 2014.03.1 - ami-7aba833f (64-bit): Provisions servers, connects, installs, but the webserver on the master will not start.
Red Hat Enterprise Linux 6.5 (HVM) - ami-5cdce419: Spot instance requests are not supported for this AMI.
SuSE Linux Enterprise Server 11 sp3 (HVM) - ami-1a88bb5f: Not tested - costs 10x more for spot instances, not economically viable.
Ubuntu Server 14.04 LTS (HVM) - ami-f64f77b3: Provisions servers, but git is not pre-installed, so the cluster setup fails.
Amazon Linux AMI (HVM) 2014.03.1 - ami-5aba831f: Provisions servers, but git is not pre-installed, so the cluster setup fails.
-- Jeremy Lee BCompSci(Hons) The Unorthodox Engineers
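Nick's advice above (drop -a and let the script pick the AMI for the instance type) amounts to an invocation like the sketch below. The flags are spark-ec2's documented ones; the key pair, identity file, and cluster name are placeholders, and the command is built as a string and echoed rather than executed, as a dry run:

```shell
# Hypothetical spark-ec2 launch: no -a flag, so the script selects its own
# Amazon Linux-based AMI for the chosen instance type.
CMD="./spark-ec2 -k my-keypair -i my-keypair.pem -s 2 -t m1.large launch my-cluster"
echo "$CMD"
```

Passing `-a <ami-id>` overrides this selection, which is why it only works with AMIs derived from the Spark-provided ones.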
Re: Trouble with EC2
So to run spark-ec2, you should use the default AMI that it launches with if you don’t pass -a. Those are based on Amazon Linux, not Debian. Passing your own AMI is an advanced option but people need to install some stuff on their AMI in advance for it to work with our scripts. Matei On Jun 1, 2014, at 3:11 PM, PJ$ p...@chickenandwaffl.es wrote: Running on a few m3.larges with the ami-848a6eec image (debian 7). Haven't gotten any further. No clue what's wrong. I'd really appreciate any guidance y'all could offer. Best, PJ$ On Sat, May 31, 2014 at 1:40 PM, Matei Zaharia matei.zaha...@gmail.com wrote: What instance types did you launch on? Sometimes you also get a bad individual machine from EC2. It might help to remove the node it’s complaining about from the conf/slaves file. Matei On May 30, 2014, at 11:18 AM, PJ$ p...@chickenandwaffl.es wrote: Hey Folks, I'm really having quite a bit of trouble getting spark running on ec2. I'm not using the scripts at https://github.com/apache/spark/tree/master/ec2 because I'd like to know how everything works. But I'm going a little crazy. I think that something about the networking configuration must be messed up, but I'm at a loss. Shortly after starting the cluster, I get a lot of this: 14/05/30 18:03:22 INFO master.Master: Registering worker ip-10-100-184-45.ec2.internal:7078 with 2 cores, 6.3 GB RAM 14/05/30 18:03:22 INFO master.Master: Registering worker ip-10-100-184-45.ec2.internal:7078 with 2 cores, 6.3 GB RAM 14/05/30 18:03:23 INFO master.Master: Registering worker ip-10-100-184-45.ec2.internal:7078 with 2 cores, 6.3 GB RAM 14/05/30 18:03:23 INFO master.Master: Registering worker ip-10-100-184-45.ec2.internal:7078 with 2 cores, 6.3 GB RAM 14/05/30 18:05:54 INFO master.Master: akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485 got disassociated, removing it.
14/05/30 18:05:54 INFO actor.LocalActorRef: Message [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from Actor[akka://sparkMaster/deadLetters] to Actor[akka://sparkMaster/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkMaster%4010.100.75.70%3A36725-25#847210246] was not delivered. [5] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 14/05/30 18:05:54 INFO master.Master: akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485 got disassociated, removing it. 14/05/30 18:05:54 INFO master.Master: akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485 got disassociated, removing it. 14/05/30 18:05:54 ERROR remote.EndpointWriter: AssociationError [akka.tcp://sparkMaster@ip-10-100-184-45.ec2.internal:7077] - [akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485]: Error [Association failed with [akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: ip-10-100-75-70.ec2.internal/10.100.75.70:38485 ] 14/05/30 18:05:54 ERROR remote.EndpointWriter: AssociationError [akka.tcp://sparkMaster@ip-10-100-184-45.ec2.internal:7077] - [akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485]: Error [Association failed with [akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: ip-10-100-75-70.ec2.internal/10.100.75.70:38485 ] 14/05/30 18:05:54 INFO master.Master: akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485 got disassociated, removing it. 
14/05/30 18:05:54 INFO master.Master: akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485 got disassociated, removing it. 14/05/30 18:05:54 ERROR remote.EndpointWriter: AssociationError [akka.tcp://sparkMaster@ip-10-100-184-45.ec2.internal:7077] - [akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485]: Error [Association failed with [akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: ip-10-100-75-70.ec2.internal/10.100.75.70:38485
Re: Yay for 1.0.0! EC2 Still has problems.
FYI, I opened https://issues.apache.org/jira/browse/SPARK-1990 to track this. Matei On Jun 1, 2014, at 6:14 PM, Jeremy Lee unorthodox.engine...@gmail.com wrote: Sort of.. there were two separate issues, but both related to AWS.. I've sorted the confusion about the Master/Worker AMI ... use the version chosen by the scripts. (and use the right instance type so the script can choose wisely) But yes, one also needs a launch machine to kick off the cluster, and for that I _also_ was using an Amazon instance... (made sense.. I have a team that will need to do things as well, not just me) and I was just pointing out that if you use the AMI most recommended by Amazon (for your free micro instance, for example) you get python 2.6 and the ec2 scripts fail. That merely needs a line in the documentation saying use Ubuntu for your cluster controller, not Amazon Linux or somesuch. But yeah, for a newbie, it was hard working out when to use default or custom AMIs for various parts of the setup. On Mon, Jun 2, 2014 at 4:01 AM, Patrick Wendell pwend...@gmail.com wrote: Hey just to clarify this - my understanding is that the poster (Jeremy) was using a custom AMI to *launch* spark-ec2. I normally launch spark-ec2 from my laptop. And he was looking for an AMI that had a high enough version of python. Spark-ec2 itself has a flag -a that allows you to give a specific AMI. This flag is just an internal tool that we use for testing when we spin new AMIs. Users can't set that to an arbitrary AMI because we tightly control things like the Java and OS versions, libraries, etc. On Sun, Jun 1, 2014 at 12:51 AM, Jeremy Lee unorthodox.engine...@gmail.com wrote: *sigh* OK, I figured it out. (Thank you Nick, for the hint) m1.large works, (I swear I tested that earlier and had similar issues... ) It was my obsession with starting r3.*large instances. Clearly I hadn't patched the script in all the places.. which I think caused it to default to the Amazon AMI.
I'll have to take a closer look at the code and see if I can't fix it correctly, because I really, really do want nodes with 2x the CPU and 4x the memory for the same low spot price. :-) I've got a cluster up now, at least. Time for the fun stuff... Thanks everyone for the help! On Sun, Jun 1, 2014 at 5:19 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: If you are explicitly specifying the AMI in your invocation of spark-ec2, may I suggest simply removing any explicit mention of AMI from your invocation? spark-ec2 automatically selects an appropriate AMI based on the specified instance type. On Sunday, June 1, 2014, Nicholas Chammas nicholas.cham...@gmail.com wrote: Could you post how exactly you are invoking spark-ec2? And are you having trouble just with r3 instances, or with any instance type? On Sunday, June 1, 2014, Jeremy Lee unorthodox.engine...@gmail.com wrote: It's been another day of spinning up dead clusters... I thought I'd finally worked out what everyone else knew - don't use the default AMI - but I've now run through all of the official quick-start linux releases and I'm none the wiser:
Amazon Linux AMI 2014.03.1 - ami-7aba833f (64-bit): Provisions servers, connects, installs, but the webserver on the master will not start.
Red Hat Enterprise Linux 6.5 (HVM) - ami-5cdce419: Spot instance requests are not supported for this AMI.
SuSE Linux Enterprise Server 11 sp3 (HVM) - ami-1a88bb5f: Not tested - costs 10x more for spot instances, not economically viable.
Ubuntu Server 14.04 LTS (HVM) - ami-f64f77b3: Provisions servers, but git is not pre-installed, so the cluster setup fails.
Amazon Linux AMI (HVM) 2014.03.1 - ami-5aba831f: Provisions servers, but git is not pre-installed, so the cluster setup fails.
-- Jeremy Lee BCompSci(Hons) The Unorthodox Engineers -- Jeremy Lee BCompSci(Hons) The Unorthodox Engineers
Re: SecurityException when running tests with Spark 1.0.0
You can just use the Maven build for now, even for Spark 1.0.0. Matei On Jun 2, 2014, at 5:30 PM, Mohit Nayak wiza...@gmail.com wrote: Hey, Yup that fixed it. Thanks so much! Is this the only solution, or could this be resolved in future versions of Spark? On Mon, Jun 2, 2014 at 5:14 PM, Sean Owen so...@cloudera.com wrote: If it's the SBT build, I suspect you are hitting https://issues.apache.org/jira/browse/SPARK-1949 Can you try to apply the excludes you see at https://github.com/apache/spark/pull/906/files to your build to see if it resolves it? If so I think this could be helpful to commit. On Tue, Jun 3, 2014 at 1:01 AM, Mohit Nayak wiza...@gmail.com wrote: Hey, Thanks for the reply. I am using SBT. Here is a list of my dependencies:
val sparkCore = "org.apache.spark" % "spark-core_2.10" % V.spark
val hadoopCore = "org.apache.hadoop" % "hadoop-core" % V.hadoop % "provided"
val jodaTime = "com.github.nscala-time" %% "nscala-time" % "0.8.0"
val scalaUtil = "com.twitter" %% "util-collection" % V.util
val logback = "ch.qos.logback" % "logback-classic" % "1.0.6" % "runtime"
var openCsv = "net.sf.opencsv" % "opencsv" % "2.1"
var scalaTest = "org.scalatest" % "scalatest_2.10" % "2.1.0" % "test"
var scalaIOCore = "com.github.scala-incubator.io" %% "scala-io-core" % V.scalaIO
var scalaIOFile = "com.github.scala-incubator.io" %% "scala-io-file" % V.scalaIO
var kryo = "com.esotericsoftware.kryo" % "kryo" % "2.16"
var spray = "io.spray" %% "spray-json" % "1.2.5"
var scala_reflect = "org.scala-lang" % "scala-reflect" % "2.10.3"
On Mon, Jun 2, 2014 at 4:23 PM, Sean Owen so...@cloudera.com wrote: This ultimately means you have a couple copies of the servlet APIs in the build. What is your build like (SBT? Maven?) and what exactly are you depending on? On Tue, Jun 3, 2014 at 12:21 AM, Mohit Nayak wiza...@gmail.com wrote: Hi, I've upgraded to Spark 1.0.0. I'm not able to run any tests.
They throw a java.lang.SecurityException: class javax.servlet.FilterRegistration's signer information does not match signer information of other classes in the same package I'm using Hadoop-core 1.0.4 and running this locally. I noticed that there was an issue regarding this and was marked as resolved [https://issues.apache.org/jira/browse/SPARK-1693] Please guide.. -- -Mohit wiza...@gmail.com -- -Mohit wiza...@gmail.com -- -Mohit wiza...@gmail.com -- -Mohit wiza...@gmail.com
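Sean's suggested fix amounts to adding exclusion rules to the SBT build so only one copy of the servlet API remains on the classpath. The sketch below is illustrative only: the authoritative exclusion list is in the linked PR (https://github.com/apache/spark/pull/906/files), and the exact group/artifact to exclude depends on which of your dependencies drags in the second servlet API.

```scala
// build.sbt sketch (hypothetical coordinates): exclude the old servlet API
// pulled in transitively by hadoop-core, so it cannot clash with the
// servlet 3.0 classes used by Spark's embedded Jetty.
val hadoopCore = "org.apache.hadoop" % "hadoop-core" % "1.0.4" % "provided" exclude(
  "org.mortbay.jetty", "servlet-api")
```

Run `sbt dependencyTree` (or inspect the resolution report) to confirm which artifacts actually supply `javax.servlet` classes in your build before picking the exclusions.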
Re: wholeTextFiles() : java.lang.IncompatibleClassChangeError: Found class org.apache.hadoop.mapreduce.TaskAttemptContext, but interface was expected
Yeah unfortunately Hadoop 2 requires these binaries on Windows. Hadoop 1 runs just fine without them. Matei On Jun 3, 2014, at 10:33 AM, Sean Owen so...@cloudera.com wrote: I'd try the internet / SO first -- these are actually generic Hadoop-related issues. Here I think you don't have HADOOP_HOME or similar set. http://stackoverflow.com/questions/19620642/failed-to-locate-the-winutils-binary-in-the-hadoop-binary-path On Tue, Jun 3, 2014 at 5:54 PM, toivoa toivo@gmail.com wrote: Wow! What a quick reply! Adding the dependency
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>2.4.0</version>
</dependency>
solved the problem. But now I get 14/06/03 19:52:50 ERROR Shell: Failed to locate the winutils binary in the hadoop binary path java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries. at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:318) at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:333) at org.apache.hadoop.util.Shell.<clinit>(Shell.java:326) at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:76) at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:93) at org.apache.hadoop.security.Groups.<init>(Groups.java:77) at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:240) at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:255) at org.apache.hadoop.security.UserGroupInformation.setConfiguration(UserGroupInformation.java:283) at org.apache.spark.deploy.SparkHadoopUtil.<init>(SparkHadoopUtil.scala:36) at org.apache.spark.deploy.SparkHadoopUtil$.<init>(SparkHadoopUtil.scala:109) at org.apache.spark.deploy.SparkHadoopUtil$.<clinit>(SparkHadoopUtil.scala) thanks toivo -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/wholeTextFiles-java-lang-IncompatibleClassChangeError-Found-class-org-apache-hadoop-mapreduce-TaskAtd-tp6818p6820.html Sent from the Apache Spark User List mailing list
archive at Nabble.com.
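As Sean's Stack Overflow link explains, the winutils error in the quoted trace comes from HADOOP_HOME not being set. A sketch of the usual workaround on Windows (the path is a placeholder; winutils.exe itself must be obtained separately and match your Hadoop version):

```
:: Windows cmd sketch: HADOOP_HOME must point at a directory containing bin\winutils.exe
set HADOOP_HOME=C:\hadoop
set PATH=%PATH%;%HADOOP_HOME%\bin
```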
Re: Better line number hints for logging?
You can use RDD.setName to give it a name. There’s also a creationSite field that is private[spark] — we may want to add a public setter for that later. If the name isn’t enough and you’d like this, please open a JIRA issue for it. Matei On Jun 3, 2014, at 5:22 PM, John Salvatier jsalvat...@gmail.com wrote: I have created some extension methods for RDDs in RichRecordRDD and these are working exceptionally well for me. However, when looking at the logs, it's impossible to tell what's going on because all the line number hints point to RichRecordRDD.scala rather than the code that uses it. For example: INFO scheduler.DAGScheduler: Submitting Stage 122 (MappedRDD[1223] at map at RichRecordRDD.scala:633), which is now runnable Is there any way to set up my extension methods class so that the logs will print a more useful line number?
Re: Invalid Class Exception
What Java version do you have, and how did you get Spark (did you build it yourself by any chance or download a pre-built one)? If you build Spark yourself you need to do it with Java 6 — it’s a known issue because of the way Java 6 and 7 package JAR files. But I haven’t seen it result in this particular error. Matei On Jun 3, 2014, at 5:18 PM, Suman Somasundar suman.somasun...@oracle.com wrote: Hi all, I get the following exception when using Spark to run example k-means program. I am using Spark 1.0.0 and running the program locally. java.io.InvalidClassException: scala.Tuple2; invalid descriptor for field _1 at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:697) at java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:827) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1583) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1514) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1750) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63) at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:87) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$3.apply(PairRDDFunctions.scala:101) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$3.apply(PairRDDFunctions.scala:100) at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582) at 
org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111) at org.apache.spark.scheduler.Task.run(Task.scala:51) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) at java.lang.Thread.run(Thread.java:722) Caused by: java.lang.IllegalArgumentException: illegal signature at java.io.ObjectStreamField.<init>(ObjectStreamField.java:119) at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:695) ... 26 more Anyone know why this is happening? Thanks, Suman.
Re: Yay for 1.0.0! EC2 Still has problems.
Ah, sorry to hear you had more problems. Some thoughts on them: Thanks for that, Matei! I'll look at that once I get a spare moment. :-) If you like, I'll keep documenting my newbie problems and frustrations... perhaps it might make things easier for others. Another issue I seem to have found (now that I can get small clusters up): some of the examples (the streaming.Twitter ones especially) depend on there being a /mnt/spark and /mnt2/spark directory (I think for java tempfiles?) and those don't seem to exist out-of-the-box. I have to create those directories and use copy-dir to get them to the workers before those examples run. I think this is a side-effect of the r3 instances not having those drives mounted. Our setup script would normally create these directories. What was the error? Much of the last two days for me has been about failing to get any of my own code to work, except for in spark-shell. (which is very nice, btw) At first I tried editing the examples, because I took the documentation literally when it said Finally, Spark includes several samples in the examples directory (Scala, Java, Python). You can run them as follows: but of course didn't realize editing them is pointless because while the source is there, the code is actually pulled from a .jar elsewhere. Doh. (so obvious in hindsight) I couldn't even turn down the voluminous INFO messages to WARNs, no matter how many conf/log4j.properties files I edited or copy-dir'd. I'm sure there's a trick to that I'm not getting. What did you change log4j.properties to? It should be changed to say log4j.rootCategory=WARN, console but maybe another log4j.properties is somehow arriving on the classpath. This is definitely a common problem so we need to add some explicit docs on it. Even trying to build SimpleApp I've run into the problem that all the documentation says to use sbt/sbt assembly, but sbt doesn't seem to be in the 1.0.0 pre-built packages that I downloaded.
Are you going through http://spark.apache.org/docs/latest/quick-start.html? You should be able to do just sbt package. Once you do that you don’t need to deploy your application’s JAR to the cluster, just pass it to spark-submit and it will automatically be sent over. Ah... yes.. there it is in the source package. I suppose that means that in order to deploy any new code to the cluster, I've got to rebuild from source on my cluster controller. OK, I never liked that Amazon Linux AMI anyway. I'm going to start from scratch again with an Ubuntu 12.04 instance, hopefully that will be more auspicious... Meanwhile I'm learning scala... Great Turing's Ghost, it's the dream language we've theorized about for years! I hadn't realized! Indeed, glad you’re enjoying it. Matei On Mon, Jun 2, 2014 at 12:05 PM, Matei Zaharia matei.zaha...@gmail.com wrote: FYI, I opened https://issues.apache.org/jira/browse/SPARK-1990 to track this. Matei On Jun 1, 2014, at 6:14 PM, Jeremy Lee unorthodox.engine...@gmail.com wrote: Sort of.. there were two separate issues, but both related to AWS.. I've sorted the confusion about the Master/Worker AMI ... use the version chosen by the scripts. (and use the right instance type so the script can choose wisely) But yes, one also needs a launch machine to kick off the cluster, and for that I _also_ was using an Amazon instance... (made sense.. I have a team that will need to do things as well, not just me) and I was just pointing out that if you use the AMI most recommended by Amazon (for your free micro instance, for example) you get python 2.6 and the ec2 scripts fail. That merely needs a line in the documentation saying use Ubuntu for your cluster controller, not Amazon Linux or somesuch. But yeah, for a newbie, it was hard working out when to use default or custom AMIs for various parts of the setup.
On Mon, Jun 2, 2014 at 4:01 AM, Patrick Wendell pwend...@gmail.com wrote: Hey just to clarify this - my understanding is that the poster (Jeremy) was using a custom AMI to *launch* spark-ec2. I normally launch spark-ec2 from my laptop. And he was looking for an AMI that had a high enough version of python. Spark-ec2 itself has a flag -a that allows you to give a specific AMI. This flag is just an internal tool that we use for testing when we spin new AMIs. Users can't set that to an arbitrary AMI because we tightly control things like the Java and OS versions, libraries, etc. On Sun, Jun 1, 2014 at 12:51 AM, Jeremy Lee unorthodox.engine...@gmail.com wrote: *sigh* OK, I figured it out. (Thank you Nick, for the hint) m1.large works, (I swear I tested that earlier and had similar issues... ) It was my obsession with starting r3.*large instances. Clearly I hadn't patched the script in all the places.. which I think caused it to default to the Amazon AMI. I'll have to take a closer look at the code
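The log4j change Matei describes in this thread is a one-line edit. Spark ships a conf/log4j.properties.template; copying it to conf/log4j.properties on the driver (and, on a cluster, syncing it to the workers) and lowering the root level looks roughly like this sketch — the first line is the change he quotes, the rest is hedged illustration:

```
# conf/log4j.properties — demote Spark's chatty INFO logging to WARN
log4j.rootCategory=WARN, console
```

If the setting has no effect, another log4j.properties earlier on the classpath (e.g. inside an application JAR) is likely winning, which is the failure mode Matei suggests.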
Re: Upgradation to Spark 1.0.0
You can copy your configuration from the old one. I’d suggest just downloading it to a different location on each node first for testing, then you can delete the old one if things work. On Jun 3, 2014, at 12:38 AM, MEETHU MATHEW meethu2...@yahoo.co.in wrote: Hi, I am currently using Spark 0.9 configured with the Hadoop 1.2.1 cluster. What should I do if I want to upgrade it to Spark 1.0.0? Do I need to download the latest version, replace the existing Spark with the new one, and make the configuration changes again from scratch, or is there any other way to move ahead? Thanks, Meethu M
Re: Join : Giving incorrect result
If this isn’t the problem, it would be great if you can post the code for the program. Matei On Jun 4, 2014, at 12:58 PM, Xu (Simon) Chen xche...@gmail.com wrote: Maybe your two workers have different assembly jar files? I just ran into a similar problem that my spark-shell is using a different jar file than my workers - got really confusing results. On Jun 4, 2014 8:33 AM, Ajay Srivastava a_k_srivast...@yahoo.com wrote: Hi, I am doing a join of two RDDs which gives different results (counting the number of records) each time I run this code on the same input. The input files are large enough to be divided into two splits. When the program runs on two workers with a single core assigned to each, the output is consistent and looks correct. But when a single worker is used with two or more cores, the result seems to be random. Every time, the count of joined records is different. Does this sound like a defect, or do I need to take care of something while using join? I am using spark-0.9.1. Regards Ajay
Re: reuse hadoop code in Spark
Yes, you can write some glue in Spark to call these. Some functions to look at:
- SparkContext.hadoopRDD lets you create an input RDD from an existing JobConf configured by Hadoop (including InputFormat, paths, etc)
- RDD.mapPartitions lets you operate on all the values of one partition (block) at a time, similar to how Mappers in MapReduce work
- PairRDDFunctions.reduceByKey and groupByKey can be used for aggregation.
- RDD.pipe() can be used to call out to a script or binary, like Hadoop Streaming.
A fair number of people have been running both Java and Hadoop Streaming apps like this. Matei On Jun 4, 2014, at 1:08 PM, Wei Tan w...@us.ibm.com wrote: Hello, I am trying to use spark in such a scenario: I have code written in Hadoop and now I try to migrate to Spark. The mappers and reducers are fairly complex. So I wonder if I can reuse the map() functions I already wrote in Hadoop (Java), and use Spark to chain them, mixing the Java map() functions with Spark operators? Another related question: can I use binaries as operators, like Hadoop streaming? Thanks! Wei
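To make the mapping from Hadoop concepts concrete, here is a pure-Python analogue (no Spark required; the function names are mine, not Spark's API) of how a mapPartitions-style mapper and reduceByKey-style aggregation compose:

```python
from collections import defaultdict
from itertools import chain

def map_partitions(partitions, f):
    """Apply f to each partition's record iterator, like RDD.mapPartitions:
    f sees one whole block of records at a time, as a Hadoop Mapper does."""
    return [list(f(iter(part))) for part in partitions]

def reduce_by_key(partitions, f):
    """Merge (key, value) pairs across all partitions, like reduceByKey."""
    acc = {}
    for k, v in chain.from_iterable(partitions):
        acc[k] = f(acc[k], v) if k in acc else v
    return acc

# Two input "blocks" of (key, value) records.
parts = [[("a", 1), ("b", 2)], [("a", 3)]]
mapped = map_partitions(parts, lambda records: ((k, v * 10) for k, v in records))
totals = reduce_by_key(mapped, lambda x, y: x + y)
print(totals)  # {'a': 40, 'b': 20}
```

An existing Hadoop map() body slots in as the function applied inside each partition's iterator, which is why porting mappers this way usually needs only thin glue.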
Re: Better line number hints for logging?
That’s a good idea too, maybe we can change CallSiteInfo to do that. Matei On Jun 4, 2014, at 8:44 AM, Daniel Darabos daniel.dara...@lynxanalytics.com wrote: Oh, this would be super useful for us too! Actually wouldn't it be best if you could see the whole call stack on the UI, rather than just one line? (Of course you would have to click to expand it.) On Wed, Jun 4, 2014 at 2:38 AM, John Salvatier jsalvat...@gmail.com wrote: Ok, I will probably open a Jira. On Tue, Jun 3, 2014 at 5:29 PM, Matei Zaharia matei.zaha...@gmail.com wrote: You can use RDD.setName to give it a name. There’s also a creationSite field that is private[spark] — we may want to add a public setter for that later. If the name isn’t enough and you’d like this, please open a JIRA issue for it. Matei On Jun 3, 2014, at 5:22 PM, John Salvatier jsalvat...@gmail.com wrote: I have created some extension methods for RDDs in RichRecordRDD and these are working exceptionally well for me. However, when looking at the logs, it's impossible to tell what's going on because all the line number hints point to RichRecordRDD.scala rather than the code that uses it. For example: INFO scheduler.DAGScheduler: Submitting Stage 122 (MappedRDD[1223] at map at RichRecordRDD.scala:633), which is now runnable Is there any way to set up my extension methods class so that the logs will print a more useful line number?
Re: pyspark join crash
In PySpark, the data processed by each reduce task needs to fit in memory within the Python process, so you should use more tasks to process this dataset. Data is spilled to disk across tasks. I’ve created https://issues.apache.org/jira/browse/SPARK-2021 to track this — it’s something we’ve been meaning to look at soon. Matei On Jun 4, 2014, at 8:23 AM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi All, I have experienced some crashing behavior with join in pyspark. When I attempt a join with 2000 partitions in the result, the join succeeds, but when I use only 200 partitions in the result, the join fails with the message Job aborted due to stage failure: Master removed our application: FAILED. The crash always occurs at the beginning of the shuffle phase. Based on my observations, it seems like the workers in the read phase may be fetching entire blocks from the write phase of the shuffle rather than just the records necessary to compose the partition the reader is responsible for. Hence, when there are fewer partitions in the read phase, the worker is likely to need a record from each of the write partitions and consequently attempts to load the entire data set into the memory of a single machine (which then causes the out of memory crash I observe in /var/log/syslog). Can anybody confirm if this is the behavior of pyspark? I am glad to supply additional details about my observed behavior upon request. best, -Brad
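Matei's point about per-task memory can be seen in a pure-Python sketch of how a reduce-side join buffers data (a simplification in the spirit of python/pyspark/join.py, not the actual implementation): every value for every key handled by one task sits in an in-memory dict before any output pair is emitted.

```python
from collections import defaultdict

def join_task(left, right):
    """Join the (key, value) records routed to one reduce task.
    All values for the task's keys are buffered in memory at once,
    which is why fewer (larger) partitions raise the peak footprint."""
    buffers = defaultdict(lambda: ([], []))
    for k, v in left:
        buffers[k][0].append(v)
    for k, w in right:
        buffers[k][1].append(w)
    return [(k, (v, w)) for k, (vs, ws) in buffers.items()
            for v in vs for w in ws]

pairs = join_task([(1, "a"), (2, "b")], [(1, "x"), (1, "y")])
print(pairs)  # [(1, ('a', 'x')), (1, ('a', 'y'))]
```

Raising the number of output partitions shrinks `left` and `right` per task, which matches the observation in the thread that the 2000-partition join succeeds where the 200-partition one fails.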
Re: How can I dispose an Accumulator?
All of these are disposed of automatically if you stop the context or exit the program. Matei On Jun 4, 2014, at 2:22 PM, Daniel Siegmann daniel.siegm...@velos.io wrote: Will the broadcast variables be disposed automatically if the context is stopped, or do I still need to unpersist()? On Sat, May 31, 2014 at 1:20 PM, Patrick Wendell pwend...@gmail.com wrote: Hey There, You can remove an accumulator by just letting it go out of scope and it will be garbage collected. For broadcast variables we actually store extra information for it, so we provide hooks for users to remove the associated state. There is no such need for accumulators, though. - Patrick On Thu, May 29, 2014 at 2:13 AM, innowireless TaeYun Kim taeyun@innowireless.co.kr wrote: Hi, How can I dispose an Accumulator? It has no method like 'unpersist()' which Broadcast provides. Thanks. -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001 E: daniel.siegm...@velos.io W: www.velos.io
Re: pyspark join crash
I think the problem is that once unpacked in Python, the objects take considerably more space, as they are stored as Python objects in a Python dictionary. Take a look at python/pyspark/join.py and combineByKey in python/pyspark/rdd.py. We should probably try to store these in serialized form. I’m not sure whether there’s a great way to inspect a Python process’s memory, but looking at what consumes memory in a reducer process would be useful. Matei On Jun 4, 2014, at 2:34 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi Matei, Thanks for the reply and creating the JIRA. I hear what you're saying, although to be clear I want to still state that it seems like each reduce task is loading significantly more data than just the records needed for that task. The workers seem to load all data from each block containing a record needed by the reduce task. I base this hypothesis on the following:
- My dataset is about 100G uncompressed, 22G serialized in memory with compression enabled
- There are 130K records
- The initial RDD contains 1677 partitions, averaging 60M (uncompressed)
- There are 3 cores per node (each running one reduce task at a time)
- Each node has 32G of memory
Note that I am attempting to join the dataset to itself and I ran this experiment after caching the dataset in memory with serialization and compression enabled. Given these figures, even with only 200 partitions the average output partition size (uncompressed) would be 1G (as the dataset is being joined to itself, resulting in 200G over 200 partitions), requiring 3G from each machine on average. The behavior I observe is that the kernel kills jobs in many of the nodes at nearly the exact same time right after the read phase starts; it seems likely this would occur in each node except the master begins detecting failures and stops the job (and I observe memory spiking on all machines). Indeed, I observe a large memory spike at each node.
When I attempt the join with 2000 output partitions, it succeeds. Note that there are about 65 records per output partition on average, which means the reader only needs to load input from about 130 blocks (as the dataset is joined to itself). Given that the average uncompressed block size is 60M, even if the entire block were loaded (not just the relevant record) we would expect about 23G of memory to be used per node on average. I began suspecting the behavior of loading entire blocks based on the logging from the workers (i.e. BlockFetcherIterator$BasicBlockFetcherIterator: Getting 122 non-empty blocks out of 3354 blocks). If it is definitely not the case that entire blocks are loaded from the writers, then it would seem like there is some significant overhead which is chewing through lots of memory (perhaps similar to the problem with python broadcast variables chewing through memory https://spark-project.atlassian.net/browse/SPARK-1065). -Brad On Wed, Jun 4, 2014 at 1:42 PM, Matei Zaharia matei.zaha...@gmail.com wrote: In PySpark, the data processed by each reduce task needs to fit in memory within the Python process, so you should use more tasks to process this dataset. Data is spilled to disk across tasks. I’ve created https://issues.apache.org/jira/browse/SPARK-2021 to track this — it’s something we’ve been meaning to look at soon. Matei On Jun 4, 2014, at 8:23 AM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi All, I have experienced some crashing behavior with join in pyspark. When I attempt a join with 2000 partitions in the result, the join succeeds, but when I use only 200 partitions in the result, the join fails with the message Job aborted due to stage failure: Master removed our application: FAILED. The crash always occurs at the beginning of the shuffle phase.
Based on my observations, it seems like the workers in the read phase may be fetching entire blocks from the write phase of the shuffle rather than just the records necessary to compose the partition the reader is responsible for. Hence, when there are fewer partitions in the read phase, each worker is likely to need a record from each of the write partitions and consequently attempts to load the entire dataset into the memory of a single machine (which then causes the out-of-memory crash I observe in /var/log/syslog). Can anybody confirm whether this is the behavior of PySpark? I am glad to supply additional details about my observed behavior upon request. best, -Brad
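Matei's point in the thread above — that records unpacked into live Python objects take far more space than the same records in serialized form — can be illustrated with a small, stdlib-only sketch. This is not PySpark's actual internals: `deep_size` is a rough helper written for this example, and the `records` dict is made-up data standing in for the kind of key-to-record-list state a pure-Python combineByKey might hold.

```python
import pickle
import sys

def deep_size(obj, seen=None):
    """Roughly sum the size of an object and everything it references."""
    seen = seen if seen is not None else set()
    if id(obj) in seen:
        return 0
    seen.add(id(obj))
    size = sys.getsizeof(obj)
    if isinstance(obj, dict):
        size += sum(deep_size(k, seen) + deep_size(v, seen)
                    for k, v in obj.items())
    elif isinstance(obj, (list, tuple, set)):
        size += sum(deep_size(x, seen) for x in obj)
    return size

# Hypothetical reducer state: key -> list of small records, held as live
# Python objects (dict of lists of tuples).
records = {i: [(i, float(i), str(i)) for _ in range(10)] for i in range(1000)}

live_bytes = deep_size(records)
serialized_bytes = len(pickle.dumps(records, protocol=pickle.HIGHEST_PROTOCOL))

print(f"live Python objects: ~{live_bytes} bytes")
print(f"pickled form:        ~{serialized_bytes} bytes")
print(f"blow-up factor:      ~{live_bytes / serialized_bytes:.1f}x")
```

On CPython the live representation is typically several times larger than the pickled bytes, because every int, float, string, tuple, and list carries per-object headers and pointers. That gap is the motivation for storing shuffle data in serialized form and deserializing records lazily.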
Re: Why Scala?
We are definitely investigating a Python API for Streaming, but there is no announced timeline at this point. Matei On Jun 4, 2014, at 5:02 PM, John Omernik j...@omernik.com wrote: So Python is used in many of the Spark ecosystem products, but not Streaming at this point. Is there a roadmap to include Python APIs in Spark Streaming? Any time frame on this? Thanks! John On Thu, May 29, 2014 at 4:19 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Quite a few people ask this question, and the answer is pretty simple. When we started Spark, we had two goals — we wanted to work with the Hadoop ecosystem, which is JVM-based, and we wanted a concise programming interface similar to Microsoft’s DryadLINQ (the first language-integrated big data framework I know of, which begat things like FlumeJava and Crunch). On the JVM, the only language that would offer that kind of API was Scala, due to its ability to capture functions and ship them across the network. Scala’s static typing also made it much easier to control performance compared to, say, Jython or Groovy. In terms of usage, however, we see substantial usage of our other languages (Java and Python), and we’re continuing to invest in both. In a user survey we did last fall, about 25% of users used Java and 30% used Python, and I imagine these numbers are growing. With lambda expressions now added to Java 8 (http://databricks.com/blog/2014/04/14/Spark-with-Java-8.html), I think we’ll see a lot more Java. And at Databricks I’ve seen a lot of interest in Python, which is very exciting to us in terms of ease of use. Matei On May 29, 2014, at 1:57 PM, Benjamin Black b...@b3k.us wrote: HN is a cesspool safely ignored. On Thu, May 29, 2014 at 1:55 PM, Nick Chammas nicholas.cham...@gmail.com wrote: I recently discovered Hacker News and started reading through older posts about Scala. It looks like the language is fairly controversial on there, and it got me thinking. 
Scala appears to be the preferred language to work with in Spark, and Spark itself is written in Scala, right? I know that oftentimes a successful project evolves gradually out of something small, and that the choice of programming language may not always have been made consciously at the outset. But pretending that it was, why is Scala the preferred language of Spark? Nick View this message in context: Why Scala? Sent from the Apache Spark User List mailing list archive at Nabble.com.